# Importing libraries

In [1]:
import os
import sys
import glob
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.sql import SparkSession,functions, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.pandas as ps
from pyspark import *

# Starting Spark Session and Creating a SparkSession called Resilient Distributed Dataset RDD

In [3]:
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("Resilient Distributed Dataset (RDD) Training") \
        .config("spark.some.config.option", "config-value") \
        .enableHiveSupport() \
        .getOrCreate()

## running Spark locally with 4 cores

In [4]:
spark

In [5]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001C74EB806A0>


In [6]:
data = [1,1,2,2,3,3,4,4,5,6,7,8,9,10,11,12,13,14,15,25,99,124]
rdd=spark.sparkContext.parallelize(data)

# `Actions` in PySpark RDDs

In [7]:
print(rdd.collect())

[1, 1, 2, 2, 3, 3, 4, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 25, 99, 124]


In [8]:
print(rdd.glom().collect())

[[1, 1, 2, 2, 3], [3, 4, 4, 5, 6], [7, 8, 9, 10, 11], [12, 13, 14, 15, 25, 99, 124]]


In [9]:
print(rdd.distinct().collect())

[4, 8, 12, 124, 1, 5, 9, 13, 25, 2, 6, 10, 14, 3, 7, 11, 15, 99]


In [10]:
print(rdd.count())

22


In [11]:
first_rdd = rdd
print(first_rdd.first())

1


In [12]:
take_rdd = rdd
print(take_rdd.take(3))

[1, 1, 2]


In [13]:
reduce_rdd = rdd 
print(reduce_rdd.reduce(lambda x, y : x + y))

378


In [14]:
rdd.sum()

378

In [15]:
rdd2 = rdd.map(lambda x: (x,1))
print(rdd2.collect())

[(1, 1), (1, 1), (2, 1), (2, 1), (3, 1), (3, 1), (4, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1), (25, 1), (99, 1), (124, 1)]


### Finding maximum element by `reduce`

In [16]:
rdd.reduce(lambda x,y: x if x > y else y)

124

In [17]:
rdd.fold(0, lambda x,y: x + y)

378

In [18]:
print("Maximum: ", rdd.max())
print("Minimum: ", rdd.min())
print("Mean (average): ", rdd.mean())
print("Standard deviation: ", rdd.stdev())
rdd.stats()

Maximum:  124
Minimum:  1
Mean (average):  17.181818181818183
Standard deviation:  30.583174761826097


(count: 22, mean: 17.181818181818183, stdev: 30.583174761826097, max: 124.0, min: 1.0)

### Finding the longest word using `reduce`

In [19]:
words = 'My name is Lukasz. I am learning PySpark and RDD PySpark to broaden my horizons in Big Data'.split(' ')
wordRDD=spark.sparkContext.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)

'horizons'

### We can write regular Python functions to use with `reduce`

In [20]:
def largerThan(x,y):
    if len(x)> len(y):
        return x
    elif len(y) > len(x):
        return y
    else:
        if x < y: return x
        else: return y

In [21]:
wordRDD.reduce(largerThan)

'horizons'

### `Sampling` an RDD

In [22]:
# get a sample whose expected size is m
#note that the size of the sample is different in different runs
m=5
n=20
print('Sample1=', rdd.sample(False,m/n).collect())
print('Sample2=', rdd.sample(False,m/n).collect())

Sample1= [3, 4, 9, 10, 13]
Sample2= [1, 9, 10]


# Saving `RDD` as a text file

In [24]:
save_rdd = rdd
save_rdd.saveAsTextFile('fileActions.txt')

# `Transformations` in PySpark RDDs

In [25]:
data = [1,2,3,4,5,6,7]
rdd=spark.sparkContext.parallelize(data)

print(rdd.map(lambda x: x +10).collect())

[11, 12, 13, 14, 15, 16, 17]


### `Map` operation with regular Python function

In [26]:
def square_if_odd(x):
    if x%2==1:
        return x*x
    else:
        return x

In [27]:
rdd.map(square_if_odd).collect()

[1, 2, 9, 4, 25, 6, 49]

In [28]:
print(rdd.filter(lambda x: x%2 ==0).collect())

[2, 4, 6]


In [29]:
rdd.map(lambda x:(x, x*x)).collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49)]

### Using `filter` to return RDD with elements divisible by 3

In [30]:
rdd.filter(lambda x:x%3==0).collect()

[3, 6]

In [31]:
data = ['Lukasz','Lucjan','Ligrecja','Lucja','Adam','Marcin','Rafal','Artur']
rdd = spark.sparkContext.parallelize(data)
print(rdd.filter(lambda x: x.startswith("L")).collect())

