In [2]:
# import spark 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
# SparkContext is the entry point for the RDD API.
# SparkSession is the newer, unified entry point that also covers the SQL
# context, Hive context, etc.
# Both are obtained through getOrCreate(); the master is set to "local[*]".
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

## Exercise 2
### Part 1
Now let's calculate covariance and correlation ourselves using Apache Spark.

First we create two random RDDs, which shouldn't correlate at all.

In [6]:
import random

# Seed the generator so that Restart & Run All draws the same permutations on
# every run; the seed value itself is arbitrary.
random.seed(42)

# Two separately drawn random permutations of 0..99. Sampling all 100 values
# without replacement means both RDDs hold exactly the same set of numbers,
# only in a different order.
rddX = sc.parallelize(random.sample(range(100), 100))
rddY = sc.parallelize(random.sample(range(100), 100))


Now we calculate the mean. Note that no explicit cast to float is needed: in Python 3 the `/` operator always performs true division, so the result is a float even though the sum and the count are both integers.

In [8]:
def _mean(rdd):
    """Arithmetic mean of a numeric RDD: sum of its elements over their count."""
    return rdd.sum() / rdd.count()

meanX, meanY = _mean(rddX), _mean(rddY)
print(meanX, meanY)

49.5 49.5


Now we calculate the covariance

In [10]:
# Pair the i-th element of rddX with the i-th element of rddY as (x, y) tuples.
rddXY = rddX.zip(rddY)
# Peek at the first ten pairs.
rddXY.take(10)

[(92, 66),
 (4, 40),
 (85, 28),
 (44, 83),
 (84, 9),
 (51, 90),
 (29, 94),
 (32, 53),
 (17, 67),
 (33, 46)]

In [13]:
# Population covariance: the average, over all (x, y) pairs, of the product of
# each coordinate's deviation from its own mean.
deviationProducts = rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY))
covXY = deviationProducts.sum() / rddXY.count()
covXY

-82.04

Covariance is not a normalized measure, so we use it to calculate the correlation. Before that, we need to calculate the individual standard deviations.

In [14]:
from math import sqrt

In [15]:
# Population standard deviation of each RDD: the square root of the mean
# squared deviation from that RDD's mean.
n = rddXY.count()
squaredDevX = rddX.map(lambda value: (value - meanX) ** 2)
squaredDevY = rddY.map(lambda value: (value - meanY) ** 2)
sdX = sqrt(squaredDevX.sum() / n)
sdY = sqrt(squaredDevY.sum() / n)

In [16]:
# Both RDDs are permutations of the same values, so the two deviations match.
print(sdX, sdY)

28.86607004772212 28.86607004772212


In [17]:
# Pearson correlation: covariance normalized by the product of the two
# standard deviations.
corrXY = covXY/(sdX*sdY)

In [18]:
# Display the coefficient; a value near 0 means little linear relationship.
corrXY

-0.09845784578457846

### Part 2

Now we want to create a correlation matrix out of the four RDDs used in the lecture.

In [19]:
from pyspark.mllib.stat import Statistics
import random

In [21]:
# Four 100-element columns with known relationships to column1.
# column1: 0..99 ascending.
column1 = sc.parallelize(range(100))
# column2: 100..199, i.e. column1 shifted by a constant.
column2 = sc.parallelize(range(100,200))
# column3: 99..0 descending, i.e. column1 reversed.
column3 = sc.parallelize(list(reversed(range(100))))
# column4: a random permutation of 0..99.
column4 = sc.parallelize(random.sample(range(100),100))


In [23]:
# Each successive zip wraps the previous pair, so a record comes out nested as
# (((c1, c2), c3), c4).
column1.zip(column2).zip(column3).zip(column4).take(10)

[(((0, 100), 99), 85),
 (((1, 101), 98), 3),
 (((2, 102), 97), 77),
 (((3, 103), 96), 65),
 (((4, 104), 95), 22),
 (((5, 105), 94), 48),
 (((6, 106), 93), 97),
 (((7, 107), 92), 71),
 (((8, 108), 91), 94),
 (((9, 109), 90), 26)]

In [24]:
def _flatten_row(nested):
    """Turn one zipped record (((c1, c2), c3), c4) into the flat row [c1, c2, c3, c4]."""
    ((c1, c2), c3), c4 = nested
    return [c1, c2, c3, c4]

# Chained zips nest their results, so flatten every record into a plain
# four-element list: one row per index, one entry per column.
data = column1.zip(column2).zip(column3).zip(column4).map(_flatten_row)

In [25]:
# Preview the first ten flattened rows.
data.take(10)

[[0, 100, 99, 85],
 [1, 101, 98, 3],
 [2, 102, 97, 77],
 [3, 103, 96, 65],
 [4, 104, 95, 22],
 [5, 105, 94, 48],
 [6, 106, 93, 97],
 [7, 107, 92, 71],
 [8, 108, 91, 94],
 [9, 109, 90, 26]]

In [26]:
# Pairwise correlation between the columns of `data`; entry [i][j] relates
# column i to column j. Pearson is the default method and is spelled out here
# only for readability.
Statistics.corr(data, method="pearson")

array([[ 1.        ,  1.        , -1.        , -0.03450345],
       [ 1.        ,  1.        , -1.        , -0.03450345],
       [-1.        , -1.        ,  1.        ,  0.03450345],
       [-0.03450345, -0.03450345,  0.03450345,  1.        ]])