In [17]:
from pyspark.sql import SparkSession

# Reuse the active session when one exists; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)
# The SparkContext is the handle the RDD examples use (sc.parallelize).
sc = spark.sparkContext

### 5.1 map (NARROW) — change each item 
#### ● What: One input → one output, element by element. 
#### ● Why: Clean, fast; stays local; great for simple conversions. 
#### ● Tiny example (RDD): 

In [18]:
# Two short sentences distributed as an RDD of strings.
lines = sc.parallelize(["Spark fast", "PySpark easy"])
# map is lazy: the built-in len is only applied once take() asks for results.
lengths = lines.map(len)
lengths.take(2)

[10, 12]

### 5.2 flatMap (NARROW) — expand/split each item 
#### ● What: One input → many outputs (e.g., split sentences into words). 
#### ● Why: Perfect for tokenizing text. 
#### ● Tiny example (RDD): 

In [19]:
# flatMap flattens the per-sentence word lists into one RDD of words.
words = lines.flatMap(str.split)
words.take(5)

['Spark', 'fast', 'PySpark', 'easy']

### 5.3 filter (NARROW) — keep rows that match 
#### ● What: Keep only items that pass a condition.
#### ● Why: Remove noise early (before wide steps). 
#### ● Tiny example (RDD):

In [20]:
# Keep only the words that are at least five characters long.
long_words = words.filter(lambda word: 5 <= len(word))
long_words.take(5)

['Spark', 'PySpark']

### 5.4 withColumn / select (DataFrame, NARROW)
#### ● What: Add/change columns; choose columns. 
#### ● Why: Column-wise transforms are local (narrow). 
#### ● Tiny example (DF): 

In [21]:
from pyspark.sql import functions as F

# Create a two-row DataFrame with columns name, qty and price.
df = spark.createDataFrame(
    [("Ravi", 2, 100.0), ("Sita", 4, 200.0)],
    schema=["name", "qty", "price"],
)
df.show()

# withColumn adds amount = qty * price; select then keeps only name and amount.
df2 = (
    df
    .withColumn("amount", F.col("qty") * F.col("price"))
    .select("name", "amount")
)
df2.show()

+----+---+-----+
|name|qty|price|
+----+---+-----+
|Ravi|  2|100.0|
|Sita|  4|200.0|
+----+---+-----+

+----+------+
|name|amount|
+----+------+
|Ravi| 200.0|
|Sita| 800.0|
+----+------+



### 5.5 groupByKey vs reduceByKey (RDD, WIDE) 
#### ● What: Both group by key, but reduceByKey is more efficient for sums/counts. 
#### ● Why: reduceByKey combines values before shuffling (less data moved). 
#### ● Tiny example (RDD):

In [22]:
pairs = sc.parallelize([("a", 1), ("a", 1), ("b", 1), ("a", 1), ("b", 1)])

# groupByKey -> heavier shuffle: the values are grouped per key first and
# only summed afterwards.
g = pairs.groupByKey()
sum_g = g.mapValues(sum)
# A bare expression in the middle of a cell is not displayed, so print this
# result explicitly; otherwise the job runs and its output is thrown away.
print("groupByKey:", sum_g.collect())   # ACTION -> expected [('a', 3), ('b', 2)]

# reduceByKey -> preferred: values are combined before the shuffle.
sum_r = pairs.reduceByKey(lambda a, b: a + b)
sum_r.collect()   # ACTION; last expression, shown as the cell output

[('a', 3), ('b', 2)]

### 5.6 join (WIDE) 
#### ● What: Combine two datasets by key. 
#### ● Why: Very common; can be expensive. Use broadcast for small tables (in DF API). 
#### ● Tiny example (DF with broadcast):

In [23]:
from pyspark.sql.functions import broadcast

left = spark.createDataFrame([(1, "Ravi"), (2, "Sita")], ["id", "name"])
right = spark.createDataFrame([(1, "East"), (2, "West")], ["id", "region"])

# broadcast() marks `right` as small enough to be broadcast for the join;
# the join itself is a left join on the shared "id" column.
joined = left.join(broadcast(right), on="id", how="left")
joined.show()

+---+----+------+
| id|name|region|
+---+----+------+
|  1|Ravi|  East|
|  2|Sita|  West|
+---+----+------+



### 5.7 orderBy/sort (WIDE) 
#### ● What: Sort rows. 
#### ● Why: Forces shuffle + sort; use only when needed. 
#### ● Tiny example (DF): 

In [25]:
# df only has name, qty and price, so derive `amount` first and sort the
# derived frame rather than df itself.
df2 = df.withColumn("amount", df["qty"] * df["price"])
df2.orderBy(F.col("amount").desc()).show()

+----+---+-----+------+
|name|qty|price|amount|
+----+---+-----+------+
|Sita|  4|200.0| 800.0|
|Ravi|  2|100.0| 200.0|
+----+---+-----+------+



### 5.8 distinct/dropDuplicates (WIDE) 
#### ● What: Remove duplicates. 
#### ● Why: Requires shuffling to find duplicates across partitions. 
#### ● Tiny example (DF): 

In [26]:
# Project the name column, then drop repeated values before displaying.
unique_names = df.select("name").distinct()
unique_names.show()

+----+
|name|
+----+
|Ravi|
|Sita|
+----+



In [28]:
from pyspark import StorageLevel

pairs = sc.parallelize([("a", 1), ("a", 1), ("b", 1)])

# persist() returns the same RDD, so it can be chained onto the reduce step.
hot = pairs.reduceByKey(lambda x, y: x + y).persist(StorageLevel.MEMORY_AND_DISK)
hot.count()          # action: computes `hot` so the persisted copy exists
print(hot.take(5))   # served from the persisted data
hot.unpersist()      # release the storage; returns the RDD (cell output)

