
# **Working with RDD (Resilient Distributed Dataset)**

**`Credits: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: RDD is the fundamental data structure of Spark. It is a fault-tolerant (resilient), immutable, distributed collection of objects of any type, partitioned across the nodes of the cluster so that it can be operated on in parallel.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=0ca4a79e32f5f62e5ba2cf919acede521c51720daae0a9eff26141b17a0a6131
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
# !pip3 install -q findspark
# import findspark
# findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [4]:
# Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


In [5]:
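# Default number of partitions Spark uses when none is specified (with local[*], the number of available cores)
sc.defaultParallelism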

2

# **Part 1: Create RDDs and Basic Operations**
**There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [6]:
# Generate random data:
import random
randomlist = random.sample(range(0,40), 10)
print(randomlist)

[1, 25, 2, 19, 17, 32, 29, 21, 23, 13]


In [7]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4)

In [8]:
rdd1.getNumPartitions()

4

In [9]:
# Data distribution in partitions:
rdd1.glom().collect()

[[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]
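Why is the last partition larger than the others? Below is a plain-Python model (no Spark needed) of how PySpark 3.x appears to slice a local list, based on my reading of its source: the driver first groups the list into batches of `max(1, min(len(c) // numSlices, 1024))` elements, and the JVM then splits the sequence of batches into `numSlices` contiguous ranges. `model_parallelize` is a hypothetical helper, and the exact rule may differ between Spark versions.

```python
def model_parallelize(data, num_slices, max_batch=1024):
    """Approximate PySpark's partitioning of a Python list (an assumption, not Spark's code)."""
    data = list(data)
    # Driver side (assumed): serialize the list in fixed-size batches.
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    # JVM side (assumed): split the sequence of batches into num_slices contiguous ranges.
    n = len(batches)
    partitions = []
    for i in range(num_slices):
        start, end = (i * n) // num_slices, ((i + 1) * n) // num_slices
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions

print(model_parallelize([1, 25, 2, 19, 17, 32, 29, 21, 23, 13], 4))
# [[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]
```

The same rule reproduces the `[[1, 14, 20], [20, 28, 10, 13]]` layout of `rdd2` in Part 2: 10 elements in 4 slices give 5 batches of 2, and the last slice receives two batches.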

In [10]:
# Print last partition
rdd1.glom().collect()[3]

[29, 21, 23, 13]

In [11]:
# count():
rdd1.count()

10

In [12]:
# first():
rdd1.first()

1

In [13]:
# top():
rdd1.top(5)

[32, 29, 25, 23, 21]

In [14]:
# distinct():
rdd1.distinct().collect()

[32, 1, 25, 17, 29, 21, 13, 2, 19, 23]
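The order of the `distinct()` result looks scrambled because `distinct()` involves a shuffle. As far as I can tell from PySpark's source, each value is used as a key and hash-partitioned into `rdd1.getNumPartitions()` (here 4) partitions; for small non-negative integers Python's `hash(x)` is `x` itself, so value `x` lands in partition `x % 4`. A minimal model, with a hypothetical function name (Spark does not guarantee the order inside a partition):

```python
def model_distinct(values, num_partitions):
    """Deduplicate values and place each one in partition hash(v) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    seen = set()
    for v in values:
        if v not in seen:
            seen.add(v)
            partitions[hash(v) % num_partitions].append(v)
    return partitions

parts = model_distinct([1, 25, 2, 19, 17, 32, 29, 21, 23, 13], 4)
print(parts)                          # [[32], [1, 25, 17, 29, 21, 13], [2], [19, 23]]
print([v for p in parts for v in p])  # [32, 1, 25, 17, 29, 21, 13, 2, 19, 23]
```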

In [15]:
# map():
rdd1.map(lambda x: x*2).collect()

[2, 50, 4, 38, 34, 64, 58, 42, 46, 26]

In [16]:
# filter(): 
rdd1.filter(lambda x: x%2==0).collect()

[2, 32]

In [17]:
# flatMap():
rdd_flatmap = rdd1.flatMap(lambda x: [x+2, x+5])
print(rdd_flatmap.collect())
print(rdd_flatmap.reduce(lambda x,y: x+y))

[3, 6, 27, 30, 4, 7, 21, 24, 19, 22, 34, 37, 31, 34, 23, 26, 25, 28, 15, 18]
434
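The difference between `map()` and `flatMap()` can be modelled with plain Python lists: `map()` produces exactly one output per input (here a two-element list), while `flatMap()` concatenates those lists into a single flat sequence. A sketch using `itertools.chain`, not Spark itself:

```python
from itertools import chain

data = [1, 25, 2, 19, 17, 32, 29, 21, 23, 13]

def plus_two_and_five(x):
    # Same function as the lambda passed to rdd1.flatMap() above.
    return [x + 2, x + 5]

mapped = [plus_two_and_five(x) for x in data]                        # like rdd1.map(...): a list of lists
flat = list(chain.from_iterable(plus_two_and_five(x) for x in data))  # like rdd1.flatMap(...): flattened

print(mapped[:2])  # [[3, 6], [27, 30]]
print(flat[:4])    # [3, 6, 27, 30]
print(sum(flat))   # 434
```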


In [18]:
# Descriptive statistics:
print([
       rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.sum(), round(rdd1.stdev(),2), rdd1.top(2)
])

[32, 1, 18.2, 182, 9.86, [32, 29]]
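Note that `stdev()` returns the population standard deviation (dividing by N); PySpark also provides `sampleStdev()`, which divides by N - 1. Both can be cross-checked against Python's `statistics` module on the same list:

```python
import statistics

data = [1, 25, 2, 19, 17, 32, 29, 21, 23, 13]

print(statistics.mean(data))              # 18.2
print(round(statistics.pstdev(data), 2))  # 9.86  (population: matches rdd1.stdev())
print(round(statistics.stdev(data), 2))   # 10.39 (sample: what rdd1.sampleStdev() computes)
```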


In [19]:
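# mapPartitions(): the function is called once per partition and receives an iterator over its elements
def sum_partition(partition):
    total = 0
    for item in partition:
        total += item  # "+=" accumulates; "=+" would just reassign the current item
    yield total

rdd_mapPartition = rdd1.mapPartitions(sum_partition)
rdd_mapPartition.collect()

[26, 21, 49, 86]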
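A plain-Python model of `mapPartitions()` with a per-partition sum, using the partition layout that `rdd1.glom().collect()` printed earlier: the function runs once per partition, so it yields one value per partition rather than one per element. `partition_sum` and `model_map_partitions` are hypothetical names for illustration.

```python
def partition_sum(iterator):
    # Called once per partition with an iterator over that partition's elements.
    yield sum(iterator)

def model_map_partitions(partitions, func):
    # Like rdd.mapPartitions(func).collect(): run func on each partition, then concatenate the outputs.
    return [out for part in partitions for out in func(iter(part))]

partitions = [[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]  # rdd1.glom().collect()
result = model_map_partitions(partitions, partition_sum)
print(result)       # [26, 21, 49, 86]
print(sum(result))  # 182, the same as rdd1.sum()
```

Because the function sees a whole partition at once, this is the usual place for per-partition setup work (for example, opening one connection per partition instead of one per element).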

# **Part 2: Advanced RDD Transformations and Actions**

In [22]:
# union():

print(rdd1.collect())
rdd2 = sc.parallelize([1,14,20,20,28,10,13], 2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
print(rdd_union.getNumPartitions())
print(rdd_union.collect())
print(rdd_union.glom().collect())

[1, 25, 2, 19, 17, 32, 29, 21, 23, 13]
[1, 14, 20, 20, 28, 10, 13]
6
[1, 25, 2, 19, 17, 32, 29, 21, 23, 13, 1, 14, 20, 20, 28, 10, 13]
[[1, 25], [2, 19], [17, 32], [29, 21, 23, 13], [1, 14, 20], [20, 28, 10, 13]]
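`union()` does not shuffle: as the `glom()` output shows, the result simply lists the partitions of `rdd1` followed by those of `rdd2`, so the partition count is 4 + 2 = 6 and duplicates are kept. A plain-Python model with a hypothetical helper name:

```python
def model_union(parts_a, parts_b):
    # Concatenate the partition lists: no deduplication, no repartitioning.
    return [list(p) for p in parts_a] + [list(p) for p in parts_b]

rdd1_parts = [[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]
rdd2_parts = [[1, 14, 20], [20, 28, 10, 13]]
union_parts = model_union(rdd1_parts, rdd2_parts)

print(len(union_parts))                     # 6
print([x for p in union_parts for x in p])  # duplicates such as 1, 13 and 20 are kept
```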


In [23]:
# intersection():
rdd_intersection = rdd1.intersection(rdd2)
print(rdd_intersection.getNumPartitions())
print(rdd_intersection.collect())

rdd_intersection.glom().collect()

6
[1, 13]


[[], [1, 13], [], [], [], []]
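Unlike `union()`, `intersection()` deduplicates its output and involves a shuffle. Based on my reading of PySpark's source, each value is used as a key and hash-partitioned into as many partitions as the combined input (6 here); since `hash(x) == x` for small non-negative integers and `1 % 6 == 13 % 6 == 1`, both common values land in partition 1 and the other five partitions stay empty. A hypothetical model:

```python
def model_intersection(a, b, num_partitions):
    # Keep values present in both inputs (once each), placed by hash(v) % num_partitions.
    common = set(a) & set(b)
    partitions = [[] for _ in range(num_partitions)]
    for v in sorted(common):  # sorted only to make this model deterministic
        partitions[hash(v) % num_partitions].append(v)
    return partitions

parts = model_intersection([1, 25, 2, 19, 17, 32, 29, 21, 23, 13],
                           [1, 14, 20, 20, 28, 10, 13], 6)
print(parts)                             # [[], [1, 13], [], [], [], []]
print(sum(1 for p in parts if not p))    # 5 empty partitions
```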

In [24]:
# Count empty partitions
counter = 0
for item in rdd_intersection.glom().collect():
    if len(item) == 0:
        counter += 1  # "+=" increments; "=+1" would just set counter to 1
print(counter)

5


In [25]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

[[1, 13]]

In [26]:
# takeSample(withReplacement, num, [seed])
rdd1.takeSample(False, 5)

[21, 13, 29, 1, 17]

In [27]:
# takeOrdered(n, [ordering])
print(rdd1.collect())
print(rdd1.takeOrdered(5))
print(rdd1.takeOrdered(5, key=lambda x: -x))

[1, 25, 2, 19, 17, 32, 29, 21, 23, 13]
[1, 2, 13, 17, 19]
[32, 29, 25, 23, 21]
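`takeOrdered()` avoids sorting the whole RDD. As far as I can tell from PySpark's source, it keeps the `n` smallest elements of each partition with `heapq.nsmallest` and then merges those candidates on the driver (`top()` does the same with `heapq.nlargest`). A plain-Python sketch of that two-step idea, with a hypothetical helper name:

```python
import heapq

def model_take_ordered(partitions, n, key=None):
    # Step 1: the n smallest elements of each partition (done in parallel on the executors).
    candidates = [heapq.nsmallest(n, part, key=key) for part in partitions]
    # Step 2: merge the per-partition candidates and keep the overall n smallest.
    return heapq.nsmallest(n, [x for c in candidates for x in c], key=key)

partitions = [[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]
print(model_take_ordered(partitions, 5))                    # [1, 2, 13, 17, 19]
print(model_take_ordered(partitions, 5, key=lambda x: -x))  # [32, 29, 25, 23, 21]
```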


In [28]:
# reduce():
rdd1.reduce(lambda x,y: x+y)

182
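`reduce()` first reduces each partition locally and then combines the partial results, so the function passed to it should be commutative and associative; otherwise the answer can depend on how the data is partitioned. A plain-Python model using `functools.reduce` (the helper name is hypothetical):

```python
from functools import reduce
from operator import add, sub

def model_rdd_reduce(partitions, f):
    # Reduce each non-empty partition locally, then reduce the partial results on the driver.
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

partitions = [[1, 25], [2, 19], [17, 32], [29, 21, 23, 13]]
print(model_rdd_reduce(partitions, add))  # 182, the same as rdd1.reduce(lambda x, y: x + y)

# Subtraction is not associative, so the partitioned result differs from a sequential one:
print(model_rdd_reduce(partitions, sub))                 # 36
print(reduce(sub, [x for p in partitions for x in p]))   # -180
```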

In [30]:
# reduceByKey():
rdd_RBK = sc.parallelize([
                          (1,4), (7,10), (5,7), (1,12), (7,12), (7,1), (9,1), (7,4)
], 2)
print(rdd_RBK.collect())

print(rdd_RBK.reduceByKey(lambda x,y: x+y).collect())

import pandas as pd
Counter = pd.DataFrame(
    {'Key': rdd_RBK.keys().collect(),
     'Values':rdd_RBK.values().collect()}
)

Counter

[(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
[(1, 16), (7, 27), (5, 7), (9, 1)]


|   | Key | Values |
|---|-----|--------|
| 0 | 1 | 4 |
| 1 | 7 | 10 |
| 2 | 5 | 7 |
| 3 | 1 | 12 |
| 4 | 7 | 12 |
| 5 | 7 | 1 |
| 6 | 9 | 1 |
| 7 | 7 | 4 |
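`reduceByKey()` combines values for the same key inside each partition before the shuffle (a map-side combine), then merges the per-partition results. The dictionary-based sketch below illustrates that idea; the helper name is hypothetical, and the split of `rdd_RBK` into two partitions of four pairs is an assumption about how `parallelize` laid out the data.

```python
def model_reduce_by_key(partitions, f):
    # Map side: combine values per key within each partition.
    local = []
    for part in partitions:
        acc = {}
        for k, v in part:
            acc[k] = f(acc[k], v) if k in acc else v
        local.append(acc)
    # Reduce side: merge the per-partition dictionaries key by key.
    merged = {}
    for acc in local:
        for k, v in acc.items():
            merged[k] = f(merged[k], v) if k in merged else v
    return merged, local

partitions = [[(1, 4), (7, 10), (5, 7), (1, 12)],  # assumed layout of rdd_RBK
              [(7, 12), (7, 1), (9, 1), (7, 4)]]
merged, local = model_reduce_by_key(partitions, lambda x, y: x + y)
print(local)   # [{1: 16, 7: 10, 5: 7}, {7: 17, 9: 1}]
print(merged)  # {1: 16, 7: 27, 5: 7, 9: 1}
```

Only 5 partial values cross partition boundaries here instead of all 8 pairs, which is why `reduceByKey()` is usually preferred over `groupByKey()` followed by a reduction.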


In [31]:
# sortByKey():
rdd_RBK.reduceByKey(lambda x,y: x+y).sortByKey().collect()

[(1, 16), (5, 7), (7, 27), (9, 1)]

In [33]:
# countByKey()
print(rdd_RBK.countByKey())
print(rdd_RBK.countByKey().items())
print(sorted(rdd_RBK.countByKey()))
print(sorted(rdd_RBK.countByKey().items()))

defaultdict(<class 'int'>, {1: 2, 7: 4, 5: 1, 9: 1})
dict_items([(1, 2), (7, 4), (5, 1), (9, 1)])
[1, 5, 7, 9]
[(1, 2), (5, 1), (7, 4), (9, 1)]
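`countByKey()` is an action: it returns a dictionary of key counts to the driver, so it is only suitable when the number of distinct keys is small. The same counts can be reproduced with `collections.Counter` on the plain list of pairs:

```python
from collections import Counter

pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
counts = Counter(k for k, _ in pairs)  # count occurrences of each key, ignoring the values

print(dict(counts))            # {1: 2, 7: 4, 5: 1, 9: 1}
print(sorted(counts.items()))  # [(1, 2), (5, 1), (7, 4), (9, 1)]
```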


In [36]:
# groupByKey():
rdd_group = rdd_RBK.groupByKey()
print(rdd_group.getNumPartitions())

print(rdd_group.collect())

for item in rdd_group.collect():
    print(item[0], [value for value in item[1]])

2
[(1, <pyspark.resultiterable.ResultIterable object at 0x7f4179be0150>), (7, <pyspark.resultiterable.ResultIterable object at 0x7f4179be0e10>), (5, <pyspark.resultiterable.ResultIterable object at 0x7f4179be0390>), (9, <pyspark.resultiterable.ResultIterable object at 0x7f4179be0cd0>)]
1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]
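`groupByKey()` returns each key with an iterable of all its values (the `ResultIterable` objects printed above); in PySpark, `rdd_group.mapValues(list).collect()` is a common way to materialize them as lists. Unlike `reduceByKey()`, nothing is combined before the shuffle, so every value is moved. A plain-Python model of the grouping:

```python
from collections import defaultdict

pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
groups = defaultdict(list)
for k, v in pairs:
    groups[k].append(v)  # keep every value; nothing is pre-aggregated

for k, values in groups.items():
    print(k, values)
# 1 [4, 12]
# 7 [10, 12, 1, 4]
# 5 [7]
# 9 [1]
```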


In [37]:
# lookup(key):
rdd_RBK.lookup(7)

[10, 12, 1, 4]

In [38]:
# cache() / persist():
# By default, a transformed RDD may be recomputed each time you run an action on it.
# Calling persist() (or cache(), which is persist() with the default MEMORY_ONLY storage level)
# tells Spark to keep the computed partitions on the cluster for much faster access the next time you query it.
# Persistence is lazy: the data is only stored when the next action computes the RDD.
rdd_RBK.persist()

ParallelCollectionRDD[56] at readRDDFromFile at PythonRDD.scala:274

In [40]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK).collect()

[1, 25, 2, 19, 17, 32, 29, 21, 23, 13]