In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=60f61e0f07a0977ef2eee844d9bf82a7113c2e65a8b570a13f4e384d28c23a47
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
from pyspark import SparkContext, SparkConf


In [3]:
# Local Spark configuration: application name "TestApp", master "local[*]".
conf = SparkConf().setAppName("TestApp").setMaster("local[*]")
# getOrCreate() reuses the already-active context, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)


In [4]:
import random

# Fixed seed so that Restart & Run All regenerates the same sample. A dedicated
# Random instance is used so the global `random` state is left untouched.
# NOTE: outputs recorded before the seed was introduced will differ until the
# notebook is re-run.
SEED = 42
randomlist = random.Random(SEED).sample(range(1, 40), 10)  # 10 distinct ints from 1..39
print(randomlist)

[39, 27, 10, 21, 22, 38, 9, 14, 23, 36]


In [5]:
# Distribute the sample over 4 partitions, then bring it back to the driver.
rdd1 = sc.parallelize(randomlist, numSlices=4)
rdd1.collect()

[39, 27, 10, 21, 22, 38, 9, 14, 23, 36]

In [6]:
# glom() turns each partition into a list, exposing how the data is laid out.
partitioned = rdd1.glom()
print(rdd1.getNumPartitions())
print(partitioned.collect())
print(partitioned.take(2))

4
[[39, 27], [10, 21], [22, 38], [9, 14, 23, 36]]
[[39, 27], [10, 21]]


In [7]:
# Contents of the partition at index 3.
partition_lists = rdd1.glom().collect()
partition_lists[3]

[9, 14, 23, 36]

In [8]:
# distinct(): drop duplicate elements (the result order differs from the input order).
rdd_distinct = rdd1.distinct()
rdd_distinct.collect()


[36, 21, 9, 10, 22, 38, 14, 39, 27, 23]

In [9]:
# map(): apply a function to every element (here: add one, then triple).
rdd_map = rdd1.map(lambda value: 3 * (value + 1))
rdd_map.collect()

[120, 84, 33, 66, 69, 117, 30, 45, 72, 111]

In [10]:
# filter(): keep only the elements divisible by 3.
rdd_filter = rdd1.filter(lambda value: value % 3 == 0)
multiples_of_three = rdd_filter.collect()
multiples_of_three


[39, 27, 21, 9, 36]

In [11]:
rdd_filter.glom().collect()
# filter() leaves the partition layout as it was: a partition with no matching
# elements stays in the RDD as an empty list (it is NOT removed).

[[39, 27], [21], [], [9, 36]]

In [12]:
# flatMap(): every element expands into two outputs, flattened into a single RDD.
rdd_flatmap = rdd1.flatMap(lambda value: [value + 2, value + 5])
print(rdd_flatmap.glom().collect())
print(rdd_flatmap.collect())

# Add up all elements with a pairwise reduction.
total = rdd_flatmap.reduce(lambda left, right: left + right)
print("The summation of elements =", total)

[[41, 44, 29, 32], [12, 15, 23, 26], [24, 27, 40, 43], [11, 14, 16, 19, 25, 28, 38, 41]]
[41, 44, 29, 32, 12, 15, 23, 26, 24, 27, 40, 43, 11, 14, 16, 19, 25, 28, 38, 41]
The summation of elements = 548


In [13]:
# Descriptive statistics, in order:
# max, min, mean, sum, standard deviation (rounded to 2 places), top 2 elements.
summary = [
    rdd1.max(),
    rdd1.min(),
    rdd1.mean(),
    rdd1.sum(),
    round(rdd1.stdev(), 2),
    rdd1.top(2),
]
print(summary)


[39, 9, 23.9, 239, 10.53, [39, 38]]


In [14]:
# mapPartitions():

def myfunc(partition):
  """Yield the sum of one partition's items as a single value.

  Args:
    partition: iterator over the elements of one partition.

  Yields:
    The total of the items; 0 when the partition is empty.
  """
  # The builtin sum() does the accumulation, so no local variable shadows it.
  # "return" would exit with a plain value; "yield" makes this a generator,
  # so the function hands back an iterable with one result per partition.
  yield sum(partition)

# Run myfunc once per partition and gather the per-partition results.
rdd_mapPartition = rdd1.mapPartitions(f=myfunc)
rdd_mapPartition.collect()


[66, 31, 60, 82]

# Part 2: Advanced RDD Transformations and Actions


