# **Working with RDD (Resilient Distributed Dataset)**


In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=a5381ab236ac9e386e104ab5d219992e6c125834d7c1d81482660d5c35938075
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [4]:
# Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


# **Part 1: Create RDDs and Basic Operations**
# **There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [5]:
# Generate random data:
import random
# Generate 10 random numbers between 0 and 40
randomlist = random.sample(range(0, 40), 10)
print(randomlist)

[17, 32, 15, 29, 16, 35, 3, 9, 13, 4]


In [6]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4)
rdd1.collect()


[17, 32, 15, 29, 16, 35, 3, 9, 13, 4]

In [7]:
# Data distribution in partitions:
# Data distribution in partitions:
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())
print("Two partitions: ", rdd1.glom().take(2))

4
[[17, 32], [15, 29], [16, 35], [3, 9, 13, 4]]
Two partitions:  [[17, 32], [15, 29]]


In [8]:
# Print last partition
for item in rdd1.glom().collect()[3]:
  print(item)

3
9
13
4


In [9]:
# count():
rdd1.count()

10

In [10]:
# first():
rdd1.first()

17

In [11]:
# top():
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
# It returns the list sorted in descending order.

rdd1.top(2)

[35, 32]

In [12]:
# distinct():
rdd1.distinct().collect()

[32, 16, 4, 17, 29, 9, 13, 15, 35, 3]

In [14]:
# map():
rdd_map = rdd1.map(lambda x:(x+1)*3)
rdd_map.collect()

[54, 99, 48, 90, 51, 108, 12, 30, 42, 15]

In [13]:
# flatMap():
rdd_flatmap=rdd1.flatMap(lambda x: [x+2,x+5])
print(rdd_flatmap.collect())
print("The summation of elements =", rdd_flatmap.reduce(lambda a,b : a + b))

[19, 22, 34, 37, 17, 20, 31, 34, 18, 21, 37, 40, 5, 8, 11, 14, 15, 18, 6, 9]
The summation of elements = 416


In [15]:
# Descriptive statistics:
print([rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.sum(), round(rdd1.stdev(),2), rdd1.top(2)])

[35, 3, 17.3, 173, 10.69, [35, 32]]


In [16]:
# Descriptive statistics:
print([rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.sum(), round(rdd1.stdev(),2), rdd1.top(2)])

[35, 3, 17.3, 173, 10.69, [35, 32]]


In [17]:
# mapPartitions():

def myfunc(partition):
  sum = 0
  for item in partition:
    sum = sum + item
  yield sum  # "return" causes a function to exit; "yield" is used to define generator and returns an intermediate results.

rdd_mapPartition = rdd1.mapPartitions(myfunc)
rdd_mapPartition.collect()

[49, 44, 51, 29]

# **Part 2: Advanced RDD Transformations and Actions**

In [18]:
# union():
print(rdd1.collect())
rdd2 = sc.parallelize([1, 14, 20, 20, 28, 10, 13, 3],2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
print(rdd_union.getNumPartitions())
print(rdd_union.collect())

[17, 32, 15, 29, 16, 35, 3, 9, 13, 4]
[1, 14, 20, 20, 28, 10, 13, 3]
6
[17, 32, 15, 29, 16, 35, 3, 9, 13, 4, 1, 14, 20, 20, 28, 10, 13, 3]


In [19]:
# intersection():
rdd_intersection = rdd1.intersection(rdd2)
print(rdd_intersection.getNumPartitions())
print(rdd_intersection.collect())

rdd_intersection.glom().collect()

6
[13, 3]


[[], [13], [], [3], [], []]

In [20]:
# Find empty partitions
counter = 0
for item in rdd_intersection.glom().collect():
  if len(item) == 0:
    counter = counter + 1
print(counter)
print(rdd_union.collect())

4
[17, 32, 15, 29, 16, 35, 3, 9, 13, 4, 1, 14, 20, 20, 28, 10, 13, 3]


In [21]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

[[13, 3]]

In [22]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

[[13, 3]]

In [23]:
# takeSample(withReplacement, num, [seed])
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

rdd1.takeSample(False, 5)

[15, 9, 29, 4, 32]

In [24]:
# takeSample(withReplacement, num, [seed])
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

rdd1.takeSample(False, 5)

[32, 9, 15, 17, 29]

In [25]:
# takeOrdered(n, [ordering])
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
print(rdd1.collect())
print(rdd1.takeOrdered(5))
print(rdd1.takeOrdered(5, key=lambda x: -x))

[17, 32, 15, 29, 16, 35, 3, 9, 13, 4]
[3, 4, 9, 13, 15]
[35, 32, 29, 17, 16]


In [26]:
# reduce():
# A commutative and associative binary operator.
rdd1.reduce(lambda x,y: x+y)

173

In [27]:
# reduceByKey():
rdd_Rbk = sc.parallelize([(1,4),(7,10),(5,7),(1,12),(7,12),(7,1),(9,1),(7,4)], 2)
print(rdd_Rbk.collect())
rdd_Rbk.reduceByKey(lambda x,y: x+y).collect()


# tabular visualization
import pandas as pd
Counter = pd.DataFrame({'Key': rdd_Rbk.keys().collect(),
                 'Values': rdd_Rbk.values().collect()})
Counter

[(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]


Unnamed: 0,Key,Values
0,1,4
1,7,10
2,5,7
3,1,12
4,7,12
5,7,1
6,9,1
7,7,4


In [28]:
# sortByKey():
rdd_Rbk.reduceByKey(lambda x,y: x+y).sortByKey().collect()

[(1, 16), (5, 7), (7, 27), (9, 1)]

In [29]:
# countByKey()
rdd_Rbk.countByKey()
rdd_Rbk.countByKey().items()
sorted(rdd_Rbk.countByKey())
sorted(rdd_Rbk.countByKey().items())

[(1, 2), (5, 1), (7, 4), (9, 1)]

In [30]:
# groupByKey():
rdd_group = rdd_Rbk.groupByKey() # or assign a partition 4
rdd_group.getNumPartitions()

rdd_group.collect() # it executes at driver node, not recommended

for item in rdd_group.collect():
  print(item[0], [value for value in item[1]])

1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]


In [31]:
# lookup(key):
rdd_Rbk.lookup(7)

[10, 12, 1, 4]

In [32]:
# cache:
# By default, each transformed RDD may be recomputed each time you run an action on it.
# However, you may also persist an RDD in memory using the persist (or cache) method,
# in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.

rdd_Rbk.persist() # OR rdd_Rbk.cache()

ParallelCollectionRDD[54] at readRDDFromFile at PythonRDD.scala:289

In [33]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289