####RDD

In [0]:
from pyspark.sql import SparkSession



2️⃣ Creating RDDs


In [0]:
spark = SparkSession.builder.appName("Create RDD").getOrCreate()
sc = spark.sparkContext

# Create RDD from list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print("RDD Elements:", rdd.collect())




RDD Elements: [1, 2, 3, 4, 5]


##3️⃣ RDD Transformations
Transformations create new RDDs from existing ones without modifying the original RDD.



In [0]:
#map() - Apply function to each element
rdd =  sc.parallelize([1,2,3,4,5])
sq=rdd.map(lambda x : x**2)
print("Squared Numbers" , sq.collect())

Squared Numbers [1, 4, 9, 16, 25]


In [0]:
#flatMap() - Flatten nested structures
rdd = sc.parallelize(["hello world", "pyspark tutorial"])
words_rdd = rdd.flatMap(lambda line: line.split(" "))
print("Flattened Words:", words_rdd.collect())


Flattened Words: ['hello', 'world', 'pyspark', 'tutorial']


In [0]:
#filter() - Keep only matching elements
rdd = sc.parallelize([1,2,3,4,5,6])
even = rdd.filter(lambda x:x%2==0)
print("even" , even.collect())

even [2, 4, 6]


In [0]:
#distinct() - Remove duplicates
rdd = sc.parallelize([1, 2, 2, 3, 3, 4])
distinct_rdd = rdd.distinct()
print("Distinct Elements:", distinct_rdd.collect())


#union() - Combine RDDs
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
union_rdd = rdd1.union(rdd2)
print("Union Result:", union_rdd.collect())


#intersection() - Find common elements
int_rdd = rdd1.intersection(rdd2)
print("intersection : " , int_rdd.collect())

#cartesian() - Get Cartesian Product
cartesian_rdd = rdd1.cartesian(rdd2)
print("Cartesian Product:", cartesian_rdd.collect())


Distinct Elements: [1, 2, 3, 4]
Union Result: [1, 2, 3, 3, 4, 5]
intersection :  [3]
Cartesian Product: [(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5)]


##4️⃣ RDD Actions
Actions return results from an RDD to the driver or store them in an external storage.



In [0]:
#collect() - Retrieve all elements
rdd= sc.parallelize([1,2,3,4])
print(rdd.collect())

print("Count : ",rdd.count())
print("First 2 Elements:", rdd.take(2))

sum_rdd = rdd.reduce(lambda x, y: x + y)
print("Sum of Elements:", sum_rdd)

rdd.foreach(lambda x: print(f"Processing: {x}"))


[1, 2, 3, 4]
Count :  4
First 2 Elements: [1, 2]
Sum of Elements: 10


##5️⃣ RDD Persistence & Caching

cache() → Stores RDD in memory for faster access
persist(level) → Stores RDD with different storage levels (memory, disk, etc.)
unpersist() → Removes RDD from memory

In [0]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd.cache()
print("Cached Data:", rdd.collect())


Cached Data: [1, 2, 3, 4]


In [0]:
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)
print("Persisted Data:", rdd.collect())


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1263253622418682>:3[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m [38;5;28;01mimport[39;00m StorageLevel
[0;32m----> 3[0m rdd[38;5;241m.[39mpersist(StorageLevel[38;5;241m.[39mMEMORY_AND_DISK)
[1;32m      4[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mPersisted Data:[39m[38;5;124m"[39m, rdd[38;5;241m.[39mcollect())

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241

###6️⃣ Key-Value Pair RDDs


In [0]:
#mapValues() - Transform values
pair_rdd = sc.parallelize([(1,2),(3,4)])
mapped_rdd = pair_rdd.mapValues(lambda x : x**2)
print("Mapped Key-Value Pairs:", mapped_rdd.collect())


#reduceByKey() - Aggregate by key
pair_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 2)])
reduced_rdd = pair_rdd.reduceByKey(lambda x, y: x + y)
print("Reduced by Key:", reduced_rdd.collect())



Mapped Key-Value Pairs: [(1, 4), (3, 16)]
Reduced by Key: [('a', 3), ('b', 2)]


In [0]:
#groupByKey() - Group values by key
grouped_rdd = pair_rdd.groupByKey().mapValues(list)
print("Grouped Data:", grouped_rdd.collect())

#sortByKey() - Sort Key-Value RDD
sort = pair_rdd.sortByKey()
print("Sorted : ", sort.collect())


Grouped Data: [('a', [1, 2]), ('b', [2])]
Sorted :  [('a', 1), ('a', 2), ('b', 2)]


##7️⃣ RDD Partitions & Parallelism

In [0]:
#glom() - Get elements per partition
rdd = sc.parallelize([1,2,3,4,5], numSlices=4)
print("Partition Data : ", rdd.glom().collect())

#repartition() - Change number of partitions
repartitioned_rdd = rdd.repartition(2)
print("Repartitioned Data:", repartitioned_rdd.glom().collect())


#coalesce() - Reduce partitions
coalesced_rdd = rdd.coalesce(2)
print("Coalesced Data:", coalesced_rdd.glom().collect())




Partition Data :  [[1], [2], [3], [4, 5]]
Repartitioned Data: [[3, 4, 5], [1, 2]]
Coalesced Data: [[1, 2], [3, 4, 5]]


###8️⃣ RDD Lineage & DAG


In [0]:
print("RDD Lineage:", rdd.toDebugString().decode("utf-8"))


RDD Lineage: (4) ParallelCollectionRDD[126] at readRDDFromInputStream at PythonRDD.scala:435 []


##9️⃣ Fault Tolerance in RDD
Checkpointing

In [0]:
# sc.setCheckpointDir("FileStore/")
# rdd.checkpoint()
# print("Checkpointing Done")


##🔟 Advanced RDD Concepts
Broadcast Variables - Efficient Large Variable Distribution

In [0]:
broadcast_var = sc.broadcast([1, 2, 3, 4])
print("Broadcast Value:", broadcast_var.value)

accumulator = sc.accumulator(0)

def add_to_acc(x):
    global accumulator
    accumulator += x

rdd.foreach(add_to_acc)
print("Accumulated Value:", accumulator.value)


Broadcast Value: [1, 2, 3, 4]
Accumulated Value: 15


##🔹 Bonus: Monte Carlo Pi Calculation using RDD


In [0]:
from random import random

def inside(_):
    x, y = random(), random()
    return 1 if x*x + y*y <= 1 else 0

NUM_SAMPLES = 1000000
rdd = sc.parallelize(range(NUM_SAMPLES)).map(inside)
count = rdd.reduce(lambda a, b: a + b)

pi_estimate = 4.0 * count / NUM_SAMPLES
print("Estimated Pi:", pi_estimate)


Estimated Pi: 3.139488
