This notebook is designed to run in an IBM Watson Studio default runtime (NOT the Watson Studio Apache Spark runtime), because the default runtime with 1 vCPU is free of charge. We therefore install Apache Spark in local mode, for test purposes only. Please don't use this setup in production.

In case you are facing issues, please read the following two documents first:

https://github.com/IBM/skillsnetwork/wiki/Environment-Setup

https://github.com/IBM/skillsnetwork/wiki/FAQ

Then, please feel free to ask:

https://coursera.org/learn/machine-learning-big-data-apache-spark/discussions/all

Please make sure to follow the guidelines before asking a question:

https://github.com/IBM/skillsnetwork/wiki/FAQ#im-feeling-lost-and-confused-please-help-me


This notebook should also work outside Watson Studio. If you are already running in an Apache Spark context outside Watson Studio, please remove the Apache Spark setup in the first notebook cells.

In [6]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


In [7]:
!pip install pyspark==2.4.5



In [11]:
pip install --upgrade pyspark cloudpickle

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Collecting cloudpickle
  Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285412 sha256=3d7af1654c9edd70a0b1ea7c06e1519472eb8269a2697d1ce8f081b3fbfa1d38
  Stored in directory: /tmp/wsuser/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: py4j, pyspar

In [1]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/18 08:50:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Exercise 2
## Part 1
Now let's calculate covariance and correlation ourselves using Apache Spark.

First, we create two random RDDs, which shouldn't correlate at all.


In [3]:
import random
rddX = sc.parallelize(random.sample(list(range(100)),100))
rddY = sc.parallelize(random.sample(list(range(100)),100))

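Now we calculate the mean. Note that we explicitly cast the denominator to float so that the division yields a float rather than an int. This matters in Python 2; in Python 3, `/` always performs true division, so the cast is harmless.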

In [4]:
meanX = rddX.sum()/float(rddX.count())
meanY = rddY.sum()/float(rddY.count())
print (meanX)
print (meanY)

                                                                                

49.5
49.5


Now we calculate the covariance

In [6]:
rddXY = rddX.zip(rddY)
covXY = rddXY.map(lambda x_y : (x_y[0]-meanX)*(x_y[1]-meanY)).sum()/rddXY.count()
covXY

-50.96
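
As a cross-check, the same population covariance formula can be sketched in plain Python without Spark. The helper name `population_covariance` and the sample lists are illustrative assumptions, not part of the notebook. Like the cell above, it divides by n rather than n - 1.

```python
def population_covariance(xs, ys):
    """Mean of the products of deviations from the mean (divides by n)."""
    if len(xs) != len(ys) or not xs:
        raise ValueError("inputs must be non-empty and of equal length")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n

# Hypothetical sample data: ys is exactly 2 * xs
xs = [1, 2, 3, 4]
ys = [2, 4, 6, 8]
cov_xy = population_covariance(xs, ys)
print(cov_xy)  # 2.5
```

The covariance of a variable with itself is its population variance, so `population_covariance(xs, xs)` can be reused for the standard deviation step.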

Covariance is not a normalized measure: its magnitude depends on the scale of the variables. We therefore use it to calculate the correlation, but first we need the individual standard deviations.

In [8]:
from math import sqrt
n = rddXY.count()
sdX = sqrt(rddX.map(lambda x : pow(x-meanX,2)).sum()/n)
sdY = sqrt(rddY.map(lambda x : pow(x-meanY,2)).sum()/n)
print (sdX)
print (sdY)

28.86607004772212
28.86607004772212
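
Both standard deviations are identical because `rddX` and `rddY` are each a shuffle of the integers 0 to 99, and the population standard deviation does not depend on order. For the integers 0 to n - 1 it has the closed form sqrt((n^2 - 1) / 12). A minimal plain-Python check, with illustrative variable names that are not part of the notebook:

```python
import random
from math import sqrt

n = 100
values = random.sample(range(n), n)  # a shuffle of 0..99, as in the notebook

mean = sum(values) / n
# Population standard deviation: divide by n, as the Spark cell does
sd = sqrt(sum((v - mean) ** 2 for v in values) / n)

# Closed form for the standard deviation of the integers 0..n-1
closed_form = sqrt((n ** 2 - 1) / 12)
print(sd, closed_form)
```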


Now we calculate the correlation

In [9]:
corrXY = covXY / (sdX * sdY)
corrXY

-0.06115811581158116
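
To see why correlation is preferred over raw covariance, note that rescaling one variable rescales the covariance but leaves the correlation unchanged. The following plain-Python sketch illustrates this. The function names and the sample data are assumptions made for the example only.

```python
from math import sqrt

def covariance(xs, ys):
    """Population covariance of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n

def correlation(xs, ys):
    """Pearson correlation: covariance divided by both standard deviations."""
    sd_x = sqrt(covariance(xs, xs))  # variance is the covariance of a variable with itself
    sd_y = sqrt(covariance(ys, ys))
    return covariance(xs, ys) / (sd_x * sd_y)

# Hypothetical sample data
xs = [1, 2, 3, 4, 5]
ys = [2, 1, 4, 3, 5]
xs_scaled = [10 * x for x in xs]  # the same data expressed in different units

print(covariance(xs, ys), covariance(xs_scaled, ys))    # covariance grows with the scale
print(correlation(xs, ys), correlation(xs_scaled, ys))  # correlation stays the same
```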

## Part 2
Now we want to create a correlation matrix from the four RDDs used in the lecture.

In [15]:
from pyspark.mllib.stat import Statistics
import random
column1 = sc.parallelize(range(100))
column2 = sc.parallelize(range(100,200))
column3 = sc.parallelize(list(reversed(range(100))))
column4 = sc.parallelize(random.sample(range(100),100))
# Zipping three times yields nested tuples (((c1, c2), c3), c4); flatten each into one row
data = (column1.zip(column2).zip(column3).zip(column4)
        .map(lambda a_b_c_d: [a_b_c_d[0][0][0], a_b_c_d[0][0][1], a_b_c_d[0][1], a_b_c_d[1]]))
print(Statistics.corr(data))

[[ 1.          1.         -1.          0.06967897]
 [ 1.          1.         -1.          0.06967897]
 [-1.         -1.          1.         -0.06967897]
 [ 0.06967897  0.06967897 -0.06967897  1.        ]]
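
The matrix can be read as follows: `column2` is `column1` shifted by 100, so their correlation is 1; `column3` is `column1` reversed, so it correlates at -1 with both; and the shuffled `column4` correlates only weakly with the rest. The pairwise computation can be sketched in plain Python for small data. The helper `pearson` and the fixed seed are assumptions for this example, so the values for the random column will differ from the output above.

```python
import random
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)  # the 1/n factors cancel

random.seed(42)  # fixed seed only to make the sketch reproducible
columns = [
    list(range(100)),                 # column1
    list(range(100, 200)),            # column2: column1 shifted by 100
    list(reversed(range(100))),       # column3: column1 reversed
    random.sample(range(100), 100),   # column4: random shuffle
]

# Entry [i][j] is the correlation between column i and column j
matrix = [[pearson(a, b) for b in columns] for a in columns]
for row in matrix:
    print([round(value, 4) for value in row])
```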


Congratulations, you are done with Exercise 2.

## Practice Exercise

### Mean calculation

In [32]:
rddx = sc.parallelize(range(100))
rddy = sc.parallelize(range(100))

nx = rddx.count()
ny = rddy.count()

# print(rddx.collect())
# print(rddy.collect())

# Reuse the counts computed above instead of counting again
meanx = rddx.sum()/nx
meany = rddy.sum()/ny
print(meanx)
print(meany)

49.5
49.5


In [33]:
rddX = sc.parallelize(random.sample(list(range(100)),100))
rddY = sc.parallelize(random.sample(list(range(100)),100))
rddX.collect()
rddY.collect()

# Each mean must be computed from its own RDD
meanX = rddX.sum()/float(rddX.count())
meanY = rddY.sum()/float(rddY.count())
print (meanX)
print (meanY)

49.5
49.5


### Standard deviation calculation

In [39]:
from math import sqrt
n = rddX.count()
sdX = sqrt(rddX.map(lambda x : pow(x-meanX,2)).sum()/n)
sdY = sqrt(rddY.map(lambda x : pow(x-meanY,2)).sum()/n)

print(sdX)
print(sdY)

28.86607004772212
28.86607004772212


### Covariance calculation

In [40]:
rddXY = rddX.zip(rddY)
covXY = rddXY.map(lambda x_y : (x_y[0]-meanX)*(x_y[1]-meanY)).sum()/rddXY.count()
covXY

-37.93

In [41]:
rddXY = rddX.zip(rddY)
rddXY.collect()

[(2, 86),
 (64, 35),
 (47, 76),
 (15, 72),
 (56, 18),
 (97, 25),
 (58, 92),
 (13, 64),
 (36, 61),
 (11, 41),
 (27, 85),
 (71, 8),
 (88, 77),
 (60, 98),
 (72, 10),
 (62, 97),
 (1, 90),
 (50, 88),
 (76, 1),
 (83, 6),
 (7, 46),
 (99, 47),
 (43, 91),
 (9, 12),
 (12, 13),
 (53, 55),
 (8, 51),
 (78, 30),
 (65, 57),
 (24, 21),
 (40, 15),
 (81, 59),
 (57, 40),
 (96, 53),
 (23, 22),
 (46, 31),
 (79, 87),
 (33, 24),
 (59, 60),
 (45, 16),
 (75, 78),
 (42, 28),
 (31, 48),
 (85, 67),
 (38, 68),
 (94, 27),
 (84, 49),
 (93, 65),
 (32, 74),
 (41, 81),
 (68, 96),
 (5, 45),
 (95, 7),
 (48, 11),
 (19, 26),
 (73, 23),
 (18, 54),
 (74, 9),
 (17, 14),
 (10, 75),
 (37, 89),
 (39, 50),
 (6, 62),
 (52, 2),
 (21, 80),
 (69, 20),
 (44, 83),
 (66, 94),
 (54, 70),
 (20, 56),
 (29, 84),
 (30, 58),
 (63, 79),
 (25, 5),
 (61, 66),
 (77, 95),
 (3, 93),
 (35, 39),
 (92, 29),
 (22, 52),
 (82, 73),
 (49, 32),
 (16, 4),
 (34, 71),
 (0, 37),
 (55, 38),
 (67, 69),
 (90, 82),
 (14, 19),
 (87, 63),
 (86, 36),
 (28, 0),
 (70, 