# Learning PySpark 
### Video series

### Packt Publishing

**Author**: Tomasz Drabas
**Date**:   2017-12-10





# Section 3: Resilient Distributed Datasets - Actions

In this section we will look at the Resilient Distributed Datasets (RDDs) and the actions available.

## Read in the data

In [2]:
import datetime as dt

def parseCSVRow(inputRow):
    """Parse one comma-separated text line into a typed record.

    Field 0 is parsed as a date in ``%m/%d/%y`` format, field 4 as an int
    and fields 5 and 6 as floats; all other fields stay as strings.

    Parameters
    ----------
    inputRow : str
        A single raw line of the CSV file.

    Returns
    -------
    list
        ``[record]`` (a one-element list holding the parsed row) on success,
        or ``[]`` when the row cannot be parsed (e.g. the header line or a
        row with too few fields). This shape lets ``flatMap`` silently drop
        the rows that fail to parse.
    """
    try:
        rowSplit = inputRow.split(',')
        rowSplit[0] = dt.datetime.strptime(rowSplit[0], '%m/%d/%y')
        rowSplit[4] = int(rowSplit[4])

        for i in (5, 6):
            rowSplit[i] = float(rowSplit[i])
    except (ValueError, IndexError):
        # ValueError: a field failed date/int/float conversion.
        # IndexError: the row has fewer fields than expected.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return []

    return [rowSplit]

# Read the file as lines of text (requesting 4 partitions) and keep only the
# rows that parse: flatMap flattens the [] / [record] lists from parseCSVRow.
rdd_clean = (
    sc.textFile('../data/sample_data.csv', 4)
    .flatMap(parseCSVRow)
)

## .take(...) action

In [3]:
# Bring the first two parsed records back to the driver.
rdd_clean.take(2)

[[datetime.datetime(2016, 1, 6, 0, 0), 'East', 'Jones', 'Pencil', 95, 1.99, 189.05], [datetime.datetime(2016, 2, 9, 0, 0), 'Central', 'Jardine', 'Pencil', 36, 4.99, 179.64]]

In [4]:
# The five records with the smallest key, i.e. the five earliest dates (field 0).
for element in rdd_clean.takeOrdered(
    5,
    key=lambda row: row[0],
):
    print(element)

[datetime.datetime(2016, 1, 6, 0, 0), 'East', 'Jones', 'Pencil', 95, 1.99, 189.05]
[datetime.datetime(2016, 2, 9, 0, 0), 'Central', 'Jardine', 'Pencil', 36, 4.99, 179.64]
[datetime.datetime(2016, 2, 26, 0, 0), 'Central', 'Gill', 'Pen', 27, 19.99, 539.73]
[datetime.datetime(2016, 3, 15, 0, 0), 'West', 'Sorvino', 'Pencil', 56, 2.99, 167.44]
[datetime.datetime(2016, 4, 1, 0, 0), 'East', 'Jones', 'Binder', 60, 4.99, 299.4]

In [5]:
# Draw 5 records without replacement (first argument False); the fixed seed
# keeps the sample repeatable between runs.
for element in rdd_clean.takeSample(False, 5, seed=666):
    print(element)

[datetime.datetime(2017, 8, 7, 0, 0), 'Central', 'Kivell', 'Pen Set', 42, 23.95, 1005.9]
[datetime.datetime(2016, 6, 25, 0, 0), 'Central', 'Morgan', 'Pencil', 90, 4.99, 449.1]
[datetime.datetime(2016, 9, 1, 0, 0), 'Central', 'Smith', 'Desk', 2, 125.0, 250.0]
[datetime.datetime(2017, 4, 10, 0, 0), 'Central', 'Andrews', 'Pencil', 66, 1.99, 131.34]
[datetime.datetime(2016, 5, 22, 0, 0), 'West', 'Thompson', 'Pencil', 32, 1.99, 63.68]

## .collect(...) action

In [6]:
# collect() returns every record to the driver, so its length must match count().
len(rdd_clean.collect()) == rdd_clean.count()

True

In [7]:
# Filter on the executors first, then collect only the sales whose total
# (last field) exceeds 1000.
for element in (
    rdd_clean
    .filter(lambda row: row[-1] > 1000)
    .collect()
):
    print(element)

[datetime.datetime(2016, 7, 29, 0, 0), 'East', 'Parent', 'Binder', 81, 19.99, 1619.19]
[datetime.datetime(2016, 12, 29, 0, 0), 'East', 'Parent', 'Pen Set', 74, 15.99, 1183.26]
[datetime.datetime(2017, 2, 1, 0, 0), 'Central', 'Smith', 'Binder', 87, 15.0, 1305.0]
[datetime.datetime(2017, 8, 7, 0, 0), 'Central', 'Kivell', 'Pen Set', 42, 23.95, 1005.9]
[datetime.datetime(2017, 10, 14, 0, 0), 'West', 'Thompson', 'Binder', 57, 19.99, 1139.43]
[datetime.datetime(2017, 12, 4, 0, 0), 'Central', 'Jardine', 'Binder', 94, 19.99, 1879.06]

## .reduce(...) action

In [8]:
from operator import add

# Sum the sale totals (last field of each record) using operator.add.
total_value = (
    rdd_clean
    .map(lambda row: row[-1])
    .reduce(add)
)

total_value

18628.38

In [9]:
# Same sum as before, with the addition spelled out as a lambda.
total_value = (
    rdd_clean
    .map(lambda row: row[-1])
    .reduce(lambda left, right: left + right)
)

total_value

18628.38

## .reduceByKey(...) transformation

In [10]:
# Key each sale total by region (field 1) and add the totals up per key.
sales_by_region = (
    rdd_clean
    .map(lambda row: (row[1], row[-1]))
    .reduceByKey(lambda left, right: left + right)
)

for element in sales_by_region.collect():
    print(element)

('East', 6002.090000000001)
('Central', 10139.57)
('West', 2486.7200000000003)

## .count() action

In [11]:
# Exact number of records in the RDD.
rdd_clean.count()

42

In [12]:
# Approximate count: the first argument is the timeout (milliseconds per the
# PySpark API -- confirm for your version), returned at 90% confidence.
rdd_clean.countApprox(10, confidence=0.9)

42

In [13]:
# Field 2 of every record (the salesperson's name).
sales = rdd_clean.map(lambda row: row[2])

# Approximate number of distinct names.
sales.countApproxDistinct()

11

In [14]:
# De-duplicate the names and bring the distinct values back to the driver.
distinct_sales = sales.distinct()
distinct_sales.collect()

['Jardine', 'Gill', 'Smith', 'Howard', 'Thompson', 'Jones', 'Sorvino', 'Andrews', 'Morgan', 'Parent', 'Kivell']

In [15]:
# Exact number of distinct names, for comparison with countApproxDistinct().
sales.distinct().count()

11

## .foreach(...) action

In [16]:
# foreach applies the function to each element for its side effect and returns
# nothing. NOTE(review): print runs where the data lives, so the output may end
# up in the Spark process console rather than under this cell -- confirm.
distinct_sales.foreach(print)

## .aggregate(...) action

In [17]:
def seqOp(x, y):
    """Fold one value ``y`` into the running ``(sum, count)`` pair ``x``."""
    return (x[0] + y, x[1] + 1)


def combOp(x, y):
    """Merge two partial ``(sum, count)`` pairs element-wise."""
    return (x[0] + y[0], x[1] + y[1])

# Start from (0.0, 0) = (sum, count); seqOp folds values in,
# combOp merges the partial results.
(
    rdd_clean
    .map(lambda row: row[-1])
    .aggregate((0.0, 0), seqOp, combOp)
)

(18628.38, 42)

## .aggregateByKey(...) transformation

In [18]:
def seqOp(x, y):
    """Fold one value pair ``y`` into the running ``(sum, count)`` pair ``x``.

    Only ``y[0]`` is added to the sum; the count is incremented by one.
    """
    return (x[0] + y[0], x[1] + 1)


def combOp(x, y):
    """Merge two partial ``(sum, count)`` pairs element-wise."""
    return (x[0] + y[0], x[1] + y[1])

# Per salesperson (field 2): total sales, number of sales and the average sale.
for element in (
    rdd_clean
    .map(lambda row: (row[2], (row[-1], 1)))
    .aggregateByKey((0.0, 0), seqOp, combOp)
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1], kv[1][0] / kv[1][1]))
    .collect()
):
    print(element)

('Jardine', 2812.19, 5, 562.438)
('Gill', 1749.8700000000001, 5, 349.97400000000005)
('Smith', 1641.43, 3, 547.1433333333333)
('Howard', 536.75, 2, 268.375)
('Thompson', 1203.1100000000001, 2, 601.5550000000001)
('Jones', 2363.04, 8, 295.38)
('Sorvino', 1283.6100000000001, 4, 320.90250000000003)
('Andrews', 438.37, 4, 109.5925)
('Morgan', 1387.77, 3, 462.59)
('Parent', 3102.3, 3, 1034.1000000000001)
('Kivell', 2109.94, 3, 703.3133333333334)

## .coalesce(...) transformation

In [19]:
# Number of partitions the RDD is currently split into.
rdd_clean.getNumPartitions()

4

In [21]:
# glom() turns each partition into one list, so the length of the collected
# result is the number of partitions.
len(rdd_clean.glom().collect())

4

In [23]:
# After coalesce(1) all records sit in a single partition.
len(rdd_clean.coalesce(1).glom().collect())

1

In [24]:
# The records themselves are unchanged by coalescing; only the partitioning differs.
rdd_clean.coalesce(1).collect()

[[datetime.datetime(2016, 1, 6, 0, 0), 'East', 'Jones', 'Pencil', 95, 1.99, 189.05], [datetime.datetime(2016, 2, 9, 0, 0), 'Central', 'Jardine', 'Pencil', 36, 4.99, 179.64], [datetime.datetime(2016, 2, 26, 0, 0), 'Central', 'Gill', 'Pen', 27, 19.99, 539.73], [datetime.datetime(2016, 3, 15, 0, 0), 'West', 'Sorvino', 'Pencil', 56, 2.99, 167.44], [datetime.datetime(2016, 4, 1, 0, 0), 'East', 'Jones', 'Binder', 60, 4.99, 299.4], [datetime.datetime(2016, 4, 18, 0, 0), 'Central', 'Andrews', 'Pencil', 75, 1.99, 149.25], [datetime.datetime(2016, 5, 5, 0, 0), 'Central', 'Jardine', 'Pencil', 90, 4.99, 449.1], [datetime.datetime(2016, 5, 22, 0, 0), 'West', 'Thompson', 'Pencil', 32, 1.99, 63.68], [datetime.datetime(2016, 6, 8, 0, 0), 'East', 'Jones', 'Binder', 60, 8.99, 539.4], [datetime.datetime(2016, 6, 25, 0, 0), 'Central', 'Morgan', 'Pencil', 90, 4.99, 449.1], [datetime.datetime(2016, 7, 12, 0, 0), 'East', 'Howard', 'Binder', 29, 1.99, 57.71], [datetime.datetime(2016, 7, 29, 0, 0), 'East', 'Pa

## .combineByKey(...) transformation

In [99]:
# Largest single sale per salesperson: group every (region, total) pair under
# the name, materialise each group as a list, then pick the pair with the
# highest total.
for element in (
    rdd_clean
    .map(lambda row: (row[2], (row[1], row[-1])))
    .groupByKey()
    .mapValues(list)
    .map(lambda kv: (kv[0], max(kv[1], key=lambda pair: pair[1])))
    .collect()
):
    print(element)

('Jardine', ('Central', 1879.06))
('Gill', ('Central', 719.2))
('Smith', ('Central', 1305.0))
('Howard', ('East', 479.04))
('Thompson', ('West', 1139.43))
('Jones', ('East', 575.36))
('Sorvino', ('West', 825.0))
('Andrews', ('Central', 149.25))
('Morgan', ('Central', 686.95))
('Parent', ('East', 1619.19))
('Kivell', ('Central', 1005.9))

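With `groupByKey` there is no combiner working on the executors -- every key-value pair is shuffled across the network before any grouping happens. `combineByKey` first merges the values locally within each partition (here by appending them to a per-key list) and only then merges the partial results across partitions.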

In [98]:
def combiner(element):
    """Start a new accumulator: wrap the first value seen in a one-element list."""
    return [element]

def append(element1, element2):
    """Add the single value ``element2`` to the list ``element1`` in place and return that list."""
    element1.append(element2)
    return element1

def extend(element1, element2):
    """Append all items of ``element2`` to the list ``element1`` in place and return that list."""
    element1.extend(element2)
    return element1

# Largest single sale per salesperson, keyed by name with (year-month, total)
# values: combiner starts a list, append adds values to it, extend merges lists.
for element in (
    rdd_clean
    .map(lambda row: (row[2], (row[0].strftime('%Y-%m'), row[-1])))
    .combineByKey(combiner, append, extend)
    .map(lambda kv: (kv[0], max(kv[1], key=lambda pair: pair[1])))
    .collect()
):
    print(element)

('Jardine', ('2017-12', 1879.06))
('Gill', ('2017-05', 719.2))
('Smith', ('2017-02', 1305.0))
('Howard', ('2017-04', 479.04))
('Thompson', ('2017-10', 1139.43))
('Jones', ('2016-10', 575.36))
('Sorvino', ('2017-08', 825.0))
('Andrews', ('2016-04', 149.25))
('Morgan', ('2017-07', 686.95))
('Parent', ('2016-07', 1619.19))
('Kivell', ('2017-08', 1005.9))

## .histogram(...) action

In [125]:
# Histogram of the sale totals over 10 buckets; the result is a pair of
# (bucket boundaries, counts).
hist = (
    rdd_clean
    .map(lambda row: row[-1])
    .histogram(10)
)

# Pair each count with its (rounded) lower bucket boundary; zip stops at the
# shorter sequence, so the final upper boundary is dropped.
[(round(edge, 0), count) for edge, count in zip(*hist)]

[(9.0, 17), (196.0, 7), (383.0, 7), (570.0, 4), (757.0, 1), (944.0, 1), (1131.0, 3), (1318.0, 0), (1505.0, 1), (1692.0, 1)]

## Sorting data

### .sortBy(...) transformation

In [132]:
# Five largest sales: sort by the total (last item of each tuple), descending.
for element in (
    rdd_clean
    .map(lambda row: (row[2], row[0].strftime('%Y-%m'), row[1], row[-1]))
    .sortBy(lambda rec: rec[-1], ascending=False)
    .take(5)
):
    print(element)
    

('Jardine', '2017-12', 'Central', 1879.06)
('Parent', '2016-07', 'East', 1619.19)
('Smith', '2017-02', 'Central', 1305.0)
('Parent', '2016-12', 'East', 1183.26)
('Thompson', '2017-10', 'West', 1139.43)

### .sortByKey(...) transformation

In [134]:
# Same ranking via sortByKey: the total becomes the key of each pair.
for element in (
    rdd_clean
    .map(lambda row: (row[-1], (row[2], row[0].strftime('%Y-%m'), row[1])))
    .sortByKey(ascending=False)
    .take(5)
):
    print(element)

(1879.06, ('Jardine', '2017-12', 'Central'))
(1619.19, ('Parent', '2016-07', 'East'))
(1305.0, ('Smith', '2017-02', 'Central'))
(1183.26, ('Parent', '2016-12', 'East'))
(1139.43, ('Thompson', '2017-10', 'West'))

## Saving data

### .saveAsTextFile(...) action

In [145]:
%%sh
# Remove output left by a previous run so the save below starts from a clean path.
rm -rf ../data/sample_data_cleaned.csv

In [135]:
# Write the records as text. The path becomes a directory
# (see the directory listing that follows).
rdd_clean.saveAsTextFile('../data/sample_data_cleaned.csv')

In [141]:
%%sh
# List the data folder to confirm the output directory was created.
ls -la ../data

total 8
drwxr-xr-x   4 drabast  staff   128 Dec 28 22:23 .
drwxr-xr-x  12 drabast  staff   384 Dec 15 00:00 ..
-rw-r--r--@  1 drabast  staff  1927 Dec 15 21:56 sample_data.csv
drwxr-xr-x  12 drabast  staff   384 Dec 28 22:23 sample_data_cleaned.csv


In [146]:
%%sh
# Remove output left by a previous run so the compressed save below starts from a clean path.
rm -rf ../data/sample_data_cleaned_gzipped.csv

In [147]:
# Same text output, but each part file is compressed with the Hadoop Gzip
# codec named by the second argument.
rdd_clean.saveAsTextFile(
    '../data/sample_data_cleaned_gzipped.csv',
    'org.apache.hadoop.io.compress.GzipCodec'
)

In [148]:
%%sh
# Inspect the part files written by the compressed save.
ls -la ../data/sample_data_cleaned_gzipped.csv

total 72
drwxr-xr-x  12 drabast  staff  384 Dec 28 22:29 .
drwxr-xr-x   4 drabast  staff  128 Dec 28 22:29 ..
-rw-r--r--   1 drabast  staff    8 Dec 28 22:29 ._SUCCESS.crc
-rw-r--r--   1 drabast  staff   12 Dec 28 22:29 .part-00000.gz.crc
-rw-r--r--   1 drabast  staff   12 Dec 28 22:29 .part-00001.gz.crc
-rw-r--r--   1 drabast  staff   12 Dec 28 22:29 .part-00002.gz.crc
-rw-r--r--   1 drabast  staff   12 Dec 28 22:29 .part-00003.gz.crc
-rw-r--r--   1 drabast  staff    0 Dec 28 22:29 _SUCCESS
-rw-r--r--   1 drabast  staff  266 Dec 28 22:29 part-00000.gz
-rw-r--r--   1 drabast  staff  328 Dec 28 22:29 part-00001.gz
-rw-r--r--   1 drabast  staff  324 Dec 28 22:29 part-00002.gz
-rw-r--r--   1 drabast  staff  311 Dec 28 22:29 part-00003.gz


### .saveAsPickleFile(...) action

In [149]:
%%sh
# Remove output left by a previous run so the pickle save below starts from a clean path.
rm -rf ../data/sample_data_cleaned.pkl

In [150]:
# Persist the RDD as serialized (pickled) Python objects. This section is
# about saveAsPickleFile, so use it instead of saveAsTextFile, which would
# only write the string form of each record.
rdd_clean.saveAsPickleFile('../data/sample_data_cleaned.pkl')

In [153]:
%%sh
# Inspect the part files written by the save above.
ls -la ../data/sample_data_cleaned.pkl

total 72
drwxr-xr-x  12 drabast  staff   384 Dec 28 22:31 .
drwxr-xr-x   5 drabast  staff   160 Dec 28 22:31 ..
-rw-r--r--   1 drabast  staff     8 Dec 28 22:31 ._SUCCESS.crc
-rw-r--r--   1 drabast  staff    16 Dec 28 22:31 .part-00000.crc
-rw-r--r--   1 drabast  staff    20 Dec 28 22:31 .part-00001.crc
-rw-r--r--   1 drabast  staff    16 Dec 28 22:31 .part-00002.crc
-rw-r--r--   1 drabast  staff    16 Dec 28 22:31 .part-00003.crc
-rw-r--r--   1 drabast  staff     0 Dec 28 22:31 _SUCCESS
-rw-r--r--   1 drabast  staff   767 Dec 28 22:31 part-00000
-rw-r--r--   1 drabast  staff  1030 Dec 28 22:31 part-00001
-rw-r--r--   1 drabast  staff   938 Dec 28 22:31 part-00002
-rw-r--r--   1 drabast  staff   877 Dec 28 22:31 part-00003


## Descriptive Statistics

### .mean() action

In [154]:
# Mean of the sale totals (last field of each record).
(
    rdd_clean
    .map(lambda row: row[-1])
    .mean()
)

443.53285714285715

### .stdev() action

In [155]:
# Standard deviation of the sale totals.
(
    rdd_clean
    .map(lambda row: row[-1])
    .stdev()
)

438.90819278819419

### .max() action

In [156]:
# Largest sale total.
(
    rdd_clean
    .map(lambda row: row[-1])
    .max()
)

1879.06

### .min() action

In [157]:
# Smallest sale total.
(
    rdd_clean
    .map(lambda row: row[-1])
    .min()
)

9.03