In [0]:
import operator

from pyspark.sql import SparkSession
# Build (or fetch via getOrCreate) the Spark session every cell below uses.
spark = (
    SparkSession.builder
    .appName("RDD Functions")
    .getOrCreate()
)

MAP Function

In [0]:
# Sample words (with repeats) used to demonstrate element-wise transformations.
data = [
    "Project", "Gutenberg’s", "Alice’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s",
]
rdd = spark.sparkContext.parallelize(data)
rdd.collect()

Out[2]: ['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']

In [0]:
# map(): one output per input element -- each word becomes a (word, 1) pair.
rdd2 = rdd.map(lambda word: (word, 1))
rdd2.collect()

Out[3]: [('Project', 1),
 ('Gutenberg’s', 1),
 ('Alice’s', 1),
 ('Adventures', 1),
 ('in', 1),
 ('Wonderland', 1),
 ('Project', 1),
 ('Gutenberg’s', 1),
 ('Adventures', 1),
 ('in', 1),
 ('Wonderland', 1),
 ('Project', 1),
 ('Gutenberg’s', 1)]

In [0]:
# Same number of elements as the input word list (13), since map() emits one pair per word.
rdd2.count()

Out[4]: 13

In [0]:
# Small (firstname, lastname, gender, salary) table for the DataFrame -> RDD map examples.
data = [
    ('James', 'Smith', 'M', 3000),
    ('Anna', 'Rose', 'F', 2500),
    ('Robert', 'Williams', 'M', 1800),
]

columns = ["firstname", "lastname", "gender", "salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  2500|
|   Robert|Williams|     M|  1800|
+---------+--------+------+------+



In [0]:
# Access Row fields by position: 0=firstname, 1=lastname, 2=gender, 3=salary.
rdd_map = df.rdd.map(
    lambda row: (row[0] + " " + row[1], row[2], row[3] * 2)
)
rdd_map.collect()

Out[6]: [('James Smith', 'M', 6000),
 ('Anna Rose', 'F', 5000),
 ('Robert Williams', 'M', 3600)]

In [0]:
# Name the three tuple positions produced by rdd_map and display the result.
df2 = rdd_map.toDF(schema=["name", "gender", "salary_hike"])
df2.show()

+---------------+------+-----------+
|           name|gender|salary_hike|
+---------------+------+-----------+
|    James Smith|     M|       6000|
|      Anna Rose|     F|       5000|
|Robert Williams|     M|       3600|
+---------------+------+-----------+



In [0]:
def map_func(x):
    """Reshape one person row into a (full name, gender, doubled salary) tuple.

    Args:
        x: A row exposing ``firstname``, ``lastname``, ``gender`` and
            ``salary`` attributes (e.g. a Row from ``df.rdd``).

    Returns:
        tuple: ``("<firstname> <lastname>", gender lower-cased, salary * 2)``.
    """
    first, last = x.firstname, x.lastname
    return (first + " " + last, x.gender.lower(), x.salary * 2)

# Pass the function itself: wrapping it as `lambda x: map_func(x)` adds nothing.
rdd_res = df.rdd.map(map_func)
rdd_res.collect()

Out[8]: [('James Smith', 'm', 6000),
 ('Anna Rose', 'f', 5000),
 ('Robert Williams', 'm', 3600)]

In [0]:
# Attach column names to the (name, gender, salary) tuples and display them.
column = ["name", "gender", "salary_hike"]
df3 = rdd_res.toDF(schema=column)
df3.show()

+---------------+------+-----------+
|           name|gender|salary_hike|
+---------------+------+-----------+
|    James Smith|     m|       6000|
|      Anna Rose|     f|       5000|
|Robert Williams|     m|       3600|
+---------------+------+-----------+



FLAT MAP Function

In [0]:
# Integers from 5 (inclusive) up to 10 (exclusive), step 1.
rdd = spark.sparkContext.range(start=5, end=10, step=1)
rdd.collect()

Out[10]: [5, 6, 7, 8, 9]

In [0]:
# flatMap(): each number yields three values, all flattened into one RDD.
rdd2 = rdd.flatMap(lambda n: (n, n * 2, n * 3))
rdd2.collect()

Out[11]: [5, 10, 15, 6, 12, 18, 7, 14, 21, 8, 16, 24, 9, 18, 27]

In [0]:
# A single-element RDD holding one sentence, to be split into words next.
rdd_text = spark.sparkContext.parallelize(
    ["If you want to split a text file"]
)
rdd_text.collect()

Out[12]: ['If you want to split a text file']

In [0]:
# Split each line into words. str.split() with no argument splits on any run of
# whitespace and drops empty strings. split(" ") would emit '' tokens for repeated,
# leading or trailing spaces, and those would be counted as words by reduceByKey later.
rdd2 = rdd_text.flatMap(lambda line: line.split())
rdd2.collect()

Out[13]: ['If', 'you', 'want', 'to', 'split', 'a', 'text', 'file']


REDUCE: this function is used to reduce a dataset to a single value by repeatedly combining pairs of elements with a binary function.

In [0]:
list_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 3, 2])
# reduce() combines elements pairwise with a two-argument function. Builtins and
# operator functions can be passed directly instead of being wrapped in lambdas.
print("Min : " + str(list_rdd.reduce(min)))
print("Max: " + str(list_rdd.reduce(max)))
print("Sum : " + str(list_rdd.reduce(operator.add)))
print("Multiply : " + str(list_rdd.reduce(operator.mul)))


