# Resilient Distributed Dataset (RDD)

An RDD (Resilient Distributed Dataset) is the fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects.

Once an RDD is created, its records cannot be changed; a transformation returns a new RDD instead. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.
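The two properties described above, immutability and partitioning, can be sketched in plain Python without Spark. This is only a conceptual model: the helper `split_into_partitions` and its even-slicing rule are assumptions made for illustration, and Spark's actual partitioning may place records differently.

```python
# Plain-Python model of an RDD's partitions; no Spark is involved here.
def split_into_partitions(records, num_partitions):
    """Hypothetical helper: cut records into contiguous, roughly equal slices."""
    n = len(records)
    return [
        tuple(records[i * n // num_partitions:(i + 1) * n // num_partitions])
        for i in range(num_partitions)
    ]

data = [23, 45, 67, 86, 78, 3, 4, 5]
partitions = split_into_partitions(data, 3)

# A "transformation" builds new partitions; the originals are left untouched.
# Each partition is processed independently, as it could be on separate nodes.
doubled = [tuple(x * 2 for x in part) for part in partitions]

print(partitions)
print(doubled)
```

On a real RDD, `rdd.glom().collect()` returns the elements grouped by partition, which is a convenient way to see how Spark actually distributed the data.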

<img src='https://i0.wp.com/sparkbyexamples.com/wp-content/uploads/2020/08/rdd-creation.png?resize=768%2C477&ssl=1' />

## Creating Spark Session

In [1]:
# findspark.init() adds Spark to sys.path, so it should run before importing pyspark
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Session 2').getOrCreate()

In [4]:
spark

## Creating RDD

In [5]:
data = [23, 45, 67, 86, 78, 3, 4, 5, 6, 10, 11, 12, 23, 45, 67, 10]

In [6]:
rdd1 = spark.sparkContext.parallelize(data)

In [7]:
rdd1

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [8]:
type(rdd1)

pyspark.rdd.RDD

In [9]:
rdd1.collect()

[23, 45, 67, 86, 78, 3, 4, 5, 6, 10, 11, 12, 23, 45, 67, 10]

In [11]:
rdd1.count()

16

In [13]:
for val in rdd1.collect():
    print(val)

23
45
67
86
78
3
4
5
6
10
11
12
23
45
67
10


In [15]:
rdd1.distinct().collect()

[4, 12, 45, 5, 86, 78, 6, 10, 23, 67, 3, 11]

In [16]:
rdd1.distinct().count()

12

In [19]:
rdd1.collect()

[23, 45, 67, 86, 78, 3, 4, 5, 6, 10, 11, 12, 23, 45, 67, 10]

In [21]:
rdd1.first()

23

In [22]:
rdd1.take(5)

[23, 45, 67, 86, 78]

### Filter

In [23]:
rdd1.filter(lambda x : x<=20).collect()

[3, 4, 5, 6, 10, 11, 12, 10]

### Reduce

In [26]:
rdd1.reduce(lambda x,y : x+y)

495

In [30]:
# rdd1.saveAsTextFile('dataset/rdd1.txt')

### Map

In [31]:
rdd2 = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

In [32]:
rdd2

ParallelCollectionRDD[33] at readRDDFromFile at PythonRDD.scala:274

In [33]:
rdd2.collect()

[1, 2, 3, 4, 5]

In [34]:
rdd2.map(lambda x: x*3).collect()

[3, 6, 9, 12, 15]

### Union

In [35]:
rdd3 = spark.sparkContext.parallelize([2, 4, 5, 6, 7, 8, 9])

In [36]:
rdd3.collect()

[2, 4, 5, 6, 7, 8, 9]

In [37]:
union1 = rdd3.filter(lambda x : x%2 == 0)
union2 = rdd3.filter(lambda x : x%3 == 0)

In [38]:
union1.collect()

[2, 4, 6, 8]

In [39]:
union2.collect()

[6, 9]

In [40]:
final_union = union1.union(union2)

In [41]:
final_union.collect()

[2, 4, 6, 8, 6, 9]

### Matrix

In [42]:
from pyspark.mllib.linalg import Matrix, Matrices

In [43]:
data = [10, 20, 30, 40, 50, 60]

In [44]:
matrix = Matrices.dense(3, 2, data)

In [45]:
matrix

DenseMatrix(3, 2, [10.0, 20.0, 30.0, 40.0, 50.0, 60.0], False)

In [46]:
type(matrix)

pyspark.mllib.linalg.DenseMatrix

In [47]:
matrix.values

array([10., 20., 30., 40., 50., 60.])

In [48]:
print(matrix)

DenseMatrix([[10., 40.],
             [20., 50.],
             [30., 60.]])


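### FlatMap

`flatMap` applies a function to each element of the RDD and flattens the results, so a single input element can produce zero or more output elements.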
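To make the difference from `map` concrete, here is a minimal plain-Python sketch of the two behaviours. It only models the semantics with list comprehensions and `itertools.chain`; it is not how Spark implements either operation.

```python
from itertools import chain

lines = ['Hey there', 'This is RDD Session in PySpark']

# map-like: exactly one output per input, so each line becomes a list of words
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are flattened into a single sequence of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)
print(flat_mapped)
```

The mapped result keeps one nested list per input line, while the flattened result is a single list of eight words.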

In [53]:
rdd4 = spark.sparkContext.parallelize(['Hey there', 'This is RDD Session in PySpark'])

In [54]:
rdd4

ParallelCollectionRDD[41] at readRDDFromFile at PythonRDD.scala:274

In [55]:
rdd4.collect()

['Hey there', 'This is RDD Session in PySpark']

In [56]:
rdd4.flatMap(lambda x : x.split(' ')).collect()

['Hey', 'there', 'This', 'is', 'RDD', 'Session', 'in', 'PySpark']

### ReduceByKey

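`reduceByKey` merges the values of each key using the function you supply. Values are first combined within each partition, in parallel, and the partial results are then merged across partitions, so the function should be associative and commutative.

The function is commonly passed as a lambda.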
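The per-key combining can be modelled in plain Python as a two-step process: reduce inside each partition, then merge the partial results. The function `reduce_by_key` and the two hand-written partitions below are hypothetical, chosen only to illustrate the idea; Spark's real implementation also involves a shuffle across the cluster.

```python
def reduce_by_key(partitions, func):
    """Hypothetical model: combine values per key, first locally, then globally."""
    def combine(pairs):
        acc = {}
        for key, value in pairs:
            # First value for a key is stored as-is; later ones are merged with func.
            acc[key] = func(acc[key], value) if key in acc else value
        return acc

    # Step 1: combine values within each partition independently.
    partials = [combine(part) for part in partitions]
    # Step 2: merge the per-partition results for matching keys.
    return combine(item for partial in partials for item in partial.items())

partitions = [
    [('Punit', 55), ('Salam', 70), ('Punit', 55)],
    [('Salam', 70), ('Sumit', 90)],
]
totals = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)
```

Because values may be merged in any grouping and order across partitions, a function such as addition gives a stable result, whereas something like subtraction would not.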

In [57]:
marks = [('Punit', 55), ('Salam', 70), ('Dharmesh', 80), ('Rohan', 80), ('Mayuri', 55), ('Sumit', 90)]

In [58]:
marks

[('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90)]

In [61]:
rdd5 = spark.sparkContext.parallelize(marks+marks)

In [62]:
rdd5.collect()

[('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90),
 ('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90)]

In [63]:
rdd5.reduceByKey(lambda x,y :x+y).collect()

[('Rohan', 160),
 ('Punit', 110),
 ('Salam', 140),
 ('Dharmesh', 160),
 ('Mayuri', 110),
 ('Sumit', 180)]

### SortByKey

In [64]:
rdd5.sortByKey().collect()

[('Dharmesh', 80),
 ('Dharmesh', 80),
 ('Mayuri', 55),
 ('Mayuri', 55),
 ('Punit', 55),
 ('Punit', 55),
 ('Rohan', 80),
 ('Rohan', 80),
 ('Salam', 70),
 ('Salam', 70),
 ('Sumit', 90),
 ('Sumit', 90)]

In [65]:
rdd5.sortByKey(ascending=False).collect()

[('Sumit', 90),
 ('Sumit', 90),
 ('Salam', 70),
 ('Salam', 70),
 ('Rohan', 80),
 ('Rohan', 80),
 ('Punit', 55),
 ('Punit', 55),
 ('Mayuri', 55),
 ('Mayuri', 55),
 ('Dharmesh', 80),
 ('Dharmesh', 80)]

### GroupByKey

In [66]:
rdd5.collect()

[('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90),
 ('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90)]

In [68]:
res = rdd5.groupByKey().collect()

In [70]:
for key,val in res:
    print(key, list(val))

Rohan [80, 80]
Punit [55, 55]
Salam [70, 70]
Dharmesh [80, 80]
Mayuri [55, 55]
Sumit [90, 90]


### CountByKey

In [71]:
count = rdd5.countByKey().items()

In [72]:
for k,v in count:
    print(k,v)

Punit 2
Salam 2
Dharmesh 2
Rohan 2
Mayuri 2
Sumit 2


### Empty RDD

In [73]:
empty = spark.sparkContext.emptyRDD()

In [75]:
empty.collect()

[]

### Aggregations

In [76]:
data = [6, 7, 8, 9, 98, 87, 66, 54, 33, 54]

In [77]:
rdd6 = spark.sparkContext.parallelize(data, 5)

In [80]:
rdd6.getNumPartitions()

5

In [81]:
rdd6.min()

6

In [82]:
rdd6.max()

98

In [83]:
rdd6.sum()

422

In [85]:
rdd6.collect()

[6, 7, 8, 9, 98, 87, 66, 54, 33, 54]

### DataFrame

In [86]:
rdd5.collect()

[('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90),
 ('Punit', 55),
 ('Salam', 70),
 ('Dharmesh', 80),
 ('Rohan', 80),
 ('Mayuri', 55),
 ('Sumit', 90)]

In [87]:
cols_name = ['Name', 'Marks']

In [88]:
marks_df = rdd5.toDF(cols_name)

In [89]:
marks_df.show()

+--------+-----+
|    Name|Marks|
+--------+-----+
|   Punit|   55|
|   Salam|   70|
|Dharmesh|   80|
|   Rohan|   80|
|  Mayuri|   55|
|   Sumit|   90|
|   Punit|   55|
|   Salam|   70|
|Dharmesh|   80|
|   Rohan|   80|
|  Mayuri|   55|
|   Sumit|   90|
+--------+-----+



In [90]:
from pyspark.sql import Row

In [91]:
col = ['Data']

In [92]:
row = Row('Data')

In [93]:
res = rdd6.map(row).toDF()

In [94]:
res.show()

+----+
|Data|
+----+
|   6|
|   7|
|   8|
|   9|
|  98|
|  87|
|  66|
|  54|
|  33|
|  54|
+----+

