In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) the Spark session used by the cells in this notebook
spark = (
    SparkSession.builder
    .appName("AllTransformations")
    .getOrCreate()
)

# One list of (name, score) records backs both the sample RDD and the
# sample DataFrame; note that ("Alice", 10) appears twice on purpose.
sample_records = [("Alice", 10), ("Bob", 20), ("Alice", 10), ("Charlie", 30)]
rdd = spark.sparkContext.parallelize(sample_records)
df = spark.createDataFrame(sample_records, ["Name", "Score"])

# Show both inputs so later results can be compared against them
print(rdd.collect())
df.show()

[('Alice', 10), ('Bob', 20), ('Alice', 10), ('Charlie', 30)]
+-------+-----+
|   Name|Score|
+-------+-----+
|  Alice|   10|
|    Bob|   20|
|  Alice|   10|
|Charlie|   30|
+-------+-----+



In [0]:
# map() - Narrow

# RDD transformation: the function is called once per element and its
# return value becomes the corresponding element of the new RDD.
# Here every (name, score) pair keeps its name and gets its score doubled.

mapped_rdd = rdd.map(lambda pair: (pair[0], pair[1] * 2))
mapped_records = mapped_rdd.collect()
print(mapped_records)


[('Alice', 20), ('Bob', 40), ('Alice', 20), ('Charlie', 60)]


In [0]:
# filter() - Narrow
# Retains only the RDD elements for which the predicate is true;
# here, the pairs whose score (second field) is greater than 15.

filtered_rdd = rdd.filter(lambda pair: pair[1] > 15)
filtered_records = filtered_rdd.collect()
print(filtered_records)



[('Bob', 20), ('Charlie', 30)]


In [0]:
# flatMap() - Narrow

# Like map(), except the function returns a sequence per element and all
# sequences are flattened into one RDD: each sentence becomes its words.
rdd_words = spark.sparkContext.parallelize(["hello world", "good morning"])

flat_mapped_rdd = rdd_words.flatMap(lambda sentence: sentence.split(" "))
flat_mapped_words = flat_mapped_rdd.collect()
print(flat_mapped_words)


['hello', 'world', 'good', 'morning']


In [0]:
# withColumn() - Narrow

# Appends a DoubleScore column whose value is Score * 2; the existing
# Name and Score columns are carried over unchanged.
double_score_expr = col("Score") * 2
df_with_new = df.withColumn("DoubleScore", double_score_expr)
df_with_new.show()


+-------+-----+-----------+
|   Name|Score|DoubleScore|
+-------+-----+-----------+
|  Alice|   10|         20|
|    Bob|   20|         40|
|  Alice|   10|         20|
|Charlie|   30|         60|
+-------+-----+-----------+



In [0]:

# select() - Narrow

# Projects the DataFrame onto the requested columns (only Name here).
df_selected = df.select(col("Name"))
df_selected.show()


+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|  Alice|
|Charlie|
+-------+



In [0]:
# coalesce() - Narrow

# Merges existing partitions rather than performing a full shuffle;
# here the DataFrame is collapsed down to a single partition.
df_coalesced = df.coalesce(1)
coalesced_partition_count = df_coalesced.rdd.getNumPartitions()
print(coalesced_partition_count)


1


In [0]:
# union() - Narrow

# union() does not shuffle: the result is simply the partitions of the
# first input followed by the partitions of the second. Duplicates are
# kept (UNION ALL semantics); chain distinct() to remove them.
rdd2 = spark.sparkContext.parallelize([("David", 40)])
union_rdd = rdd.union(rdd2)
print(union_rdd.collect())


# NOTE: DataFrame.union() resolves columns by POSITION, not by name, and
# the result takes the column names of the left-hand DataFrame. df2's
# second column is called "Age", yet its values are appended under
# "Score" below. Use unionByName() when columns must be matched by name.
df2 = spark.createDataFrame([("Charlie", 35), ("David", 40)], ["Name", "Age"])


df_union = df.union(df2)
df_union.show()

[('Alice', 10), ('Bob', 20), ('Alice', 10), ('Charlie', 30), ('David', 40)]
+-------+-----+
|   Name|Score|
+-------+-----+
|  Alice|   10|
|    Bob|   20|
|  Alice|   10|
|Charlie|   30|
|Charlie|   35|
|  David|   40|
+-------+-----+



In [0]:
# distinct() - Wide

# Drops duplicate elements from the RDD and duplicate rows from the
# DataFrame, keeping a single copy of each.
distinct_rdd = rdd.distinct()
distinct_records = distinct_rdd.collect()
print(distinct_records)

distinct_df = df.distinct()
distinct_df.show()

[('Bob', 20), ('Charlie', 30), ('Alice', 10)]
+-------+-----+
|   Name|Score|
+-------+-----+
|  Alice|   10|
|    Bob|   20|
|Charlie|   30|
+-------+-----+

