# Spark RDD (Workshop 2) 
---

# Spark Context

In [3]:
! ls datasets

dataset.csv


In [4]:
from pyspark import SparkContext

sc = SparkContext()

# Read dataset to RDD

In [5]:
raw_data = sc.textFile("datasets/dataset.csv")

In [6]:
raw_data.take(10)

['username,purchase',
 '50888,473',
 '9041985,738',
 'rhbcnbyrf,444',
 '29121986,690',
 '301095,383',
 '14021969,942',
 '2071964,71',
 '21071967,857',
 '120976,488']

In [7]:
#take out the header
header = raw_data.first()
header_rdd = sc.parallelize([header])
input_data = raw_data.subtract(header_rdd)
input_data.take(10)

['2access,54',
 '3266,949',
 'bonkers1,370',
 'jamesj,250',
 '22011997,867',
 '260190,902',
 '5325,247',
 'hendri,482',
 'impress,904',
 '12081977,914']

# Process dataset

In [27]:
from collections import namedtuple

Record = namedtuple("Record", ["user", "purchase"])

def print_records(input_rdd):
    for record in input_rdd.take(10):
        print(record)

def to_number(x):
    return int(x)

def parse_record(x):
    elements = x.split(",")
    user, purchase = elements
    purchase = to_number(purchase)
    
    return Record(user, purchase)

parsed_data = input_data.map(parse_record)

print_records(parsed_data)


Record(user='2access', purchase=54)
Record(user='3266', purchase=949)
Record(user='bonkers1', purchase=370)
Record(user='jamesj', purchase=250)
Record(user='22011997', purchase=867)
Record(user='260190', purchase=902)
Record(user='5325', purchase=247)
Record(user='hendri', purchase=482)
Record(user='impress', purchase=904)
Record(user='12081977', purchase=914)


In [28]:
volume = parsed_data.map(lambda x: x.purchase)
print_records(volume)

54
949
370
250
867
902
247
482
904
914


## 1. Find the average purchase of each user.

In [29]:
purchase_data = parsed_data.map(lambda x: (x.user, (x.purchase, 1)))
print_records(purchase_data)

('2access', (54, 1))
('3266', (949, 1))
('bonkers1', (370, 1))
('jamesj', (250, 1))
('22011997', (867, 1))
('260190', (902, 1))
('5325', (247, 1))
('hendri', (482, 1))
('impress', (904, 1))
('12081977', (914, 1))


In [30]:
def f_count(x, y):
    x_purchase, x_count = x
    y_purchase, y_count = y
    return x_purchase + y_purchase, x_count + y_count

In [80]:
mean_data = purchase_data.reduceByKey(lambda x, y: f_count(x, y))
print_records(mean_data)

('monitor3', (4308, 9))
('nikki123', (7545, 12))
('22111959', (2291, 7))
('mole', (9297, 20))
('james01', (4842, 9))
('25091983', (5573, 12))
('27-Aug', (2623, 7))
('meiner', (5065, 10))
('Porsche9', (4784, 12))
('BABYLON', (6448, 12))


In [81]:
def f_mean(x):
    x_purchase_sum, x_purchase_count = x
    return x_purchase_sum/x_purchase_count

In [82]:
final_mean_data = mean_data.map(lambda x: (x[0], f_mean(x[1])))

In [104]:
final_mean_data

PythonRDD[141] at RDD at PythonRDD.scala:53

### 1.1. Sort result by average purchase amount in decreasing order.

In [96]:
sorted_data_desc = final_mean_data.sortBy(lambda x: x[1], ascending=False)

In [105]:
sorted_data_desc.take(20)

[('accessno', 958.0),
 ('FORD', 956.25),
 ('gigi', 946.5),
 ('9071976', 935.5),
 ('Blue1', 931.5),
 ('18101961', 929.5),
 ('6021969', 916.0),
 ('madzia1', 915.75),
 ('qqq123', 915.0),
 ('peterj', 911.0),
 ('antigone', 906.5),
 ('4133', 900.25),
 ('15121960', 900.0),
 ('10691', 898.8),
 ('darthvader', 898.5),
 ('remembe', 898.0),
 ('13011963', 897.25),
 ('9394', 894.5),
 ('kranta1', 892.0),
 ('9021966', 889.0)]

### 1.2. Store the sorted results in local disc.

In [106]:
sorted_data_desc.saveAsTextFile('output/sorted_data_desc/txt')

In [114]:
! ls output/sorted_data_desc/txt

part-00000  part-00003	part-00006  part-00009	part-00012
part-00001  part-00004	part-00007  part-00010	part-00013
part-00002  part-00005	part-00008  part-00011	_SUCCESS


In [111]:
! cat output/sorted_data_desc/txt/part-00000 | head

('accessno', 958.0)
('FORD', 956.25)
('gigi', 946.5)
('9071976', 935.5)
('Blue1', 931.5)
('18101961', 929.5)
('6021969', 916.0)
('madzia1', 915.75)
('qqq123', 915.0)
('peterj', 911.0)
cat: write error: Broken pipe


In [116]:
! cat output/sorted_data_desc/txt/part-00003 | head

('qader', 561.1875)
('54132442', 561.1818181818181)
('magpies', 561.1818181818181)
('17111962', 561.1818181818181)
('300184', 561.1818181818181)
('goomba', 561.1666666666666)
('24041975', 561.1666666666666)
('boosted', 561.1666666666666)
('krypton', 561.1666666666666)
('1Samanth', 561.1666666666666)
cat: write error: Broken pipe


