# **Working with RDD (Resilient Distributed Dataset)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: The RDD is the fundamental data structure of Spark. It is a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

# **Part 1: Create RDDs and Basic Operations**
# **There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [0]:
# Generate random data:
import random
random_list = random.sample(range(0, 40), 10)
print(random_list)

In [0]:
# Create RDD:
rdd1 = sc.parallelize(random_list, 4)

rdd1.collect()

In [0]:
# Data distribution in partitions:
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())

print("the first two partitions:", rdd1.glom().take(2))

In [0]:
# Print the last partition (index -1 works for any number of partitions):
rdd1.glom().collect()[-1]

In [0]:
# count():
rdd1.count()

In [0]:
# first():
rdd1.first()

In [0]:
# top():
rdd1.top(2)

In [0]:
# distinct():
rdd1.distinct().collect()

In [0]:
# map():
rdd_map = rdd1.map(lambda item: (item + 1) * 3)
print(rdd_map.collect())
print(rdd_map.glom().collect())

In [0]:
# filter():
rdd_filter = rdd1.filter(lambda x: x%2==0)
print(rdd_filter.collect())
print(rdd_filter.glom().collect())

In [0]:
# flatMap():
rdd_flatmap = rdd1.flatMap(lambda x: [x+2, x+5])
print(rdd_flatmap.collect())
print(rdd_flatmap.reduce(lambda x,y: x+y))
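The difference between `map()` and `flatMap()` can be modeled without Spark. The following is a plain-Python sketch, not Spark code: the sample list `items` and the helper `expand` are made up for illustration, and `itertools.chain.from_iterable` stands in for the flattening step.

```python
from itertools import chain

# Hypothetical sample data (the notebook's rdd1 holds random values).
items = [1, 2, 3]

def expand(x):
    # Same shape as the lambda passed to flatMap() above: two outputs per input.
    return [x + 2, x + 5]

# map() keeps one output element per input element, so the result is nested.
mapped = [expand(x) for x in items]

# flatMap() applies the same function, then flattens the results by one level.
flat_mapped = list(chain.from_iterable(expand(x) for x in items))

print(mapped)       # [[3, 6], [4, 7], [5, 8]]
print(flat_mapped)  # [3, 6, 4, 7, 5, 8]
```

Because the flattened result is a plain sequence of numbers, it can be passed straight to an aggregation such as `reduce()`, which is what the cell above does.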

In [0]:
# Descriptive statistics:
print([rdd1.max(), rdd1.min(), rdd1.mean(), round(rdd1.stdev(), 2), rdd1.sum()])

In [0]:
# mapPartitions():
def myfunc(partition):
  # Called once per partition; yields that partition's total.
  total = 0
  for item in partition:
    total += item

  yield total

rdd1.mapPartitions(myfunc).collect()
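To make the "once per partition" behaviour of `mapPartitions()` concrete, here is a plain-Python model. It is only a sketch under stated assumptions: the partition layout in `partitions` is invented, and `partition_total` and `calls` are hypothetical names used to count invocations.

```python
# Hypothetical partition layout; a real one comes from rdd.glom().collect().
partitions = [[1, 2], [3, 4], [5, 6, 7]]

calls = []  # one entry is appended per function invocation

def partition_total(partition):
    """Receive an iterator over one partition and yield a single total."""
    calls.append(1)
    yield sum(partition)

# Model of mapPartitions(): call the function once per partition and
# concatenate whatever each call yields.
result = [value for part in partitions for value in partition_total(iter(part))]

print(result)      # [3, 7, 18]
print(len(calls))  # 3
```

The function runs three times for seven elements, whereas a function passed to `map()` would run seven times. This is why `mapPartitions()` suits work with a per-call setup cost.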

# **Part 2: Advanced RDD Transformations and Actions**

In [0]:
# union():
print(rdd1.collect())
rand_list2 = random.sample(range(0, 30), 8)  # do not overwrite random_list
rdd2 = sc.parallelize(rand_list2, 2)
print(rdd2.collect())
rdd_union = rdd1.union(rdd2)
print(rdd_union.collect())

print(rdd_union.getNumPartitions())

In [0]:
# intersection():
rdd_intersection = rdd1.intersection(rdd2)
print(rdd_intersection.collect())
print(rdd_intersection.getNumPartitions())

rdd_intersection.glom().collect()
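The duplicate handling of `union()` and `intersection()` is easy to miss when the inputs are random. The sketch below models it in plain Python with made-up inputs (`left` and `right` are hypothetical); the results are sorted only for readable output, since Spark does not promise an ordering for `intersection()`.

```python
# Hypothetical inputs containing duplicates.
left = [1, 2, 2, 3]
right = [2, 3, 3, 4]

# Model of union(): plain concatenation, so duplicates are kept.
union_result = left + right

# Model of intersection(): elements present in both inputs, without duplicates.
intersection_result = sorted(set(left) & set(right))

# union() followed by distinct() gives set-style union semantics.
distinct_union = sorted(set(union_result))

print(union_result)         # [1, 2, 2, 3, 2, 3, 3, 4]
print(intersection_result)  # [2, 3]
print(distinct_union)       # [1, 2, 3, 4]
```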

In [0]:
# Count the empty partitions:
counter = 0
for item in rdd_intersection.glom().collect():
  if len(item) == 0:
    counter += 1

counter

In [0]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

In [0]:
# takeSample(withReplacement, num, [seed])
rdd1.takeSample(False, 5)

In [0]:
# takeOrdered(n, [ordering])
print(rdd1.takeOrdered(5))

print(rdd1.takeOrdered(5, key=lambda x: -x))
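One way to think about `takeOrdered()` on partitioned data is "pick the best n in every partition, then pick the best n among those candidates". The plain-Python sketch below follows that idea with `heapq.nsmallest`. The partition layout and the helper name `take_ordered` are assumptions for illustration, not Spark's API.

```python
import heapq

# Hypothetical partition layout.
partitions = [[9, 3, 7], [1, 8], [5, 2, 6]]

def take_ordered(parts, n, key=None):
    # Step 1: keep only the n best candidates from each partition.
    candidates = [heapq.nsmallest(n, part, key=key) for part in parts]
    # Step 2: merge the small candidate lists and pick the overall n best.
    merged = [x for cand in candidates for x in cand]
    return heapq.nsmallest(n, merged, key=key)

smallest = take_ordered(partitions, 3)
largest = take_ordered(partitions, 3, key=lambda x: -x)

print(smallest)  # [1, 2, 3]
print(largest)   # [9, 8, 7]
```

Negating the key reverses the order, which is the same trick the cell above uses with `key=lambda x: -x`.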

In [0]:
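# reduce():
# Caution: subtraction is neither commutative nor associative, so the result below
# depends on how the data is partitioned. Operators such as + or max do not have this problem.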
rdd1.reduce(lambda x,y: x-y)
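`reduce()` expects a commutative and associative function because the function is applied inside each partition first and the partial results are combined afterwards. The plain-Python sketch below models that two-step process; the data, the partition layouts, and the helper `partitioned_reduce` are hypothetical.

```python
from functools import reduce

# Hypothetical data, laid out once as a single partition and once as two.
one_partition = [[1, 2, 3, 4]]
two_partitions = [[1, 2], [3, 4]]

def partitioned_reduce(parts, func):
    # Reduce inside each non-empty partition, then reduce the partial results.
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)

def subtract(x, y):
    return x - y

def add(x, y):
    return x + y

sub_one = partitioned_reduce(one_partition, subtract)   # ((1 - 2) - 3) - 4
sub_two = partitioned_reduce(two_partitions, subtract)  # (1 - 2) - (3 - 4)
add_one = partitioned_reduce(one_partition, add)
add_two = partitioned_reduce(two_partitions, add)

print(sub_one, sub_two)  # -8 0
print(add_one, add_two)  # 10 10
```

Addition gives the same answer for both layouts, while subtraction does not, so a subtraction-based `reduce()` can change its result when the number of partitions changes.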

In [0]:
# reduceByKey():
rdd_rbk = sc.parallelize([(1,4), (7,10), (5,7), (1,12), (7,12), (7,1), (9,1), (7,4)], 2)
print(rdd_rbk.glom().collect())

print(rdd_rbk.reduceByKey(lambda x,y: x+y).collect())

# User-friendly view of the raw (key, value) pairs before reduction:
import pandas as pd
counter = pd.DataFrame({'Key': rdd_rbk.keys().collect(),
                        'Values': rdd_rbk.values().collect()})
print(counter)
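The `reduceByKey()` result above can be reproduced by hand. The plain-Python sketch below first combines values per key inside each partition and then merges the per-partition results. It assumes the eight pairs are split evenly into two partitions of four, and the helpers `combine_locally` and `reduce_by_key` are hypothetical names, not Spark functions.

```python
pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]

# Assumption: an even split into two partitions of four pairs each.
partitions = [pairs[:4], pairs[4:]]

def combine_locally(partition, func):
    # Combine the values of each key within a single partition.
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined

def reduce_by_key(parts, func):
    # Merge the per-partition results, again combining values key by key.
    merged = {}
    for part in parts:
        for key, value in combine_locally(part, func).items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

def add(x, y):
    return x + y

local = [combine_locally(part, add) for part in partitions]
totals = reduce_by_key(partitions, add)

print(local)                   # [{1: 16, 7: 10, 5: 7}, {7: 17, 9: 1}]
print(sorted(totals.items()))  # [(1, 16), (5, 7), (7, 27), (9, 1)]
```

After the local step, key 7 is represented by one value per partition (10 and 17) instead of four separate values, so less data has to be merged in the final step.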

In [0]:
# sortByKey():
print(rdd_rbk.reduceByKey(lambda x,y: x+y).sortByKey(True).collect())

In [0]:
# countByKey()
print(rdd_rbk.countByKey())

print(sorted(rdd_rbk.countByKey().items()))

In [0]:
# groupByKey():
rdd_group = rdd_rbk.groupByKey()

print(rdd_group.getNumPartitions())

for item in rdd_group.collect():
  print(item[0], [values for values in item[1]])
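`groupByKey()` collects every value of a key into one group without aggregating anything. The plain-Python sketch below models this with a `defaultdict` over the same pairs. The order of values inside each group follows the input order here, which real Spark output does not promise.

```python
from collections import defaultdict

pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]

# Model of groupByKey(): gather all values that share a key.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
grouped = dict(groups)

# Aggregating each group afterwards matches the reduceByKey() totals.
totals = {key: sum(values) for key, values in grouped.items()}

print(grouped)  # {1: [4, 12], 7: [10, 12, 1, 4], 5: [7], 9: [1]}
print(totals)   # {1: 16, 7: 27, 5: 7, 9: 1}
```

The totals match, but `groupByKey()` moves every individual value across the cluster before any aggregation happens. For plain sums or counts, `reduceByKey()` is usually the cheaper choice.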

In [0]:
# lookup(key):
rdd_rbk.lookup(7)

In [0]:
# cache() / persist():
# By default, a transformed RDD may be recomputed every time you run an action on it.
# Calling persist() (or cache(), which is persist() with the default storage level) asks Spark
# to keep the elements on the cluster after the first computation, so later actions on the
# same RDD are much faster.
rdd_rbk.persist()
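The effect of persisting can be illustrated with a toy model that counts recomputations. `LazyDataset` below is a hypothetical stand-in written for this sketch. It is not part of PySpark and ignores storage levels, eviction, and distribution.

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputes on every action unless persisted."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the data
        self._persisted = False
        self._cache = None
        self.compute_calls = 0    # how many times the data was computed

    def persist(self):
        # Only marks the dataset; nothing is computed until the first action.
        self._persisted = True
        return self

    def _materialize(self):
        if self._persisted and self._cache is not None:
            return self._cache
        self.compute_calls += 1
        data = list(self._compute())
        if self._persisted:
            self._cache = data
        return data

    def count(self):
        return len(self._materialize())

    def sum(self):
        return sum(self._materialize())


def squares():
    return (x * x for x in range(5))

uncached = LazyDataset(squares)
uncached.count()
uncached.sum()

cached = LazyDataset(squares).persist()
cached.count()
cached.sum()

print(uncached.compute_calls)  # 2
print(cached.compute_calls)    # 1
```

As in the model, calling `persist()` on an RDD does not compute anything by itself; the data is stored when the first action runs and reused by later actions.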

In [0]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK)