# Experience Parallel Programming on Apache Spark

## Averages

**1st Statistical Moment.**

### Set up the environment

In [1]:
from pyspark import SparkContext
# Reuse the active SparkContext when one already exists, so that re-running
# this cell in a live kernel does not fail; otherwise a new one is created.
sc = SparkContext.getOrCreate()

### Generate the data

In [20]:
# The integers 0..99 as a range object (displayed as-is, not expanded).
range(0, 100)

range(0, 100)

In [110]:
# Preview of the sample: 0..99 followed by the values 100, 1000 and 10000.
[*range(100), 100, 1000, 10000]

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 1000,
 10000]

In [113]:
# Distribute the sample (0..99 plus 100, 1000 and 10000) as an RDD.
sample_values = [*range(100), 100, 1000, 10000]
rdd = sc.parallelize(sample_values)

### Mean
Calculate the mean of the data generated.

In [114]:
# Mean = total of all values divided by the number of values.
# The total is stored in `total` rather than `sum` so that Python's built-in
# sum() is not shadowed for the rest of the notebook session.
total = rdd.sum()
n = rdd.count()
mean = total / n
print(mean)

155.8252427184466


### Median

The median is an **outlier**-resistant alternative to the mean.

In [115]:
# Median via sort + index: sort the values, attach each element's position,
# then flip every pair to (position, value) so a value can be looked up by
# its position in the sorted order.
sortedAndIndexed = rdd.sortBy(lambda x: x).zipWithIndex().map(lambda val: (val[1], val[0]))
n = sortedAndIndexed.count()
if n % 2 == 1:
    # Odd count: the median is the single middle element.
    # Floor division keeps the lookup key an int, like the positions produced
    # by zipWithIndex (true division would hand lookup() a float key).
    index = (n - 1) // 2
    print(sortedAndIndexed.lookup(index)[0])
else:
    # Even count: the median is the average of the two middle elements.
    index1 = n // 2 - 1
    index2 = n // 2
    value1 = sortedAndIndexed.lookup(index1)[0]
    value2 = sortedAndIndexed.lookup(index2)[0]
    print((value1 + value2) / 2)

51


## Standard Deviation

Standard deviation is the square root of the variance, the **2nd Statistical Moment** (taken about the mean). 
It measures how widely the data is spread around the mean.

In [116]:
from math import sqrt
# Standard deviation: square root of the mean squared deviation from `mean`
# (the sum of squares is divided by n, the element count).
squared_deviations = rdd.map(lambda x: (x - mean) ** 2)
sd = sqrt(squared_deviations.sum() / n)
sd

979.5845902523644

## Skewness

Skewness is the **3rd Statistical Moment**. It measures how asymmetrically the data is spread about the mean.<br>
![](images/skewness.png)

In [152]:
# Heavily lopsided sample: 0..99 plus 10,000 copies of 1000, so almost all
# of the mass sits at 1000 with a thin tail of smaller values.
rdd = sc.parallelize(list(range(100)) + [1000]*10000)
# `total` (not `sum`) keeps Python's built-in sum() usable afterwards.
total = rdd.sum()
n = rdd.count()
mean = total / n
# Standard deviation with the sum of squared deviations divided by n.
sd = sqrt(rdd.map(lambda x : pow(x - mean, 2)).sum() / n)
print(mean, sd)

990.5891089108911 94.15273288177377


In [153]:
# Sum of the cubed deviations, each divided by the cubed standard deviation.
# This is the raw sum only; it is not yet divided by the number of elements.
cubed_terms = rdd.map(lambda x: (x - mean) ** 3 / sd ** 3)
skewness = cubed_terms.sum()
skewness

-100132.30520317465

Now add the normalization term $\frac{1}{n}$, i.e. divide the sum by the number of data points.

In [154]:
# Skewness: the sum of cubed deviations over sd cubed, scaled by 1/n.
cubed_terms = rdd.map(lambda x: (x - mean) ** 3 / sd ** 3)
skewness = (1 / n) * cubed_terms.sum()
skewness

-9.914089624076698

In [156]:
# Mirror image of the previous sample: 0..99 plus 10,000 copies of -1000,
# so the bulk of the mass now sits below the remaining values.
rdd = sc.parallelize(list(range(100)) + [-1000]*10000)
# `total` (not `sum`) keeps Python's built-in sum() usable afterwards.
total = rdd.sum()
n = rdd.count()
mean = total / n
sd = sqrt(rdd.map(lambda x : pow(x - mean, 2)).sum() / n)
# Skewness: sum of cubed deviations over sd cubed, scaled by 1/n.
skewness = (1 / n) * rdd.map(lambda x : pow(x - mean, 3) / pow(sd, 3)).sum()
skewness

