# Learning PySpark 
### Video series

### Packt Publishing

**Author**: Tomasz Drabas
**Date**:   2017-12-10





# Section 3: Resilient Distributed Datasets - Actions

In this section we will look at the Resilient Distributed Datasets (RDDs) and the actions available.

## Read in the data

In [1]:
%%sh
head -n 2 ../data/sample_data.csv

OrderDate,Region,Rep,Item,Units,UnitCost,Total
1/6/16,East,Jones,Pencil,95,1.99,189.05


In [2]:
import datetime as dt

def parseCSVRow(inputRow):
    try:
        rowSplit = inputRow.split(',')
        rowSplit[0] = dt.datetime.strptime(rowSplit[0], '%m/%d/%y')
        rowSplit[4] = int(rowSplit[4])
        
        for i in [5,6]:
            rowSplit[i] = float(rowSplit[i])
        
        return [rowSplit]
    except:
        return []

rdd_clean = sc.textFile('../data/sample_data.csv', 4) \
    .flatMap(parseCSVRow)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,,pyspark,idle,,,✔


SparkSession available as 'spark'.


## .take(...) action

In [None]:
rdd_clean.take(2)

In [None]:
for element in rdd_clean.takeOrdered(5, key = lambda el: el[0]):
    print(element)

In [None]:
for element in rdd_clean.takeSample(False, 5, seed=667):
    print(element)

## .collect(...) action

In [3]:
len(rdd_clean.collect()) == rdd_clean.count()

True

In [4]:
for element in rdd_clean.filter(lambda el: el[-1] > 1000).collect():
    print(element)

[datetime.datetime(2016, 7, 29, 0, 0), 'East', 'Parent', 'Binder', 81, 19.99, 1619.19]
[datetime.datetime(2016, 12, 29, 0, 0), 'East', 'Parent', 'Pen Set', 74, 15.99, 1183.26]
[datetime.datetime(2017, 2, 1, 0, 0), 'Central', 'Smith', 'Binder', 87, 15.0, 1305.0]
[datetime.datetime(2017, 8, 7, 0, 0), 'Central', 'Kivell', 'Pen Set', 42, 23.95, 1005.9]
[datetime.datetime(2017, 10, 14, 0, 0), 'West', 'Thompson', 'Binder', 57, 19.99, 1139.43]
[datetime.datetime(2017, 12, 4, 0, 0), 'Central', 'Jardine', 'Binder', 94, 19.99, 1879.06]

## .reduce(...) action

In [6]:
from operator import add

total_value = rdd_clean \
    .map(lambda el: el[-1]) \
    .reduce(add)
    
total_value

18628.38

In [7]:
total_value = rdd_clean \
    .map(lambda el: el[-1]) \
    .reduce(lambda x, y: x + y)

total_value

18628.38

## .reduceByKey(...) action

In [8]:
sales_by_region = rdd_clean \
    .map(lambda el: (el[1], el[-1])) \
    .reduceByKey(lambda x, y: x + y)
    
for element in sales_by_region.collect():
    print(element)

('East', 6002.090000000001)
('Central', 10139.57)
('West', 2486.7200000000003)

## .count() action

In [9]:
rdd_clean.count()

42

In [10]:
rdd_clean.countApprox(10, confidence=0.9)

42

In [11]:
sales = rdd_clean \
    .map(lambda el: el[2])

sales.countApproxDistinct()

11

In [12]:
sales.distinct().count()

11

## .foreach(...) action

In [16]:
distinct_sales = sales.distinct()
distinct_sales.foreach(print)

## .aggregate(...) action