In [15]:
# union(): concatenate two RDDs; duplicates are kept.
print(rdd1.collect())
rdd2 = sc.parallelize([1, 14, 20, 20, 28, 10, 13, 3], numSlices=2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
# Partition count of the union, followed by its elements.
print(rdd_union.getNumPartitions(), rdd_union.collect(), sep="\n")

[39, 27, 10, 21, 22, 38, 9, 14, 23, 36]
[1, 14, 20, 20, 28, 10, 13, 3]
6
[39, 27, 10, 21, 22, 38, 9, 14, 23, 36, 1, 14, 20, 20, 28, 10, 13, 3]


In [16]:
# intersection(): elements present in both RDDs.
rdd_intersection = rdd1.intersection(other=rdd2)
# Partition count, followed by the common elements.
print(rdd_intersection.getNumPartitions(), rdd_intersection.collect(), sep="\n")
rdd_intersection.glom().collect()


6
[14, 10]


[[], [], [14], [], [10], []]

In [17]:
# Count empty partitions.
# The emptiness test runs inside Spark and only the resulting count is returned,
# instead of collecting every partition's contents to the driver first.
counter = rdd_intersection.glom().filter(lambda part: len(part) == 0).count()
print(counter)

4


In [18]:
# coalesce(numPartitions): merge the data into fewer partitions (here a single one).
single_partition = rdd_intersection.coalesce(numPartitions=1)
print(single_partition.glom().collect())

[[14, 10]]


In [19]:
# takeSample(withReplacement, num, [seed])
# Use this only when the result is expected to be small: the whole sample
# is loaded into the driver's memory.

rdd1.takeSample(withReplacement=False, num=5)

[21, 36, 27, 38, 23]

In [20]:
# takeOrdered(n, [ordering])
# Use this only when the result is expected to be small: the whole result
# is loaded into the driver's memory.
print(rdd1.collect())
smallest_five = rdd1.takeOrdered(5)
print(smallest_five)
# Negating the sort key reverses the order, giving the five largest elements.
largest_five = rdd1.takeOrdered(5, key=lambda value: -value)
print(largest_five)

[39, 27, 10, 21, 22, 38, 9, 14, 23, 36]
[9, 10, 14, 21, 22]
[39, 38, 36, 27, 23]


In [21]:

# reduce():
# Combines the elements pairwise; the binary operator must be commutative and associative.
rdd1.reduce(lambda left, right: left + right)

239

In [22]:
# reduceByKey(): combine the values of each key with the given operator (here: addition).
pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
rdd_Rbk = sc.parallelize(pairs, numSlices=2)
print(rdd_Rbk.collect())
rdd_Rbk.reduceByKey(lambda left, right: left + right).collect()

[(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]


[(1, 16), (7, 27), (5, 7), (9, 1)]

In [23]:
# tabular visualization
import pandas as pd
# A single collect() of the (key, value) pairs feeds both columns, so only one
# Spark job runs instead of separate keys() and values() collects.
Counter = pd.DataFrame(rdd_Rbk.collect(), columns=['Key', 'Values'])

Counter

Unnamed: 0,Key,Values
0,1,4
1,7,10
2,5,7
3,1,12
4,7,12
5,7,1
6,9,1
7,7,4


In [24]:
# sortByKey(): order the per-key sums by key.
rdd_sums = rdd_Rbk.reduceByKey(lambda left, right: left + right)
rdd_sums.sortByKey().collect()

[(1, 16), (5, 7), (7, 27), (9, 1)]

In [25]:
# countByKey()
# countByKey() is an action, so it is run once and the result reused rather than
# launching a new job for every expression.
key_counts = rdd_Rbk.countByKey()  # mapping: key -> number of records with that key
# Other views of the same result:
#   key_counts.items()   -> (key, count) pairs
#   sorted(key_counts)   -> the keys alone, sorted
sorted(key_counts.items())

[(1, 2), (5, 1), (7, 4), (9, 1)]

In [26]:
# groupByKey(): gather all values that share a key.
# numPartitions=None keeps the default; pass a number (e.g. 4) to choose the
# partition count of the grouped RDD.
rdd_group = rdd_Rbk.groupByKey(numPartitions=None)
rdd_group.getNumPartitions()

2

In [27]:
# collect() brings every group to the driver node; not recommended.
grouped_pairs = rdd_group.collect()
grouped_pairs


[(1, <pyspark.resultiterable.ResultIterable at 0x7fd935437100>),
 (7, <pyspark.resultiterable.ResultIterable at 0x7fd935434e50>),
 (5, <pyspark.resultiterable.ResultIterable at 0x7fd935435210>),
 (9, <pyspark.resultiterable.ResultIterable at 0x7fd935435690>)]

In [28]:
# Print each key next to its grouped values expanded into a plain list.
for key, values in rdd_group.collect():
  print(key, list(values))

1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]


In [29]:
# lookup(key): list all values stored under the given key.
rdd_Rbk.lookup(key=7)

[10, 12, 1, 4]

In [30]:
# cache:
# By default a transformed RDD may be recomputed every time an action runs on it.
# Persisting it with persist() (or cache()) asks Spark to keep its elements around
# on the cluster, which makes later queries on it much faster.

rdd_Rbk.cache() # OR rdd_Rbk.persist()

ParallelCollectionRDD[44] at readRDDFromFile at PythonRDD.scala:289

In [31]:
# Persistence with an explicit storage level
# (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
from pyspark import StorageLevel
rdd1.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289