['Lukasz', 'Lucjan', 'Ligrecja', 'Lucja']


In [32]:
data = [2,4,5,6,7,8,9]
union_inp = spark.sparkContext.parallelize(data)
union_rdd_1 = union_inp.filter(lambda x: x % 2 == 0)
union_rdd_2 = union_inp.filter(lambda x: x % 3 ==0)
print(union_rdd_1.union(union_rdd_2).collect())


[2, 4, 6, 8, 6, 9]


### `flatmap` method returns a new RDD by first applying a function to all elements of this RDD, then flattening the result

In [33]:
data = [1,2,3,4,5,6,7]
rdd=spark.sparkContext.parallelize(data)
rdd.flatMap(lambda x:(x, x*x)).collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49]

In [34]:
data = ['Hey All!', "My name is Lukasz", 'I learn PySpark RDD Transformations']
rdd = spark.sparkContext.parallelize(data)
(rdd.flatMap(lambda x: x.split(" ")).collect())

['Hey',
 'All!',
 'My',
 'name',
 'is',
 'Lukasz',
 'I',
 'learn',
 'PySpark',
 'RDD',
 'Transformations']

# PySpark `Pair RDD Operations & Transformations` in Pair RDDS

In [35]:
scores = [('Lukasz', 90), ('Maciej', 87), ('Artur', 64), ('Dawid', 25), ('Bartosz', 76)]
rdd = spark.sparkContext.parallelize(scores)
rdd.collect()


[('Lukasz', 90), ('Maciej', 87), ('Artur', 64), ('Dawid', 25), ('Bartosz', 76)]

In [36]:
scores = [('Lukasz', 90),
           ('Maciej', 87), 
           ('Artur', 64),
             ('Dawid', 25), 
             ('Bartosz', 76),
             ('Lukasz', 91),
           ('Maciej', 82), 
           ('Artur', 63),
             ('Dawid', 24), 
             ('Bartosz', 77)]

rdd = spark.sparkContext.parallelize(scores)
rdd.reduceByKey(lambda x, y: x+ y).collect()

[('Lukasz', 181),
 ('Maciej', 169),
 ('Artur', 127),
 ('Dawid', 49),
 ('Bartosz', 153)]

In [37]:
rdd.sortByKey('ascending').collect()

[('Artur', 64),
 ('Artur', 63),
 ('Bartosz', 76),
 ('Bartosz', 77),
 ('Dawid', 25),
 ('Dawid', 24),
 ('Lukasz', 90),
 ('Lukasz', 91),
 ('Maciej', 87),
 ('Maciej', 82)]

In [38]:
dict_rdd = rdd.groupByKey().collect()
for key, value in dict_rdd:
    print(key, list(value))

Lukasz [90, 91]
Maciej [87, 82]
Artur [64, 63]
Dawid [25, 24]
Bartosz [76, 77]


In [39]:
dict_rdd = rdd.countByKey().items()
for key,value in dict_rdd:
    print(key, value)

Lukasz 2
Maciej 2
Artur 2
Dawid 2
Bartosz 2


# PySpark RDD `Grouping` and `binning`

### `groupby` returns a RDD of grouped elements (iterable) as per a given group operation (function)

In [40]:
data = [2,3,4,5,6,7,8,9,10,11,12,13,14,15]
rdd = spark.sparkContext.parallelize(data)
result = rdd.groupBy(lambda x:x%2).collect()
sorted([(x, sorted(y)) for (x,y) in result])


[(0, [2, 4, 6, 8, 10, 12, 14]), (1, [3, 5, 7, 9, 11, 13, 15])]

### `histogram` method takes a list of bins/buckets and returns a tuple with the result of the histogram (binning)

In [41]:
rdd2 = rdd.map(lambda x:x*x)
rdd2.collect()

[4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225]

In [42]:
rdd2.histogram([x for x in range(0,100,10)])