In [20]:
seqOp =  (lambda x, y: (x[0] + y,    x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

rdd_clean \
    .map(lambda el: el[-1]) \
    .aggregate((0.0,0), seqOp, combOp)

(18628.38, 42)

## .aggregateByKey(...) action

In [21]:
seqOp =  (lambda x, y: (x[0] + y[0], x[1] + y[1]))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

for element in rdd_clean \
    .map(lambda el: (el[2], (el[-1], 1))) \
    .aggregateByKey((0.0, 0), seqOp, combOp) \
    .map(lambda el: (el[0], el[1][0], el[1][1], el[1][0] / el[1][1])) \
    .collect():
        print(element)

('Jardine', 2812.19, 5, 562.438)
('Gill', 1749.8700000000001, 5, 349.97400000000005)
('Smith', 1641.43, 3, 547.1433333333333)
('Howard', 536.75, 2, 268.375)
('Thompson', 1203.1100000000001, 2, 601.5550000000001)
('Jones', 2363.04, 8, 295.38)
('Sorvino', 1283.6100000000001, 4, 320.90250000000003)
('Andrews', 438.37, 4, 109.5925)
('Morgan', 1387.77, 3, 462.59)
('Parent', 3102.3, 3, 1034.1000000000001)
('Kivell', 2109.94, 3, 703.3133333333334)

## .coalesce(...) action

In [24]:
rdd_clean.getNumPartitions()

4

In [25]:
rdd_single = rdd_clean.coalesce(1)
rdd_single.getNumPartitions()

1

## .combineByKey(...) action

In [26]:
def combiner(element):
    return [element]

def valueMerger(element1, element2):
    element1.append(element2)
    return element1

def combinerMerger(element1, element2):
    el1 = dict(element1)
    
    for e in element2:
        if e[0] not in el1:
            el1[e[0]] = 0

        el1[e[0]] += e[1]
    
    return list(el1.items())

for element in rdd_clean \
    .map(lambda el: (el[2], (el[3], el[-1]))) \
    .combineByKey(combiner, valueMerger, combinerMerger) \
    .collect():
        print(element)

('Jardine', [('Binder', 1933.95), ('Pencil', 449.1), ('Pen Set', 249.5)])
('Gill', [('Pen', 539.73), ('Pencil', 77.4), ('Binder', 1132.74)])
('Smith', [('Desk', 250.0), ('Pencil', 86.43), ('Binder', 1305.0)])
('Howard', [('Pen', 479.04), ('Binder', 57.71)])
('Thompson', [('Pencil', 63.68), ('Binder', 1139.43)])
('Jones', [('Pen', 575.36), ('Binder', 559.36), ('Pencil', 363.70000000000005), ('Pen Set', 565.22)])
('Sorvino', [('Pen', 151.24), ('Desk', 825.0), ('Pencil', 167.44), ('Binder', 139.93)])
('Andrews', [('Pencil', 298.65000000000003), ('Binder', 139.72)])
('Morgan', [('Pen Set', 686.95), ('Pencil', 449.1), ('Binder', 251.72)])
('Parent', [('Binder', 1619.19), ('Pen', 299.85), ('Pen Set', 1183.26)])
('Kivell', [('Desk', 625.0), ('Pen Set', 1484.94)])

## .histogram(...) action

In [27]:
hist = rdd_clean \
    .map(lambda el: el[-1]) \
    .histogram(10)

for bucket in [(round(b, 0), v) for b, v in zip(hist[0], hist[1])]:
    print(bucket)

(9.0, 17)
(196.0, 7)
(383.0, 7)
(570.0, 4)
(757.0, 1)
(944.0, 1)
(1131.0, 3)
(1318.0, 0)
(1505.0, 1)
(1692.0, 1)

## Sorting data

### sortBy(...) action

In [28]:
for element in rdd_clean \
    .map(lambda el: (el[2], el[0].strftime('%Y-%m'), el[1], el[-1])) \
    .sortBy(lambda el: el[-1], ascending = False) \
    .take(5):
        print(element)
    

('Jardine', '2017-12', 'Central', 1879.06)
('Parent', '2016-07', 'East', 1619.19)
('Smith', '2017-02', 'Central', 1305.0)
('Parent', '2016-12', 'East', 1183.26)
('Thompson', '2017-10', 'West', 1139.43)

### sortByKey(...) action

In [29]:
for element in rdd_clean \
    .map(lambda el: (el[-1], (el[2], el[0].strftime('%Y-%m'), el[1]))) \
    .sortByKey(ascending = False) \
    .take(5):
        print(element)

(1879.06, ('Jardine', '2017-12', 'Central'))
(1619.19, ('Parent', '2016-07', 'East'))
(1305.0, ('Smith', '2017-02', 'Central'))
(1183.26, ('Parent', '2016-12', 'East'))
(1139.43, ('Thompson', '2017-10', 'West'))

## Saving data

### .saveAsTextFile(...) action

In [30]:
%%sh
rm -rf ../data/sample_data_cleaned.csv

In [33]:
rdd_clean.saveAsTextFile('../data/sample_data_cleaned.csv')

In [34]:
%%sh
ls -la ../data

total 8
drwxr-xr-x   5 drabast  staff   160 Jan  9 22:30 .
drwxr-xr-x  12 drabast  staff   384 Dec 15 00:00 ..
-rw-r--r--@  1 drabast  staff  1927 Dec 15 21:56 sample_data.csv
drwxr-xr-x  12 drabast  staff   384 Jan  9 22:30 sample_data_cleaned.csv
drwxr-xr-x  12 drabast  staff   384 Jan  8 22:23 sample_data_cleaned.pkl


In [35]:
%%sh
ls -la ../data/sample_data_cleaned.csv

total 72
drwxr-xr-x  12 drabast  staff   384 Jan  9 22:30 .
drwxr-xr-x   5 drabast  staff   160 Jan  9 22:30 ..
-rw-r--r--   1 drabast  staff     8 Jan  9 22:30 ._SUCCESS.crc
-rw-r--r--   1 drabast  staff    16 Jan  9 22:30 .part-00000.crc
-rw-r--r--   1 drabast  staff    20 Jan  9 22:30 .part-00001.crc
-rw-r--r--   1 drabast  staff    16 Jan  9 22:30 .part-00002.crc
-rw-r--r--   1 drabast  staff    16 Jan  9 22:30 .part-00003.crc
-rw-r--r--   1 drabast  staff     0 Jan  9 22:30 _SUCCESS
-rw-r--r--   1 drabast  staff   767 Jan  9 22:30 part-00000
-rw-r--r--   1 drabast  staff  1030 Jan  9 22:30 part-00001
-rw-r--r--   1 drabast  staff   938 Jan  9 22:30 part-00002
-rw-r--r--   1 drabast  staff   877 Jan  9 22:30 part-00003


In [32]:
%%sh
rm -rf ../data/sample_data_cleaned_gzipped.csv

In [36]:
rdd_clean.saveAsTextFile(
    '../data/sample_data_cleaned_gzipped.csv',
    'org.apache.hadoop.io.compress.GzipCodec'
)

In [37]:
%%sh
ls -la ../data/sample_data_cleaned_gzipped.csv

total 72
drwxr-xr-x  12 drabast  staff  384 Jan  9 22:32 .
drwxr-xr-x   6 drabast  staff  192 Jan  9 22:32 ..
-rw-r--r--   1 drabast  staff    8 Jan  9 22:32 ._SUCCESS.crc
-rw-r--r--   1 drabast  staff   12 Jan  9 22:32 .part-00000.gz.crc
-rw-r--r--   1 drabast  staff   12 Jan  9 22:32 .part-00001.gz.crc
-rw-r--r--   1 drabast  staff   12 Jan  9 22:32 .part-00002.gz.crc
-rw-r--r--   1 drabast  staff   12 Jan  9 22:32 .part-00003.gz.crc
-rw-r--r--   1 drabast  staff    0 Jan  9 22:32 _SUCCESS
-rw-r--r--   1 drabast  staff  266 Jan  9 22:32 part-00000.gz
-rw-r--r--   1 drabast  staff  328 Jan  9 22:32 part-00001.gz
-rw-r--r--   1 drabast  staff  324 Jan  9 22:32 part-00002.gz
-rw-r--r--   1 drabast  staff  311 Jan  9 22:32 part-00003.gz


### .saveAsPickleFile(...) action

In [38]:
%%sh
rm -rf ../data/sample_data_cleaned.pkl

In [39]:
rdd_clean.saveAsPickleFile('../data/sample_data_cleaned.pkl')

In [40]:
%%sh
ls -la ../data/sample_data_cleaned.pkl

total 72
drwxr-xr-x  12 drabast  staff   384 Jan  9 22:33 .
drwxr-xr-x   6 drabast  staff   192 Jan  9 22:33 ..
-rw-r--r--   1 drabast  staff     8 Jan  9 22:33 ._SUCCESS.crc
-rw-r--r--   1 drabast  staff    16 Jan  9 22:33 .part-00000.crc
-rw-r--r--   1 drabast  staff    20 Jan  9 22:33 .part-00001.crc
-rw-r--r--   1 drabast  staff    20 Jan  9 22:33 .part-00002.crc
-rw-r--r--   1 drabast  staff    20 Jan  9 22:33 .part-00003.crc
-rw-r--r--   1 drabast  staff     0 Jan  9 22:33 _SUCCESS
-rw-r--r--   1 drabast  staff   951 Jan  9 22:33 part-00000
-rw-r--r--   1 drabast  staff  1235 Jan  9 22:33 part-00001
-rw-r--r--   1 drabast  staff  1157 Jan  9 22:33 part-00002
-rw-r--r--   1 drabast  staff  1049 Jan  9 22:33 part-00003


## Descriptive Statistics

In [41]:
values = rdd_clean \
    .map(lambda el: el[-1])

### .mean() action

In [42]:
values.mean()

443.53285714285715

### .stdev() action

In [43]:
values.stdev()

438.90819278819419

### .max() action

In [44]:
values.max()

1879.06

### .min() action

In [45]:
values.min()

9.03