In [1]:
from pyspark.sql import SparkSession

# Local Spark session on 2 worker threads. The web UI is bound to 127.0.0.1:4040
# so the jobs/stages produced by the cells below can be inspected in a browser.
spark = (
    SparkSession.builder
    .appName("RDD-Step-By-Step")
    .master("local[2]")
    .config(map={
        "spark.ui.enabled": "true",
        "spark.ui.port": "4040",
        "spark.driver.bindAddress": "127.0.0.1",
        "spark.driver.host": "127.0.0.1",
    })
    .getOrCreate()
)

# The RDD API lives on the SparkContext; keep Spark's console chatter to warnings.
sc = spark.sparkContext
sc.setLogLevel("WARN")

print("Spark Version:", spark.version)
print("Spark UI:", sc.uiWebUrl)   # click this / copy to browser


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/23 15:57:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 3.5.4
Spark UI: http://127.0.0.1:4040


In [2]:
# No partition count given -> Spark falls back to its default (2 with local[2] here).
data = list(range(1, 5))
rdd = sc.parallelize(data)

print("RDD:", rdd)
print("Num partitions:", rdd.getNumPartitions())


RDD: ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
Num partitions: 2


In [3]:
# Same data, but the partition count is requested explicitly via numSlices.
rdd2 = sc.parallelize(data, numSlices=2)
print("Num partitions:", rdd2.getNumPartitions())


Num partitions: 2


In [4]:
# glom() groups items inside each partition: every partition becomes one list,
# so the collected result has one inner list per partition -- a quick way to see
# how rdd2's elements were split across its 2 partitions.
rdd2.glom().collect()


                                                                                

[[1, 2], [3, 4]]

In [5]:
# Emit one (partition_index, [elements...]) record per partition.
rdd2.mapPartitionsWithIndex(lambda idx, items: [(idx, list(items))]).collect()


[(0, [1, 2]), (1, [3, 4])]

In [6]:
# map() is a transformation: it only records the step, nothing executes yet.
mapped = rdd2.map(lambda n: 10 * n)
print(mapped)          # just the RDD's description, no data
mapped.collect()       # collect() is an action -> this is what runs the job


PythonRDD[4] at RDD at PythonRDD.scala:53


[10, 20, 30, 40]

In [7]:
# Keep only the even elements (remainder 0 when divided by 2).
filtered = rdd2.filter(lambda n: not n % 2)
filtered.collect()


[2, 4]

In [8]:
# Each of these is an action: it triggers a job and returns a plain Python value.
print("count:", rdd2.count())
print("sum:", rdd2.sum())
print("take(3):", rdd2.take(num=3))


count: 4
sum: 10
take(3): [1, 2, 3]


In [9]:
# reduce() is an action that folds all elements pairwise with the given function (a sum here).
rdd2.reduce(lambda acc, x: acc + x)


10

In [10]:
lines = sc.parallelize(["hello world", "spark is fast"], numSlices=2)
# flatMap: each line yields several words, flattened into a single RDD of words.
words = lines.flatMap(lambda line: line.split())
words.collect()


['hello', 'world', 'spark', 'is', 'fast']

In [11]:
# Classic word count: pair every word with 1, then sum the 1s per word.
wc = words.map(lambda word: (word, 1)).reduceByKey(lambda left, right: left + right)
wc.collect()


                                                                                

[('hello', 1), ('world', 1), ('fast', 1), ('spark', 1), ('is', 1)]

In [12]:
# Intentionally NOT calling spark.stop() here: the reduceByKey / combineByKey
# cells that follow still use `sc`, and calling RDD methods on a stopped
# SparkContext fails on a fresh Restart & Run All. The session is stopped
# further down, right before it is rebuilt as "spark_internal".


In [13]:
# reduceByKey: total marks per student.
rdd = sc.parallelize([("A", 10), ("A", 20), ("B", 30), ("B", 50)])

rdd.reduceByKey(lambda total, mark: total + mark).collect()

                                                                                

[('A', 30), ('B', 80)]

In [14]:
# combineByKey: build a (sum, count) accumulator per student (e.g. for averages).
#   createCombiner: first value seen for a key in a partition -> (value, 1)
#   mergeValue:     fold another value from the same partition into (sum, count)
#   mergeCombiners: merge two (sum, count) accumulators for the same key that
#                   were built in different partitions
combined = rdd.combineByKey(
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    # Fixed: this previously read an undefined name `acc` instead of `acc1`, which
    # raises NameError as soon as a key's values span more than one partition.
    # Presumably the earlier run never called it because each key sat in one partition.
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)


In [15]:
# Action: run the combineByKey job and return one (key, (sum, count)) pair per student.
combined.collect()