([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [2, 1, 1, 1, 1, 0, 1, 0, 1])

### `Set operations`

#### Create smaller RDDs to demonstrate joint operations

In [43]:
list1 = np.random.randint(0,10,3)
rddA = spark.sparkContext.parallelize(list1)
list2 = np.random.randint(0,10,3)
rddB = spark.sparkContext.parallelize(list2)
print("rddA:",rddA.collect())
print("rddB:",rddB.collect())


rddA: [5, 2, 1]
rddB: [7, 2, 4]


### `rddA` + `rddB` gives the union (like set union), not the element wise sum!

In [44]:
(rddA + rddB).collect()

[5, 2, 1, 7, 2, 4]

### `cartesian` gives the pairwise product (as tuples)

In [45]:
rddA.cartesian(rddB).collect()

[(5, 7), (5, 2), (5, 4), (2, 7), (2, 2), (2, 4), (1, 7), (1, 2), (1, 4)]

### `intersection` and `subtract` methods return a RDD of the set intersection and subtraction (difference)

In [46]:
data = [1,2,3,4,5,6,7,10,11,12]
data2 = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
rddA = spark.sparkContext.parallelize(data)
rddB = spark.sparkContext.parallelize(data2)

rddA.intersection(rddB).collect()


[1, 2, 10, 3, 11, 4, 12, 5, 6, 7]

In [47]:
rddA.subtract(rddB).collect()

[]

### Stop the `SparkContext` at the end 

In [48]:
spark.stop()

### Returning the count of each value in RDD as a dictionary, then ordering operations and use items()

In [49]:
from pyspark import SparkConf, SparkContext
import collections
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)
# sc = Spark Context

In [50]:
lines = sc.textFile("D:\My commercial projects\PySpark\Project 5 PySpark RDD\data.txt")

In [51]:
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue() 
sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key,value))

1 6110
2 11370
3 27145
4 34174
5 21201


In [52]:
sc.stop()

### Maximum values for each sensor

In [53]:
conf = SparkConf().setMaster("local").setAppName("RODExercises")
sc = SparkContext(conf = conf)

In [54]:
lines = sc.textFile("D:\My commercial projects\PySpark\Project 5 PySpark RDD/sensors.txt")

In [55]:
print(lines.collect())

['s1,2016-01-01,20.5', 's2,2016-01-01,30.1', 's1,2016-01-02,60.2', 's2,2016-01-02,20.4', 's1,2016-01-03,55.5', 's2,2016-01-03,52.5']


In [56]:
sensorsplit = lines.map(lambda x:x.split(','))
print(sensorsplit.collect())

[['s1', '2016-01-01', '20.5'], ['s2', '2016-01-01', '30.1'], ['s1', '2016-01-02', '60.2'], ['s2', '2016-01-02', '20.4'], ['s1', '2016-01-03', '55.5'], ['s2', '2016-01-03', '52.5']]


In [57]:
sensorsplitpm10 = sensorsplit.map(lambda x:(x[0], float(x[2])))
print(sensorsplitpm10.collect())

[('s1', 20.5), ('s2', 30.1), ('s1', 60.2), ('s2', 20.4), ('s1', 55.5), ('s2', 52.5)]


### Sensors with at least 2 readings with a PM10 value greater than the critical threshold 50

In [58]:
sensorandpm10split = lines.map(lambda x:x.split(',')).map(lambda x:(x[0], float(x[2]))).filter(lambda x:x[1] >50.0)

In [59]:
sensorandpm10split.collect()

[('s1', 60.2), ('s1', 55.5), ('s2', 52.5)]

In [60]:
print(sensorandpm10split.countByKey())

defaultdict(<class 'int'>, {'s1': 2, 's2': 1})


In [61]:
print(sensorandpm10split.groupByKey().mapValues(len).sortBy(lambda x:x[1], False).collect())

[('s1', 2), ('s2', 1)]


### A sensorID and the list of dates with a PM10 values greater than 50 for that sensor

In [62]:
print(lines.collect())

['s1,2016-01-01,20.5', 's2,2016-01-01,30.1', 's1,2016-01-02,60.2', 's2,2016-01-02,20.4', 's1,2016-01-03,55.5', 's2,2016-01-03,52.5']


In [63]:
sensorandpm10split = lines.map(lambda x:x.split(',')).filter(lambda x: float(x[2])>50.0).map(lambda x:(x[0], x[1]))

In [64]:
print(sensorandpm10split.collect())

[('s1', '2016-01-02'), ('s1', '2016-01-03'), ('s2', '2016-01-03')]


In [65]:
print(sensorandpm10split.groupByKey().mapValues(list).collectAsMap())

{'s1': ['2016-01-02', '2016-01-03'], 's2': ['2016-01-03']}


In [66]:
sc.stop()

# PySpark using `RDD`

In [67]:
conf = SparkConf().setMaster("local").setAppName("RDD Hamlet")
sc = SparkContext(conf = conf)
poem_rdd1 = sc.textFile("D:\My commercial projects\PySpark\Project 5 PySpark RDD/hamlet.txt")


In [68]:
sc

