### [Get Started with PySpark and Jupyter Notebook in 3 Minutes](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f)

### Importing Libraries
  * #### Part 1: Essential Library for Locating Spark
    * Import the locator >>> **import** findspark
    * Locate the Spark installation and make `pyspark` importable >>> **findspark.init()**

In [5]:
import findspark
findspark.init()  # locate SPARK_HOME and add PySpark to sys.path; does not start Spark
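`findspark.init()` does not launch Spark. It locates an installation (through the `SPARK_HOME` environment variable or common install paths) and prepends PySpark and its bundled Py4J to `sys.path`, so that `import pyspark` succeeds. The sketch below shows roughly that path setup, assuming the standard layout `$SPARK_HOME/python/lib/py4j-*-src.zip`. The helper `add_spark_to_path` is hypothetical and is not part of the findspark API.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Hypothetical helper: prepend PySpark and Py4J from a Spark install to sys.path.

    Assumes the standard layout:
      $SPARK_HOME/python/
      $SPARK_HOME/python/lib/py4j-<version>-src.zip
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"No py4j zip found under {spark_python}/lib")
    os.environ["SPARK_HOME"] = spark_home
    # If several zips match, take the lexicographically last one.
    new_paths = [spark_python, sorted(py4j_zips)[-1]]
    # Prepend so these entries win over any other installed pyspark.
    sys.path[:0] = new_paths
    return new_paths
```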

  * #### Part 2: Importing Essential pyspark Modules

In [6]:
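from pyspark import SparkConf, SparkContext, StorageLevel
# Imported as a namespace rather than with `*`, which would shadow Python
# built-ins such as sum, min, max, abs and round (not used by the RDD code below).
from pyspark.sql import functions as F
import time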

### Loading Spark

In [7]:
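conf = (SparkConf()
        .setMaster("local[*]")  # run locally with one worker thread per logical core
        .setAppName("db2-q2-spark-rdd-c")
        # In local mode the executor runs inside the driver JVM, so the
        # driver memory setting is the one that bounds the available heap.
        .set("spark.executor.memory", "3g")
        .set("spark.driver.memory", "5g"))

sc = SparkContext(conf=conf)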

### Create Dataset from csv file (RDD Approach)

In [8]:
lines = sc.textFile("db2_project_data.csv")  # one RDD element per line, header included

In [9]:
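def getColumns(row, indexes):
    # Split one CSV line on commas and keep only the requested columns.
    # Note: a plain split does not handle quoted fields that contain commas.
    row = row.split(',')
    return [row[i] for i in indexes]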
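`row.split(',')` breaks if a field contains a quoted comma (for example `"Athens, GR"`). If the file may contain such fields, the standard-library `csv` module parses each line correctly. A minimal sketch, assuming each RDD element is one complete CSV record (line-based `textFile` cannot handle quoted fields that span lines); `get_columns_csv` is a hypothetical name:

```python
import csv


def get_columns_csv(row, indexes):
    """Return the fields at `indexes` from one CSV-formatted line.

    Unlike str.split(','), csv.reader honours double-quoted fields,
    so a comma inside quotes does not start a new column.
    """
    fields = next(csv.reader([row]))
    return [fields[i] for i in indexes]


line = 'C42,"Athens, GR",12.5'
print(line.split(','))                # naive split yields 4 pieces
print(get_columns_csv(line, [0, 2]))  # ['C42', '12.5']
```

In the notebook it would be used as `lines.map(lambda s: get_columns_csv(s, [0, 2]))`.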

In [11]:
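# Keep columns 0 and 2: the grouping key and the distance value used in the query below.
consumption = lines.map(lambda s: getColumns(s, [0, 2]))
header = consumption.first()  # the CSV header row
consumption = consumption.filter(lambda line: line != header)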
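Filtering with `line != header` compares every record against the header and would also drop any data row that happens to equal it. A common alternative is to skip only the first record of the first partition with `RDD.mapPartitionsWithIndex`, which calls `f(partition_index, iterator)` once per partition. A sketch of such a function, assuming the data comes from a single file whose first line is the header (so it lands in partition 0); `drop_header` is a hypothetical name:

```python
from itertools import islice


def drop_header(partition_index, rows):
    """Skip the first record of partition 0; pass other partitions through unchanged."""
    if partition_index == 0:
        return islice(rows, 1, None)
    return rows


# Pure-Python check with two simulated partitions:
parts = [iter([["id", "km"], ["a", "1.0"]]), iter([["b", "2.0"]])]
print([list(drop_header(i, p)) for i, p in enumerate(parts)])
# [[['a', '1.0']], [['b', '2.0']]]
```

With Spark this would read `lines.map(lambda s: getColumns(s, [0, 2])).mapPartitionsWithIndex(drop_header)`.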

### Repartition the Dataset RDD Using Hash Partitioning

In [12]:
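# Note: collect() pulls every row into the driver before re-distributing it;
# consumption.partitionBy(...) could be applied directly and avoid that round-trip.
dataset = sc.parallelize(consumption.collect())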

In [13]:
dataset

ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:489

In [None]:
# See the first 5 rows for a glimpse of the RDD structure
dataset.take(5)

In [14]:
slices = 50
wp = dataset.partitionBy(slices) # Hash Partitioning By Default
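Hash partitioning sends each record to partition `hash(key) % numPartitions`, so all records sharing a key end up in the same partition; a later `reduceByKey` with the same partition count and partition function can then avoid a second shuffle. PySpark's default partition function is `pyspark.rdd.portable_hash`. The sketch below substitutes `zlib.crc32` as a deterministic stand-in (Python's built-in `hash()` of a string is randomized per process unless `PYTHONHASHSEED` is fixed), so its bucket numbers will not match Spark's; `bucket_for` and `hash_partition` are hypothetical names.

```python
import zlib
from collections import defaultdict


def bucket_for(key, num_partitions):
    """Deterministic stand-in for Spark's hash partitioner (CRC32, not portable_hash)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_partition(pairs, num_partitions):
    """Group (key, value) pairs into num_partitions buckets by key hash."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[bucket_for(key, num_partitions)].append((key, value))
    # Return every bucket, including empty ones, in partition order.
    return [buckets[i] for i in range(num_partitions)]


pairs = [("c1", "10.0"), ("c2", "3.5"), ("c1", "7.5"), ("c3", "1.0")]
parts = hash_partition(pairs, 4)
# Every record for "c1" lands in the same bucket, so this prints a one-element list:
print([i for i, p in enumerate(parts) if any(k == "c1" for k, _ in p)])
```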

  * View the keys in each hash bucket (this collects every key to the driver, so it is practical only for small data)

In [None]:
wp.map(lambda t: t[0]).glom().collect()
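To check only how evenly the partitioner spread the records, counting per partition is cheaper than listing keys: in PySpark, `wp.glom().map(len).collect()` returns one count per partition. A small pure-Python sketch for summarizing such counts; `skew_report` is a hypothetical helper and the counts in the example are made up for illustration.

```python
def skew_report(partition_sizes):
    """Summarize per-partition record counts, e.g. from wp.glom().map(len).collect().

    Assumes at least one partition. A max_over_mean close to 1 means the
    records are spread evenly; a large value means one bucket is overloaded.
    """
    total = sum(partition_sizes)
    mean = total / len(partition_sizes)
    return {
        "partitions": len(partition_sizes),
        "records": total,
        "empty": sum(1 for n in partition_sizes if n == 0),
        "max_over_mean": max(partition_sizes) / mean if mean else 0.0,
    }


# Hypothetical counts for four partitions:
print(skew_report([120, 95, 0, 130]))  # 345 records, 1 empty partition, max_over_mean ≈ 1.51
```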

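  * Persist the hash-partitioned RDD so later queries reuse it instead of recomputing the shuffle (persistence is lazy: the data is cached by the first action that computes it)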

In [15]:
wp.persist(StorageLevel.MEMORY_AND_DISK)

MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:436

### Execute a Query from Question (a)

### a.iv: Average Distance (Kilometers) per Customer
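A per-key mean is usually computed with the (sum, count) pattern: map each value to `(value, 1)`, add the pairs per key with `reduceByKey`, then divide once per key. Converting values to numbers before the reduction matters, because `reduceByKey` never calls its function for a key that has only one record, so that key's value would pass through unconverted. A pure-Python sketch of the same combine logic, with simulated partitions standing in for Spark's map-side combine and shuffle merge; `add_pairs` and `mean_per_key` are hypothetical names.

```python
def add_pairs(a, b):
    """Combine two (sum, count) pairs; associative and commutative, as reduceByKey requires."""
    return (a[0] + b[0], a[1] + b[1])


def mean_per_key(partitions):
    """Per-key mean over (key, value_str) records split across partitions.

    Mirrors: rdd.mapValues(lambda v: (float(v), 1))
                .reduceByKey(add_pairs)
                .mapValues(lambda p: p[0] / p[1])
    """
    # 1) Map-side combine: one (sum, count) per key within each partition.
    local = []
    for part in partitions:
        acc = {}
        for key, value in part:
            pair = (float(value), 1)  # convert before reducing
            acc[key] = add_pairs(acc[key], pair) if key in acc else pair
        local.append(acc)
    # 2) Merge the partial results across partitions (the shuffle stage).
    merged = {}
    for acc in local:
        for key, pair in acc.items():
            merged[key] = add_pairs(merged[key], pair) if key in merged else pair
    # 3) Divide once per key.
    return {key: s / n for key, (s, n) in merged.items()}


parts = [[("c1", "10.0"), ("c2", "3.5")], [("c1", "5.0")]]
print(mean_per_key(parts))  # {'c1': 7.5, 'c2': 3.5}
```

Key `c2` has a single record, which is exactly the case where converting inside the reduce function would leave a string behind.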

In [30]:
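st = time.time()
# Build (sum, count) pairs, add them per key, then divide. Values are converted
# to float before reduceByKey, because reduceByKey never calls its function for a
# key with a single record, which would otherwise leave a string to divide.
# Python 3 lambdas cannot unpack tuple parameters (PEP 3113), hence a[0], a[1].
result = (wp.mapValues(lambda v: (float(v), 1))
          .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
          .mapValues(lambda v: v[0] / v[1]))
fn = time.time() - st  # transformations are lazy: this measures lineage construction only

result.persist(StorageLevel.MEMORY_AND_DISK)
print('Query Computation Time:', fn, ' seconds.')
print('Average Distance (Kilometers) per Customer [Using Hash Partitioning]:')
print(result.take(10))
print('Cumulative Computation Time:', time.time() - st, ' seconds.')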

Query Computation Time: 0.001003265380859375  seconds.
Average Distance (Kilometers) per Customer [Using Hash Partitioning]:
[('9257094-CALEND-040-Καθημερινή-11', 592.7304328574816), ('9257206-CALEND-040-Καθημερινή-11', 574.3728026539111), ('9056408-CALEND-040-Σάββατο-07', 565.5172956532501), ('8135866-CALEND-049-Σάββατο-06', 519.8642560001414), ('9257033-CALEND-040-Καθημερινή-11', 592.73043287189), ('8135785-CALEND-049-Σάββατο-06', 510.27910324150736), ('9759640-CALEND-304-305-Σάββατο-10', 658.918602015489), ('9257061-CALEND-040-Καθημερινή-11', 592.7304328576909), ('9586372-CALEND-217-229-Σάββατο-14', 578.3699267903174), ('9759639-CALEND-304-305-Σάββατο-10', 658.0476338313363)]
Cumulative Computation Time: 2.8515617847442627  seconds.
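The "Query Computation Time" of about 0.001 seconds only covers building the lineage: `mapValues` and `reduceByKey` are lazy transformations, and the work runs when an action such as `take(10)` is called, which is where most of the 2.85 seconds of cumulative time goes. Note that `take(10)` may evaluate only as many partitions as it needs to return 10 rows, so timing an action such as `count()` gives a fuller measure of the whole query. A pure-Python analogy using a generator, where a counter records how many records were actually processed; `parse` is a hypothetical name.

```python
def parse(records, stats):
    """Lazily convert (key, value_str) records, counting each one processed."""
    for key, value in records:
        stats["processed"] += 1
        yield key, float(value)


records = [("c1", "10.0"), ("c2", "3.5"), ("c1", "5.0")]
stats = {"processed": 0}

pipeline = parse(records, stats)   # like a transformation: only builds the plan
print(stats["processed"])          # 0

rows = list(pipeline)              # like an action: forces every record through
print(stats["processed"])          # 3
```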


### Stopping the Spark Context

In [31]:
sc.stop()