[('A', (30, 2)), ('B', (80, 2))]

In [None]:
from operator import add 

# Re-implementing combineByKey by hand to see what Spark does internally when
# computing the average sale per city:
#   Step 0 - look at how the records are split across partitions
#   Step 1 - combine values locally inside each partition (createCombiner / mergeValue)
#   Step 2 - merge the per-partition (sum, count) accumulators (mergeCombiners)
sales_rdd = sc.parallelize([
    ("mumbai", 200),
    ("mumbai", 300),
    ("Delhi", 150),
    ("Delhi", 250),
    ("Delhi", 100),
], 2)

print("Step 0 : Partition data")
print(sales_rdd.glom().collect())
print("-" * 70)


def create_combiner(value):
    """Start a ``(sum, count)`` accumulator from the first value seen for a key."""
    return (value, 1)


def merge_value(acc, value):
    """Fold one more value from the same partition into ``acc = (sum, count)``."""
    return (acc[0] + value, acc[1] + 1)


def merge_combiners(acc1, acc2):
    """Merge two partial accumulators: ``(sum1 + sum2, count1 + count2)``."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


def local_combine(partition_index, iterator):
    """Combine all records of one partition into per-key ``(sum, count)`` pairs.

    Args:
        partition_index: index passed in by ``mapPartitionsWithIndex``.
        iterator: the partition's ``(key, value)`` records.

    Returns:
        An iterator of ``(key, (sum, count))``, one entry per key seen in this partition.

    NOTE: this runs inside Spark's Python workers, so the ``print`` trace below
    may appear in the console that launched Jupyter rather than in this cell.
    """
    acc_map = {}
    print(f"\n======= Partition {partition_index} START =======")
    for k, v in iterator:
        if k not in acc_map:
            acc_map[k] = create_combiner(v)
            print(f"p{partition_index}: createCombiner for {k} with {v} -> {acc_map[k]}")
        else:
            acc_map[k] = merge_value(acc_map[k], v)
            print(f"p{partition_index}: mergeValue for {k} with {v} -> {acc_map[k]}")
    print(f"======= Partition {partition_index} END =======")
    return iter(acc_map.items())


print("Step 1 : Local combine per partition")
local_combined = sales_rdd.mapPartitionsWithIndex(local_combine)
print(local_combined.glom().collect())
print("-" * 70)

print("Step 2 : Merge combiners across partitions")
final_combined = local_combined.reduceByKey(merge_combiners)
final_sum_count = dict(final_combined.collect())
print(final_sum_count)
print("-" * 70)

# Sanity checks: the hand-rolled pipeline must agree with Spark's own operators.
assert {city: total for city, (total, _) in final_sum_count.items()} == dict(
    sales_rdd.reduceByKey(add).collect()
)
assert final_sum_count == dict(
    sales_rdd.combineByKey(create_combiner, merge_value, merge_combiners).collect()
)

# Average sale per city = sum / count.
{city: total / count for city, (total, count) in final_sum_count.items()}

In [22]:
# Stop the current session; the next cell builds a fresh one named "spark_internal".
spark.stop()

In [23]:
from pyspark.sql import SparkSession

# Fresh local session (2 threads) for the internals section; the web UI is again
# bound to 127.0.0.1:4040 so narrow vs. wide stages can be inspected in a browser.
spark = (
    SparkSession.builder
    .appName("spark_internal")
    .master("local[2]")
    .config(map={
        "spark.ui.enabled": "true",
        "spark.ui.port": "4040",
        "spark.driver.bindAddress": "127.0.0.1",
        "spark.driver.host": "127.0.0.1",
    })
    .getOrCreate()
)

# The RDD API lives on the SparkContext; keep Spark's console chatter to warnings.
sc = spark.sparkContext
sc.setLogLevel("WARN")

print("Spark Version:", spark.version)
print("Spark UI:", sc.uiWebUrl)   # click this / copy to browser

Spark Version: 3.5.4
Spark UI: http://127.0.0.1:4040


In [24]:
rdd = sc.parallelize([1, 2, 3, 4])

# Two narrow transformations followed by one wide transformation.
result = (
    rdd.map(lambda n: ("num", 2 * n))            # Narrow
       .filter(lambda pair: pair[1] > 2)         # Narrow
       .reduceByKey(lambda left, right: left + right)  # Wide
)

In [25]:
# collect() is the action that actually runs the map -> filter -> reduceByKey
# pipeline defined in the previous cell and returns its (key, total) pairs.
result.collect()

[('num', 18)]

26/02/24 01:14:17 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 389147 ms exceeds timeout 120000 ms
26/02/24 01:14:17 WARN SparkContext: Killing executors is not supported by current scheduler.
26/02/24 01:14:25 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o