[('a', 2), ('b', 1)]


PythonRDD[137] at RDD at PythonRDD.scala:56

In [29]:
from pyspark.sql import functions as F

# One row per line of text, exploded into one row per whitespace-separated word.
words_df = (
    spark.createDataFrame([("a a b",), ("b c c",), ("a c",)], ["line"])
    .select(F.explode(F.split("line", r"\s+")).alias("word"))
)
counts_df = words_df.groupBy("word").agg(F.count("*").alias("cnt"))

hot_df = counts_df.cache()   # mark for caching (lazy)
hot_df.count()               # action: materializes the cached data
hot_df.show(5)
hot_df.unpersist()           # release the cache; returns the DataFrame (cell output)

+----+---+
|word|cnt|
+----+---+
|   c|  3|
|   b|  2|
|   a|  3|
+----+---+



DataFrame[word: string, cnt: bigint]

In [30]:
pairs = sc.parallelize([("a", 1), ("a", 1), ("b", 2), ("c", 3)])
reduced = pairs.reduceByKey(lambda x, y: x + y)   # WIDE
reduced_p = reduced.partitionBy(4)                # WIDE (one-time), 4 partitions

# Inspect partition sizes: emit one record count per partition.
sizes = reduced_p.mapPartitions(
    lambda partition: [sum(1 for _ in partition)]
).collect()
print("records per partition:", sizes)

records per partition: [0, 0, 2, 1]


In [31]:
text = sc.parallelize(["a a b", "b c c", "a c"])
words = text.flatMap(str.split)                   # NARROW
pairs = words.map(lambda word: (word, 1))         # NARROW
counts = pairs.reduceByKey(lambda x, y: x + y)    # WIDE (shuffle)
print(counts.collect())

[('c', 3), ('a', 3), ('b', 2)]


In [32]:
from pyspark.sql import functions as F

# Bound to lines_df rather than df: `df` is the (name, qty, price) table that
# sections 5.4, 5.7 and 5.8 read, and rebinding it here would make those
# cells fail if they are re-run after this one.
lines_df = spark.createDataFrame([("a a b",), ("b c c",), ("a c",)], ["line"])

# Split each line into words, count each word, and sort by count descending.
out = (
    lines_df
    .select(F.explode(F.split("line", r"\s+")).alias("word"))
    .groupBy("word")
    .agg(F.count("*").alias("cnt"))
    .orderBy(F.desc("cnt"))
)
out.show()

+----+---+
|word|cnt|
+----+---+
|   a|  3|
|   c|  3|
|   b|  2|
+----+---+



In [33]:
# --- Setup ---
from pyspark import StorageLevel
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Beginner-Chapter").getOrCreate()
sc = spark.sparkContext

# --- RDD pipeline: narrow -> wide -> action ---
text = sc.parallelize(["a a b", "b c c", "a c"])
words = text.flatMap(lambda line: line.split())    # narrow (lazy)
pairs = words.map(lambda word: (word, 1))          # narrow (lazy)
counts = pairs.reduceByKey(lambda x, y: x + y)     # wide (shuffle, lazy)
print("RDD counts:", counts.collect())             # ACTION

# --- Cache / persist (RDD) ---
hot = pairs.reduceByKey(lambda x, y: x + y)        # wide
hot.persist(StorageLevel.MEMORY_AND_DISK)
hot.count()                                        # ACTION (materialize)
print("Top after cache:", hot.take(10))
hot.unpersist()

# --- Partition by key (RDD) ---
counts_p = counts.partitionBy(4)                   # one-time reshuffle
sizes = counts_p.mapPartitions(
    lambda partition: [sum(1 for _ in partition)]
).collect()                                        # ACTION
print("Records per partition:", sizes)

# --- DataFrame version (same task, optimized) ---
df = spark.createDataFrame([("a a b",), ("b c c",), ("a c",)], ["line"])
out = (
    df
    .select(F.explode(F.split("line", r"\s+")).alias("word"))   # narrow
    .groupBy("word")
    .agg(F.count("*").alias("cnt"))                              # wide
    .orderBy(F.desc("cnt"))                                      # wide
)
out.show()                                                       # ACTION

# Peek at physical plan (how Spark will execute)
out.explain("extended")

RDD counts: [('c', 3), ('a', 3), ('b', 2)]
Top after cache: [('c', 3), ('a', 3), ('b', 2)]
Records per partition: [0, 0, 2, 1]
+----+---+
|word|cnt|
+----+---+
|   a|  3|
|   c|  3|
|   b|  2|
+----+---+

== Parsed Logical Plan ==
'Sort ['cnt DESC NULLS LAST], true
+- Aggregate [word#279], [word#279, count(1) AS cnt#280L]
   +- Project [word#279]
      +- Generate explode(split(line#277, \s+, -1)), false, [word#279]
         +- LogicalRDD [line#277], false

== Analyzed Logical Plan ==
word: string, cnt: bigint
Sort [cnt#280L DESC NULLS LAST], true
+- Aggregate [word#279], [word#279, count(1) AS cnt#280L]
   +- Project [word#279]
      +- Generate explode(split(line#277, \s+, -1)), false, [word#279]
         +- LogicalRDD [line#277], false

== Optimized Logical Plan ==
Sort [cnt#280L DESC NULLS LAST], true
+- Aggregate [word#279], [word#279, count(1) AS cnt#280L]
   +- Generate explode(split(line#277, \s+, -1)), [0], false, [word#279]
      +- LogicalRDD [line#277], false

== Physical P