In [2]:
!pip3 install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=6a1148e01970c64b6602f6aeba60b7c0d9cfd42be807b183f05e3bdda336bb1d
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from pyspark import SparkContext, SparkConf

In [4]:
#Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]") 
sc = SparkContext(conf=conf)

In [5]:
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


In [6]:
#Lets see how many cores are available
sc.defaultParallelism

2

In [7]:
#Generate random data
import random
randomlist =  random.sample(range(0, 40),10)
print(randomlist)

[39, 11, 21, 5, 12, 19, 25, 22, 15, 34]


In [8]:
#Create RDD
#I want to split my data into 4 parts. We are going to use "collect" because we have small data.
rdd1 = sc.parallelize(randomlist, 4)
rdd1.collect()


[39, 11, 21, 5, 12, 19, 25, 22, 15, 34]

In [None]:
#Data distributions in partitions
rdd1.getNumPartitions()
#lets print all partitions
print(rdd1.glom().collect())
#lets print first 2 partitions
print(rdd1.glom().take(2))
#lets print last partition
print(rdd1.glom().collect()[3])

[[31, 0], [1, 22], [32, 27], [7, 21, 23, 3]]
[[31, 0], [1, 22]]
[7, 21, 23, 3]


In [None]:
#count
print(rdd1.count())

10


In [None]:
#first
print(rdd1.first())

31


In [None]:
#top
print(rdd1.top(2))

[32, 31]


In [None]:
#distinct
print(rdd1.distinct().collect())

[0, 32, 1, 21, 22, 31, 27, 7, 23, 3]


In [None]:
#map. map transformation returns a new rdd
rdd_map = rdd1.map(lambda item: (item + 3) * 3 )
print(rdd_map.collect())

[102, 9, 12, 75, 105, 90, 30, 72, 78, 18]


In [None]:
#filter. filter returns a new dataset
rdd_filter = rdd1.filter(lambda x: x%3 == 0)
print(rdd_filter.collect())

[0, 27, 21, 3]


In [None]:
#flatMap. flatmap is similar to map, but each item is can be mapped to 0 or more output items. It collects in to single list.
rdd_flatmap = rdd1.flatMap(lambda x: [ x+2, x+5 ])
print(rdd_flatmap.collect())
#lets use reduce action. 
rdd_flatmap.reduce(lambda x, y : x+y)


[33, 36, 2, 5, 3, 6, 24, 27, 34, 37, 29, 32, 9, 12, 23, 26, 25, 28, 5, 8]


404

In [None]:
#Descriptive statistics
print([rdd1.max(),rdd1.min(),rdd1.mean(),round(rdd1.stdev(), 2 ),rdd1.sum()])

[32, 0, 16.7, 11.99, 167]


In [None]:
#mapPartitions. We will have a specific calculation for each partitions.
def myfunc(partition):
  sum = 0
  for item in partition:
    sum = sum + item
  yield sum
#We can not use return here, because it returns only one result. Here we want one result per partition.
print(rdd1.mapPartitions(myfunc).collect())


[31, 23, 59, 54]


In [None]:
#union. it returns a new dataset. Unions the elements of dataset and the argument.
print(rdd1.collect())
#we create rrd2 with  2 partitions
rdd2 = sc.parallelize([1,14,20,2028,10,13,3],2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
print(rdd_union.collect())

#lets check howe many partitions we have
print(rdd_union.getNumPartitions())

[31, 0, 1, 22, 32, 27, 7, 21, 23, 3]
[1, 14, 20, 2028, 10, 13, 3]
[31, 0, 1, 22, 32, 27, 7, 21, 23, 3, 1, 14, 20, 2028, 10, 13, 3]
6


In [None]:
#intersection
rdd_intersection = rdd1.intersection(rdd2)
print(rdd_intersection.collect())
#how many partitions we have 
print(rdd_intersection.getNumPartitions())

[1, 3]
6


In [None]:
#find empty partitions
print(rdd_intersection.glom().collect())

counter = 0
for item in rdd_intersection.glom().collect():
  if len(item)==0:
    counter = counter+1
print(counter)

[[], [1], [], [3], [], []]
4


In [None]:
#coalesce(numPartitions).we use to reduce the size of the partitions to the numPartitions.
print(rdd_intersection.coalesce(1).glom().collect())


[[1, 3]]


In [None]:
#takeSample(withReplacement, num, [seed]).It is advised to use takeSample for big data sizes.
print(rdd1.takeSample(False, 5))

[3, 1, 22, 7, 27]


In [9]:
#takeOrdered(n, [ordering])
print(rdd1.takeOrdered(5))
#for desc
print(rdd1.takeOrdered(5, key = lambda x: -x))

[5, 11, 12, 15, 19]
[39, 34, 25, 22, 21]


In [11]:
#reduce(func).aggregate elements of the dataset using a function.
print(rdd1.reduce(lambda x,y : x-y))

65


In [16]:
#reduceByKey
rdd_Rbk=sc.parallelize([(1,4),(7,10),(5,7),(1,12),(7,12),(7,1),(9,1),(7,4)],2)
print(rdd_Rbk.glom().collect())

print(rdd_Rbk.reduceByKey(lambda x,y : x+y).collect())

#user-friendly visualization
import pandas as pd
Counter = pd.DataFrame({'Key':rdd_Rbk.keys().collect(),
                        'Values':rdd_Rbk.values().collect()})
print(Counter)

[[(1, 4), (7, 10), (5, 7), (1, 12)], [(7, 12), (7, 1), (9, 1), (7, 4)]]
[(1, 16), (7, 27), (5, 7), (9, 1)]
   Key  Values
0    1       4
1    7      10
2    5       7
3    1      12
4    7      12
5    7       1
6    9       1
7    7       4


In [17]:
#sortByKey
print(rdd_Rbk.reduceByKey(lambda x,y : x+y).sortByKey(False).collect())

[(9, 1), (7, 27), (5, 7), (1, 16)]


In [20]:
#countByKey
print(rdd_Rbk.countByKey())
#or
print(sorted(rdd_Rbk.countByKey().items()))

defaultdict(<class 'int'>, {1: 2, 7: 4, 5: 1, 9: 1})
[(1, 2), (5, 1), (7, 4), (9, 1)]


In [27]:
#groupByKey. there is a difference between reduce and groupByKey. it is not good for big data.use carefully.
rdd_group = rdd_Rbk.groupByKey()
print(rdd_group.getNumPartitions())

for item in rdd_group.collect():
  print(item[0], [values for values in item[1]])

2
1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]


In [28]:
#lookup(key)
rdd_Rbk.lookup(7)

[10, 12, 1, 4]

In [29]:
#cache
#By default, each transformed RDD may be recomputed each time you run an action on it.
#However, you may also persist an RDD in memory using the persist (or cache) method,
#in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
#it is still in memory.gc won't collect it.
rdd_Rbk.persist()
#persistence.We use MEMORY_AND_DISK because 1TB of data dows not fit in to 32GB RAM.
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274