9.911560206061798

## Kurtosis

- **4th Statistical Moment**
- The higher the kurtosis measure is, the more outliers are present and the longer the tails of the distribution in the histogram are.
- **Outlier**: In statistics, an outlier is a data point that differs significantly from other observations.

![kurtosis](images/kurtosis.png)

In [159]:
# Kurtosis: sum of the fourth-power deviations over sd to the fourth power,
# scaled by 1/n. Uses the `rdd`, `mean`, `sd` and `n` currently in scope.
fourth_power_terms = rdd.map(lambda x: (x - mean) ** 4 / sd ** 4)
kurtosis = (1 / n) * fourth_power_terms.sum()
kurtosis

99.3167193433139

## Covariance, Covariance matrices, Correlation

We first take an example in which the two columns of data are perfectly correlated with each other, i.e. their correlation is +1.

### Covariance

In [209]:
# Two columns holding exactly the same values (0..99).
column_values = range(100)
rddX = sc.parallelize(column_values)
rddY = sc.parallelize(column_values)

In [210]:
# Per-column mean: total of the column divided by its element count.
meanX = rddX.sum() / rddX.count()
meanY = rddY.sum() / rddY.count()
# One value per line, X first.
print(meanX, meanY, sep="\n")

49.5
49.5


In [211]:
# Pair up the two columns element by element into (x, y) tuples,
# then preview the first ten pairs.
rddXY = rddX.zip(rddY)
rddXY.take(num=10)

[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9)]

In [212]:
# Covariance: average over all (x, y) pairs of the product of each value's
# deviation from its own column mean.
deviation_products = rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY))
covXY = deviation_products.sum() / rddXY.count()
covXY

833.25

### Correlation

In [213]:
from math import sqrt
# Standard deviation of each column (sum of squared deviations divided by n).
n = rddXY.count()
sdX = sqrt(rddX.map(lambda x: pow(x - meanX, 2)).sum() / n)
# Y's spread must be measured on Y's own values. Reading rddX here would only
# give the right answer when both columns happen to hold identical data.
sdY = sqrt(rddY.map(lambda y: pow(y - meanY, 2)).sum() / n)
print(sdX)
print(sdY)

28.86607004772212
28.86607004772212


In [214]:
# Correlation: the covariance divided by the product of the two standard
# deviations.
sd_product = sdX * sdY
corrXY = covXY / sd_product
corrXY

1.0

### Correlation Matrix

In [225]:
import random
from pyspark.mllib.stat import Statistics
# Seed the RNG so the shuffled column (and therefore the correlation matrix
# computed from it) is the same on every run. Captured outputs produced
# before the seed was added will differ from a fresh run.
random.seed(42)

# Four columns of 100 values each: ascending, ascending shifted by 100,
# descending, and a random permutation of 0..99.
# NOTE(review): each column is collapsed to a single partition, presumably so
# the partition layouts match for zip() — confirm against the RDD.zip docs.
column1 = sc.parallelize(range(100)).repartition(1)
column2 = sc.parallelize(range(100, 200)).repartition(1)
column3 = sc.parallelize(reversed(range(100))).repartition(1)
column4 = sc.parallelize(random.sample(range(100), 100)).repartition(1)

# Chained zips nest the tuples as (((c1, c2), c3), c4); flatten each row
# into a plain list [c1, c2, c3, c4].
data = column1.zip(column2).zip(column3).zip(column4).map(
    lambda row: [row[0][0][0], row[0][0][1], row[0][1], row[1]]
)
data.take(10)

[[0, 100, 99, 63],
 [1, 101, 98, 9],
 [2, 102, 97, 79],
 [3, 103, 96, 80],
 [4, 104, 95, 76],
 [5, 105, 94, 72],
 [6, 106, 93, 97],
 [7, 107, 92, 17],
 [8, 108, 91, 39],
 [9, 109, 90, 78]]

In [226]:
# Pairwise correlation matrix of the four columns; "pearson" is the method
# Statistics.corr applies when none is given, spelled out here for clarity.
Statistics.corr(data, method="pearson")

array([[ 1.        ,  1.        , -1.        , -0.05220522],
       [ 1.        ,  1.        , -1.        , -0.05220522],
       [-1.        , -1.        ,  1.        ,  0.05220522],
       [-0.05220522, -0.05220522,  0.05220522,  1.        ]])