### 1.3. Print the first 20 rows of the sorted dataset.

In [117]:
sorted_data_desc.take(20)

[('accessno', 958.0),
 ('FORD', 956.25),
 ('gigi', 946.5),
 ('9071976', 935.5),
 ('Blue1', 931.5),
 ('18101961', 929.5),
 ('6021969', 916.0),
 ('madzia1', 915.75),
 ('qqq123', 915.0),
 ('peterj', 911.0),
 ('antigone', 906.5),
 ('4133', 900.25),
 ('15121960', 900.0),
 ('10691', 898.8),
 ('darthvader', 898.5),
 ('remembe', 898.0),
 ('13011963', 897.25),
 ('9394', 894.5),
 ('kranta1', 892.0),
 ('9021966', 889.0)]

In [132]:
type(sorted_data_desc)

pyspark.rdd.PipelinedRDD

## 2. Filter the average purchase of each user dataset (the dataset formed during Task 1), and keep all data rows where purchase >100 and purchase < 900.

In [151]:
avg_btw_100_900 = sorted_data_desc.filter(lambda x: x[1] > 100 and x[1] < 900).collect()

In [152]:
avg_btw_100_900

[('10691', 898.8),
 ('darthvader', 898.5),
 ('remembe', 898.0),
 ('13011963', 897.25),
 ('9394', 894.5),
 ('kranta1', 892.0),
 ('9021966', 889.0),
 ('31101965', 889.0),
 ('bykemo', 886.3333333333334),
 ('nivek1', 885.5714285714286),
 ('18111983', 885.4),
 ('260980', 882.5),
 ('981', 882.5),
 ('25051970', 881.6),
 ('openopen', 879.0),
 ('70861', 878.4),
 ('4268', 877.0),
 ('cloud', 875.6666666666666),
 ('151065', 874.0),
 ('galatasara', 874.0),
 ('Zealots', 871.75),
 ('lyndon', 869.5),
 ('muffin12', 868.5),
 ('stunner1', 867.5),
 ('Hd764nW5d7E1vb1', 865.3333333333334),
 ('nyranger', 860.6666666666666),
 ('21081', 860.3333333333334),
 ('150495', 858.2),
 ('larsen', 855.6666666666666),
 ('squids', 855.5),
 ('john1968', 855.3333333333334),
 ('beachs', 854.4285714285714),
 ('gandalf0', 853.5),
 ('gritty', 853.0),
 ('motorhead', 851.0),
 ('just1n', 850.8571428571429),
 ('reaper1', 850.6666666666666),
 ('mcgraw', 848.0),
 ('thelast1', 847.8),
 ('11081987', 847.75),
 ('20051992', 847.333333333

In [157]:
type(avg_btw_100_900)

list

## 3. Find the minimum and maximum on purchase amount inside the dataset formed during Task 2.

In [165]:
rdd_avg_btw_100_900 = sc.parallelize(avg_btw_100_900)

In [166]:
type(rdd_avg_btw_100_900)

pyspark.rdd.RDD

In [167]:
rdd_avg_btw_100_900

ParallelCollectionRDD[217] at parallelize at PythonRDD.scala:195

In [215]:
min_avg_pur = rdd_avg_btw_100_900.map(lambda x : x[1]).min()
max_avg_pur = rdd_avg_btw_100_900.map(lambda x : x[1]).max()

print('Minimum average amount between 100 and 900 is: ', min_avg_pur)
print('Maximum average amount between 100 and 900 is: ', max_avg_pur)

Minimum average amount between 100 and 900 is:  104.66666666666667
Maximum average amount between 100 and 900 is:  898.8


## 4. Create broadcast data variable which will keep the minimum and maximum values.

### 4.1. Use the broadcast variable to implement Min/Max Scaler.

In [216]:
# define the broadcast dict file, it will broadcast the values to the nodes
# to do
params = sc.broadcast({"min":104.66666666666667, "max":898.8})

def norm(v):
    # define broadcast dict
    return (v-params.value["min"])/(params.value["max"]-params.value["min"])

In [217]:
type(params)

pyspark.broadcast.Broadcast

In [218]:
#example 
params.value["min"]/params.value["max"]

0.11645156504969591

### 4.2. Apply Min/Max Scaler on the dataset formed in Task 2.

In [219]:
type(rdd_avg_btw_100_900)

pyspark.rdd.RDD

In [227]:
answer = rdd_avg_btw_100_900.map(lambda x : norm (x[1]))

### 4.3 Output the first 10 rows from Min/Max Scanner output.

In [228]:
answer.take(10)

[1.0,
 0.999622229684352,
 0.9989926124916052,
 0.998048186702485,
 0.9945852921423775,
 0.9914372061786434,
 0.9876595030221625,
 0.9876595030221625,
 0.9843015446608463,
 0.9833421279861845]

## 5. Use caching to achieve best possible optimization.

In [229]:
rdd_avg_btw_100_900.cache()

ParallelCollectionRDD[217] at parallelize at PythonRDD.scala:195

In [230]:
min_value = rdd_avg_btw_100_900.map(lambda x : x[1]).min()
max_value = rdd_avg_btw_100_900.map(lambda x : x[1]).max()
open_sum = rdd_avg_btw_100_900.map(lambda x : x[1]).sum()
volume_std = rdd_avg_btw_100_900.map(lambda x : x[1]).stdev()


print(min_value)
print(max_value)
print(open_sum)
print(volume_std)

104.66666666666667
898.8
49823587.30897473
96.5480318636
