# Environment setup

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install -q findspark
!pip show pyspark
!pip install memory_profiler
%load_ext memory_profiler

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 29 kB/s
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 14.4 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=74bd3b3a3da7074d74eb057d39749c7ee0ce80f0a9e886427c18adad2e4a3ad3
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
Name: pyspark
Version: 3.2.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/lic

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"

import pyspark
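# getOrCreate is a class method: it returns the active SparkContext or creates one from this conf
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster("local[*]").setAppName("mean-median"))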
sc

In [3]:
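import findspark
# findspark locates SPARK_HOME and adds pyspark to sys.path; it is only needed when
# pyspark is not pip-installed, and it should run before "import pyspark"
findspark.init()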

# Data setup

In [4]:
!wget https://raw.githubusercontent.com/aliswh/architectures-for-big-data/main/logDataset.csv
log_path = '/content/logDataset.csv'
data = sc.textFile(log_path)
data

--2021-11-04 12:12:02--  https://raw.githubusercontent.com/aliswh/architectures-for-big-data/main/logDataset.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 118114 (115K) [text/plain]
Saving to: ‘logDataset.csv’


2021-11-04 12:12:02 (76.9 MB/s) - ‘logDataset.csv’ saved [118114/118114]



/content/logDataset.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
data.count() # includes header

856

In [6]:
data.take(10)

['CompID,CompName,CompRef,ErrorCode,ErrorText,ErrorType,FeederID,GoodPickUps,HammerGW.ts_load,InsertDate,McID,Milliseconds,NozzleNum,NozzleSize,Slot,Station,SubSlot,TimeStamp,Type',
 '076333,002930,0,No error,,1,10511,0,2021-10-29T06:56:02.905Z,2021-10-29T07:57:16.663Z,02,611,0,0,174,32,0,2021-10-29T07:57:16.610Z,null',
 '049198,075897,0,No error,,1,298522,12,2021-10-29T06:56:03.145Z,2021-10-29T07:57:18.710Z,10,513,0,0,194,88,0,2021-10-29T07:57:18.513Z,null',
 '041960,058189,0,No error,,1,86425640,0,2021-10-29T06:56:03.145Z,2021-10-29T07:57:45.443Z,06,433,0,0,27,24,0,2021-10-29T07:57:45.433Z,-1',
 '038137,031878,AC232,8000D701,,0,86425640,0,2021-10-29T06:56:03.145Z,2021-10-29T07:57:50.787Z,03,772,1,8,145,65,0,2021-10-29T07:57:50.773Z,-1',
 '031989,008144,AC231,8000D701,,0,86425640,0,2021-10-29T06:56:03.145Z,2021-10-29T07:57:50.850Z,09,782,1,5,191,36,0,2021-10-29T07:57:50.783Z,-1',
 '010575,075971,AC232,8000D701,,0,86425640,56,2021-10-29T06:56:03.145Z,2021-10-29T07:57:50.913Z,02,792,1,5

Write an algorithm to distribute the computation of the mean and the median over a dataset using the Map/Reduce model.

*I chose to compute the mean and the median of the 'Milliseconds' column (the 12th column, index 11).*

In [7]:
# some preprocessing: drop the header row and split each line into fields
headers = data.first()
rdd = data.filter(lambda line: line != headers)
rdd = rdd.map(lambda line: line.split(","))
rdd.first()[11]  # 'Milliseconds' value (index 11) of the first data row

'611'
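The preprocessing above splits each line on every comma. That works as long as no field contains a quoted comma; a free-text field such as `ErrorText` could, in principle, contain one, which would shift every later column, including index 11. A minimal sketch with a made-up line (hypothetical, not taken from the dataset) comparing `str.split` with the standard-library `csv` module; in PySpark the same parser could be applied per line with `data.filter(lambda line: line != headers).map(lambda line: next(csv.reader([line])))`.

```python
import csv

# Hypothetical log line: the ErrorText field contains a comma inside quotes,
# as a CSV writer would emit it. Not taken from logDataset.csv.
line = '038137,031878,AC232,8000D701,"Feeder empty, retry",0,86425640'

naive = line.split(",")            # also splits inside the quoted field
robust = next(csv.reader([line]))  # follows the CSV quoting rules

print(len(naive), len(robust))  # 8 7
print(robust[4])                # Feeder empty, retry
```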

# Naive Solution

The median, in particular, does not scale:
* The median is a descriptive statistic based on _position_.
* Our RDD is not ordered, so to find the central position (i.e. the median) we first have to sort it.
* Sorting has a cost...
* ...and so does collecting all the data into main memory just to select the central value!

Can we do better?
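To make the contrast concrete: the mean decomposes into sums that can be computed per partition and then added, where $c_v$ is the number of rows with value $v$ (exactly what `reduceByKey(add)` produces from `(v, 1)` pairs). The median, instead, is defined by position in the sorted sequence $x_{(0)} \le \dots \le x_{(n-1)}$ (0-indexed, as in Python):

```latex
\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i = \frac{\sum_{v} v \, c_v}{\sum_{v} c_v}

\operatorname{med}(x) =
\begin{cases}
x_{(\lfloor n/2 \rfloor)} & \text{if } n \text{ is odd},\\
\tfrac{1}{2}\bigl(x_{(n/2-1)} + x_{(n/2)}\bigr) & \text{if } n \text{ is even}.
\end{cases}
```

With $n = 855$ data rows, $\lfloor 855/2 \rfloor = 427$, which is the index selected by `count//2` throughout this notebook. For an even $n$, `count//2` would select $x_{(n/2)}$, the upper of the two middle values.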

In [8]:
rdd.map(lambda x: int(x[11])).sum()/rdd.count() # mean

513.706432748538

In [9]:
rdd.sortBy(lambda x: int(x[11])).collect()[rdd.count()//2][11] # median

'505'

# (Hopefully) better solution

## Computing the mean
Strategy: use an **accumulator** for the count of all elements in the RDD.
Computing the mean without an accumulator...

In [10]:
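from operator import add
# (value, occurrences) pairs: reduceByKey adds up the 1s of identical values
pairs = rdd.map(lambda x: (int(x[11]), 1)).reduceByKey(add)
count = pairs.map(lambda x: x[1]).sum()  # total number of rows
mean = pairs.map(lambda x: x[0]*x[1]).reduce(add)/count  # sum(value * occurrences) / count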

In [11]:
print(f"Example of pairs: {pairs.collect()[:5]}\nresulting count of elements: {count}\nresulting mean: {round(mean,1)}") # check

Example of pairs: [(772, 1), (782, 2), (792, 3), (922, 2), (352, 1)]
resulting count of elements: 855
resulting mean: 513.7


...versus computing with an accumulator.

In [12]:
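# Accumulators are updated by tasks and read on the driver: start from 0,
# let every partition add its occurrence counts inside an action, then read .value
count_accumulator = sc.accumulator(0)
pairs.foreach(lambda x: count_accumulator.add(x[1]))
print(f"Accumulator value: {count_accumulator}")
mean = pairs.map(lambda x: x[0]*x[1]).reduce(add)/count_accumulator.value
print(f"Mean: {round(mean,1)}")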

Accumulator value: 855
Mean: 513.7
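Both versions above run more than one Spark job (one for the count, one for the weighted sum). A minimal sketch, in plain Python with made-up partitions, of computing sum and count in a single pass by folding values into `(sum, count)` pairs and merging the per-partition pairs. The functions `seq` and `comb` are hypothetical names; in PySpark the same two functions could be passed to `RDD.aggregate((0, 0), seq, comb)`, and PySpark also offers `RDD.mean()` for this case.

```python
from functools import reduce

# Hypothetical partitions standing in for the RDD's 'Milliseconds' values.
partitions = [[611, 513, 433], [772, 782], [792, 505]]

def seq(acc, v):
    # fold one value into a partition-local (sum, count) pair
    return (acc[0] + v, acc[1] + 1)

def comb(a, b):
    # merge two (sum, count) pairs coming from different partitions
    return (a[0] + b[0], a[1] + b[1])

# "map" side: one (sum, count) per partition; "reduce" side: merge them
partials = [reduce(seq, part, (0, 0)) for part in partitions]
total, count = reduce(comb, partials, (0, 0))
mean = total / count

print(total, count, round(mean, 1))  # 4408 7 629.7
```

Because `(sum, count)` pairs are merged with an associative, commutative operation, the partitions can be combined in any order, which is what makes the mean easy to distribute.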


## Computing the median

We don't want to collect a huge array into main memory only to get one value. Instead, we can:
* use `takeOrdered`, an action that returns the k smallest elements without a full `sortByKey`/`sortBy` transformation; however, to reach the median it must return the ⌊n/2⌋ + 1 smallest values, and all of them are brought to the driver
* sort the RDD, **index** its elements, and keep only the element whose index equals half the count (`count // 2`), which we store in a **broadcast variable**

In [13]:
rdd.map(lambda x: int(x[11])).takeOrdered(count_accumulator.value//2)[-1]  # off by one, not the median: takeOrdered(n//2) returns sorted indices 0..n//2-1, so [-1] is index 426; takeOrdered(n//2 + 1)[-1] would return index 427

503
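Why the value above is 503 rather than 505: PySpark's `takeOrdered(num)` roughly keeps the `num` smallest values of each partition with `heapq.nsmallest` and then merges the candidate lists, so duplicates are preserved and the result is exactly the first `num` elements of the sorted data. A plain-Python sketch with made-up partitions (`take_ordered` is a hypothetical helper, not the PySpark source) reproduces the off-by-one:

```python
import heapq
from functools import reduce

# Hypothetical partitions of 'Milliseconds' values, with duplicates.
partitions = [[505, 503, 505], [780, 503], [120, 505]]
n = sum(len(p) for p in partitions)  # 7 values in total

def take_ordered(parts, num):
    # per-partition smallest `num` values, then pairwise merge of the candidates;
    # heapq.nsmallest keeps duplicate values
    local = [heapq.nsmallest(num, p) for p in parts]
    return reduce(lambda a, b: heapq.nsmallest(num, a + b), local)

off_by_one = take_ordered(partitions, n // 2)[-1]  # sorted index n//2 - 1
median = take_ordered(partitions, n // 2 + 1)[-1]  # sorted index n//2

print(off_by_one, median)  # 503 505
```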

In [14]:
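count_broadcast = sc.broadcast(count_accumulator.value)  # ship the count to every executor once
def is_broadcast(x):
  # x is ((value, 1), index): keep the element sitting at the median position
  return x[1] == count_broadcast.value//2

rdd.map(lambda x: (int(x[11]), 1)).sortByKey().zipWithIndex().filter(is_broadcast).collect()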

[((505, 1), 427)]
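A plain-Python sketch (made-up partitions, hypothetical names) of what the `zipWithIndex().filter(is_broadcast)` step does: the global index of an element is its position inside its partition plus the number of elements in all earlier partitions. Those partition sizes must be known first, which is why `zipWithIndex()` runs an extra Spark job when the RDD has more than one partition.

```python
# Hypothetical partitions of an RDD that is already sorted across partitions.
sorted_partitions = [[120, 503], [503, 505, 505], [505, 780]]

# Pass 1: count the elements of each partition.
sizes = [len(p) for p in sorted_partitions]

# Starting global index of each partition = sum of the sizes before it.
offsets = [sum(sizes[:i]) for i in range(len(sizes))]

# Pass 2: every partition numbers its own elements independently.
indexed = [
    (value, offsets[i] + j)
    for i, part in enumerate(sorted_partitions)
    for j, value in enumerate(part)
]

n = sum(sizes)
# Keep only the element whose global index is n // 2, like is_broadcast().
median = [value for value, idx in indexed if idx == n // 2]

print(offsets, median)  # [0, 2, 5] [505]
```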

# Conclusions
What can be done better?
* The final solution shares the `pairs` RDD between the mean and the median computations.
* I don't think the `mean` can be improved much: I can't think of many other ways of computing sums and products, and the accumulator is the best practice that I ended up implementing.
* A lot can be said about the `median`. I don't think this is the best possible solution, partly because the provided dataset is so small that it is difficult to measure the efficiency of the "smartest" solution.
  * We still need to sort the RDD, with all the costs of sorting.
  * `zipWithIndex()` can be problematic: when the RDD has more than one partition it triggers an extra Spark job to count the elements of each partition. There may be better ways of indexing a sorted RDD, or of not indexing it at all.
  * The filter is applied over the whole RDD, whereas ideally it would stop checking once the median is found.
  * Still, this solution does not need to store the whole sorted array in main memory, because we collect only the element that passes the filter.
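One possible way of "not indexing at all", sketched in plain Python with made-up partitions: build a value → count histogram (what `reduceByKey(add)` already produces from `(value, 1)` pairs), sort only the distinct values, and walk their cumulative counts until position `n // 2` is passed. In PySpark the histogram could be obtained with `rdd.map(lambda x: (int(x[11]), 1)).reduceByKey(add).sortByKey().collect()`; this brings only the distinct values to the driver, which is cheap as long as there are few of them (an assumption about the data, not something measured here).

```python
from collections import Counter
from functools import reduce

# Hypothetical partitions of 'Milliseconds' values.
partitions = [[505, 503, 505], [780, 503], [120, 505]]

# Per-partition value counts merged into one histogram
# (the role reduceByKey(add) plays on (value, 1) pairs).
histogram = reduce(lambda a, b: a + b, (Counter(p) for p in partitions))

n = sum(histogram.values())
target = n // 2  # 0-based median position, same convention as count//2

# Only the distinct values are sorted; walk their cumulative counts.
seen = 0
for value in sorted(histogram):
    seen += histogram[value]
    if seen > target:  # position `target` falls inside this value's run
        median = value
        break

print(n, median)  # 7 505
```

Like the notebook's `count//2`, this returns the upper middle value when `n` is even.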

## Time and memory consumed by both approaches (including printing time)

In [15]:
%%memit
count = rdd.count()
print(rdd.map(lambda x: int(x[11])).sum()/count) # mean
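print(rdd.sortBy(lambda x: x[11]).collect()[count//2][11]) # median; NOTE: x[11] is a string, so this sorts lexicographically and prints 559 instead of the numeric median 505 (sort by int(x[11]) as in In [9])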

513.706432748538
559
peak memory: 133.05 MiB, increment: 0.61 MiB


In [16]:
%%time
count = rdd.count()
print(rdd.map(lambda x: int(x[11])).sum()/count) # mean
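print(rdd.sortBy(lambda x: x[11]).collect()[count//2][11]) # median; NOTE: x[11] is a string, so this sorts lexicographically and prints 559 instead of the numeric median 505 (sort by int(x[11]) as in In [9])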

513.706432748538
559
CPU times: user 77.4 ms, sys: 9.8 ms, total: 87.2 ms
Wall time: 790 ms


In [17]:
%%memit
pairs = rdd.map(lambda x: (int(x[11]), 1))
count_accumulator = sc.accumulator(pairs.count())
count_broadcast = sc.broadcast(count_accumulator.value)

def is_broadcast(x):
  return int(x[1]) == count_broadcast.value//2

print(pairs.reduceByKey(add).map(lambda x: x[0]*x[1]).reduce(add)/count_accumulator.value) # mean
print(pairs.sortByKey().zipWithIndex().filter(is_broadcast).collect()[0][0][0]) # median

513.706432748538
505
peak memory: 133.28 MiB, increment: 0.03 MiB


In [18]:
%%time
pairs = rdd.map(lambda x: (int(x[11]), 1))
count_accumulator = sc.accumulator(pairs.count())
count_broadcast = sc.broadcast(count_accumulator.value)

def is_broadcast(x):
  return int(x[1]) == count_broadcast.value//2

print(pairs.reduceByKey(add).map(lambda x: x[0]*x[1]).reduce(add)/count_accumulator.value) # mean
print(pairs.sortByKey().zipWithIndex().filter(is_broadcast).collect()[0][0][0]) # median

513.706432748538
505
CPU times: user 101 ms, sys: 12.1 ms, total: 114 ms
Wall time: 1.04 s


## Docker
Note: this notebook was also run successfully on Docker using the [Jupyter PySpark Notebook](https://hub.docker.com/r/jupyter/pyspark-notebook) image, by running

`docker run -p 8888:8888 jupyter/pyspark-notebook`

to download and start the image, and

`docker cp C:\Users\alice\Downloads\logDataset.csv <container-id>:/log.csv`

to copy the .csv file into the container (in that case `log_path` must point to `/log.csv`).