Min : 1
Max: 5
Sum : 20
Multiply : 720


reduceByKey

In [0]:
# rdd2 here is the last binding: the words split from rdd_text in the flatMap section.
rdd2.collect()

Out[16]: ['If', 'you', 'want', 'to', 'split', 'a', 'text', 'file']

In [0]:
# Word count: pair every word with 1, then add up the 1s for each distinct word.
rdd_1 = rdd2.map(lambda word: (word, 1))
res = rdd_1.reduceByKey(lambda total, n: total + n)
res.collect()

Out[17]: [('you', 1),
 ('a', 1),
 ('text', 1),
 ('If', 1),
 ('to', 1),
 ('want', 1),
 ('split', 1),
 ('file', 1)]

sortByKey

In [0]:
# Key-value pairs given as [key, value] lists; sortByKey orders them by key.
rdd_list = spark.sparkContext.parallelize(
    [['C', 10], ['H', 4], ['A', 6], ['L', 8]]
)
rdd_sort = rdd_list.sortByKey(ascending=True)
rdd_sort.collect()


Out[18]: [('A', 6), ('C', 10), ('H', 4), ('L', 8)]

In [0]:
# Same pairs, ordered by key in descending order.
rdd_list.sortByKey(ascending=False).collect()

Out[19]: [('L', 8), ('H', 4), ('C', 10), ('A', 6)]

groupByKey

In [0]:
# Pairs with repeated keys, so groupByKey below has several values per key.
rdd_list = spark.sparkContext.parallelize([
    ['C', 10], ['H', 4], ['A', 6], ['L', 8],
    ['A', 6], ['C', 12], ['L', 2], ['H', 4],
    ['A', 6], ['C', 13], ['L', 6], ['H', 4],
])
rdd_list.collect()

Out[20]: [['C', 10],
 ['H', 4],
 ['A', 6],
 ['L', 8],
 ['A', 6],
 ['C', 12],
 ['L', 2],
 ['H', 4],
 ['A', 6],
 ['C', 13],
 ['L', 6],
 ['H', 4]]

In [0]:
# Gather all values per key. The grouped values display as ResultIterable
# objects rather than lists (see output); the next cell converts them.
rdd_group_key = rdd_list.groupByKey()
rdd_group_key.collect()

Out[21]: [('H', <pyspark.resultiterable.ResultIterable at 0x77f910f98f10>),
 ('C', <pyspark.resultiterable.ResultIterable at 0x77f910f92310>),
 ('L', <pyspark.resultiterable.ResultIterable at 0x77f910f922b0>),
 ('A', <pyspark.resultiterable.ResultIterable at 0x77f910f92fa0>)]

In [0]:
# Turn each key's ResultIterable into a plain list. mapValues() transforms only the
# value of each (key, value) pair and, unlike map(), keeps the RDD's partitioner.
rdd_result = rdd_group_key.mapValues(list)
rdd_result.collect()


Out[22]: [('H', [4, 4, 4]), ('C', [10, 12, 13]), ('L', [8, 2, 6]), ('A', [6, 6, 6])]