In [69]:
poem_rdd1

D:\My commercial projects\PySpark\Project 5 PySpark RDD/hamlet.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [70]:
poem_rdd1.take(2)

['To be, or not to be, that is the question:',
 "Whether 'tis nobler in the mind to suffer"]

In [71]:
print(poem_rdd1.collect())

['To be, or not to be, that is the question:', "Whether 'tis nobler in the mind to suffer", 'The slings and arrows of outrageous fortune,', 'Or to take arms against a sea of troubles', 'And by opposing end them. To die—to sleep,', 'No more; and by a sleep to say we end', 'The heart-ache and the thousand natural shocks', "That flesh is heir to: 'tis a consummation", "Devoutly to be wish'd. To die, to sleep;", "To sleep, perchance to dream—ay, there's the rub:", 'For in that sleep of death what dreams may come,', 'When we have shuffled off this mortal coil,', "Must give us pause—there's the respect", 'That makes calamity of so long life.', 'For who would bear the whips and scorns of time,', "Th'oppressor's wrong, the proud man's contumely,", "The pangs of dispriz'd love, the law's delay,", 'The insolence of office, and the spurns', "That patient merit of th'unworthy takes,", 'When he himself might his quietus make', 'With a bare bodkin? Who would fardels bear,', 'To grunt and sweat under

### `Map` and `Reduce`

In [72]:
lineLengths = poem_rdd1.map(lambda s: len(s)) # transformation
totalLength = lineLengths.reduce(lambda a, b: a + b) # action
print(lineLengths.collect())
print('Total lines:', totalLength)

[42, 41, 44, 41, 42, 37, 46, 42, 40, 49, 48, 43, 38, 36, 48, 48, 45, 39, 40, 38, 43, 38, 44, 43, 39, 43, 39, 44, 37, 47, 41, 41, 28]
Total lines: 1374


### Get the rdd item `count` 

In [73]:
my_rdd = sc.parallelize([1, 2, 3, 4, 5],2)
my_rdd.count()

5

In [74]:
print(my_rdd.take(3))

[1, 2, 3]


### Show `the number of partitions`

In [75]:
print('The number of partitions in RDD:  {0}'.format(my_rdd.getNumPartitions()))
print(my_rdd.glom().collect()) 
print(my_rdd.collect()) # it shows single list

The number of partitions in RDD:  2
[[1, 2], [3, 4, 5]]
[1, 2, 3, 4, 5]


### Using a defined function with `map`

In [76]:
def transfunc(lines):
    lines = lines.lower() # taking each line and converts it to lower case
    lines = lines.split() # then, each line was splited
    return lines # it returns each word separately by commas

poem_rdd2 = poem_rdd1.map(transfunc)
poem_rdd2.take(5)

[['to', 'be,', 'or', 'not', 'to', 'be,', 'that', 'is', 'the', 'question:'],
 ['whether', "'tis", 'nobler', 'in', 'the', 'mind', 'to', 'suffer'],
 ['the', 'slings', 'and', 'arrows', 'of', 'outrageous', 'fortune,'],
 ['or', 'to', 'take', 'arms', 'against', 'a', 'sea', 'of', 'troubles'],
 ['and', 'by', 'opposing', 'end', 'them.', 'to', 'die—to', 'sleep,']]

### Using `flatMap` to flatten out the rdd 

In [77]:
poem_rdd3 = poem_rdd1.flatMap(transfunc) #each word is a separate element
poem_rdd3.take(15)

['to',
 'be,',
 'or',
 'not',
 'to',
 'be,',
 'that',
 'is',
 'the',
 'question:',
 'whether',
 "'tis",
 'nobler',
 'in',
 'the']

### Get `distinct values` in an RDD

In [78]:
poem_rdd3.distinct().take(5) # Transformation followed by Action

['to', 'be,', 'or', 'not', 'that']

### Using `filter` to apply a search condition

In [79]:
skipwords = ['to', 'the', 'of']
poem_rdd4 = poem_rdd3.filter(lambda x: x not in skipwords)
poem_rdd4.take(10)

['be,',
 'or',
 'not',
 'be,',
 'that',
 'is',
 'question:',
 'whether',
 "'tis",
 'nobler']

### Getting `statistics` on an RDD

In [80]:
numbers_rdd = sc.parallelize(range(1,500))
numbers_rdd.max(), numbers_rdd.min(), numbers_rdd.sum(), numbers_rdd.variance(), numbers_rdd.stdev()

(499, 1, 124750, 20750.0, 144.04860290887933)

In [81]:
sc.stop()