In [1]:
# RDD's are the central first class citizen data structure in ApacheSpark
# so using the RDD API is crucial to working with Apache Spark

# RDD = Resilient Distributed Datasets
# Thus RDD's are going to be the first class citizen data structure for 
# use when using Apache Spark for distributed data science processing
# operations

# RDD's are schemaless and lazy, which means essentially
# that they only organize themselves in response to a specific operation
# or query.  They do not have an inherent schema.

# If we want to treat data more like a relational database then instead
# of dealing with RDD's directly, we will want to use ApacheSparkSQL and/or
# the *Dataframe* API.  When the dataframe API and/or ApacheSparkSQL is
# used instead of dealing with the RDD's, the dataframe structure acts
# as a structure built around the RDD.  This notebook only deals with RDD's
# directly.  The dataframe we are talking about here should not be confused
# with a pandas dataframe.

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20210430223106-0001
KERNEL_ID = 943638e3-2ed5-4573-89c2-4a0dad989851


In [2]:
# Build two identical RDDs, each holding the integers 0 through 99.
# The generator calls sc.parallelize twice, so rddX and rddY are two
# separate RDD objects with the same contents.

rddX, rddY = (sc.parallelize(list(range(100))) for _ in range(2))

In [3]:
# Mean (average) of each RDD: the sum of its elements divided by the
# number of elements it holds.

meanX, meanY = (rdd.sum() / rdd.count() for rdd in (rddX, rddY))

In [4]:
# Show both means, one per line.

print(meanX, meanY, sep="\n")

49.5
49.5


In [5]:
# To work with both RDDs at once, "zip" them together: the result is one
# new RDD whose elements are (x, y) tuples pairing up the elements of
# rddX and rddY.

rddXY = rddX.zip(other=rddY)

# Preview the first 10 tuples of the newly created RDD.
rddXY.take(num=10)

[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9)]

In [6]:
# This creates an RDD whose elements are tuples made up of the values of each of the two RDD's above.
# This kind of resembles the behavior of a table in a relational database (with the limitations
# on RDD's mentioned above).

# We can subtract the mean from each value of X and each value of Y,
# multiply the two differences for each pair, take the sum of those
# products, and divide by the number of pairs.
# This gives us the Covariance of the two datasets X and Y:

In [7]:
# Covariance of X and Y: for every (x, y) pair multiply the two deviations
# from the means, add the products up, and divide by the number of pairs.
covXY = (
    rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY)).sum()
    / rddXY.count()
)
covXY

833.25

In [8]:
# Standard deviation of each dataset (population form: the sum of squared
# deviations is divided by n before taking the square root).
# n is the number of (x, y) tuples in the zipped RDD.

from math import sqrt

n = rddXY.count()
sdX = sqrt(rddX.map(lambda x: (x - meanX) ** 2).sum() / n)
sdY = sqrt(rddY.map(lambda y: (y - meanY) ** 2).sum() / n)
print(sdX, sdY, sep="\n")

28.86607004772212
28.86607004772212


In [9]:
# Both datasets are identical, so their standard deviations are the same.

# Pearson's correlation coefficient between the two datasets: the
# covariance divided by the product of the two standard deviations.


corrXY = covXY / (sdX * sdY)
corrXY

1.0

In [10]:
# since these two sets of observations are identical, they have a 
# corr = +1.0

In [11]:
# Same setup as before, except that the second dataset is reversed:
# X runs 0..99 while Y runs 99..0. Everything else stays the same.

rddX = sc.parallelize(list(range(100)))
rddY = sc.parallelize(list(range(99, -1, -1)))

meanX, meanY = (rdd.sum() / rdd.count() for rdd in (rddX, rddY))

print(meanX, meanY, sep="\n")

49.5
49.5


In [12]:
# To use the two RDDs together, "zip" them into a single RDD of (x, y)
# tuples, then repeat the covariance / standard deviation / correlation
# steps on the new data.

rddXY = rddX.zip(rddY)

# Count the pairs in this cell instead of reusing the `n` left over from
# the first example, so the divisor always matches the data analysed here.
n = rddXY.count()

# Covariance: mean of the products of the paired deviations from the means.
covXY = rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY)).sum() / n

# Population standard deviation of each dataset.
sdX = sqrt(rddX.map(lambda x: pow(x - meanX, 2)).sum() / n)
sdY = sqrt(rddY.map(lambda y: pow(y - meanY, 2)).sum() / n)

# Pearson's correlation coefficient.
corrXY = covXY / (sdX * sdY)
corrXY

-1.0

In [13]:
# since one list is reversed, then the two lists
# are perfectly negatively correlated

In [14]:
# Next, two random lists. Each one is a random ordering of 0..99, obtained
# by sampling all 100 values from range(100).
# NOTE: no random seed is set, so the correlation shown below changes from
# run to run.

import random

# random.sample already returns a list, so it is passed to parallelize as is.
rddX = sc.parallelize(random.sample(range(100), 100))
rddY = sc.parallelize(random.sample(range(100), 100))

meanX = rddX.sum() / rddX.count()
meanY = rddY.sum() / rddY.count()

rddXY = rddX.zip(rddY)

# Count the pairs in this cell instead of reusing the `n` left over from
# an earlier example, so the divisor always matches the data analysed here.
n = rddXY.count()

# Covariance: mean of the products of the paired deviations from the means.
covXY = rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY)).sum() / n

# Population standard deviation of each dataset.
sdX = sqrt(rddX.map(lambda x: pow(x - meanX, 2)).sum() / n)
sdY = sqrt(rddY.map(lambda y: pow(y - meanY, 2)).sum() / n)

# Pearson's correlation coefficient.
corrXY = covXY / (sdX * sdY)
corrXY

-0.131989198919892

In [15]:
# the correlation is close to zero
# since the two datasets were randomly generated
# there is little or no correlation between the two datasets

In [16]:
# Again with two freshly drawn random orderings of 0..99.
# NOTE: no random seed is set, so the correlation shown below changes from
# run to run.

# random.sample already returns a list, so it is passed to parallelize as is.
rddX = sc.parallelize(random.sample(range(100), 100))
rddY = sc.parallelize(random.sample(range(100), 100))

meanX = rddX.sum() / rddX.count()
meanY = rddY.sum() / rddY.count()

rddXY = rddX.zip(rddY)

# Count the pairs in this cell instead of reusing the `n` left over from
# an earlier example, so the divisor always matches the data analysed here.
n = rddXY.count()

# Covariance: mean of the products of the paired deviations from the means.
covXY = rddXY.map(lambda pair: (pair[0] - meanX) * (pair[1] - meanY)).sum() / n

# Population standard deviation of each dataset.
sdX = sqrt(rddX.map(lambda x: pow(x - meanX, 2)).sum() / n)
sdY = sqrt(rddY.map(lambda y: pow(y - meanY, 2)).sum() / n)

# Pearson's correlation coefficient.
corrXY = covXY / (sdX * sdY)
corrXY

-0.12519651965196518

In [17]:
# the correlation is still close to zero

In [18]:
# next we will consider a correlation matrix
# we create a number of rdd's in apache spark: column 1, etc.

In [19]:
# Statistics comes from MLlib, Apache Spark's distributed machine learning
# library; it is used here together with the RDD API. Every value of the
# correlation matrix could be generated with the hand-written code from the
# cells above, but using the library is easier.

from pyspark.mllib.stat import Statistics

# Four columns of 100 values each.
column1 = sc.parallelize(list(range(0, 100)))              # 0..99
column2 = sc.parallelize(list(range(100, 200)))            # 100..199
column3 = sc.parallelize(list(range(99, -1, -1)))          # 99..0
column4 = sc.parallelize(random.sample(range(100), 100))   # random ordering of 0..99

In [20]:
# Above we have generated four different RDD's, but the
# library we have imported expects a more relational-database,
# table-like structure.
# Therefore we use the zip function from the RDD API to
# put them together.
# If we zip column1 and column2 together:

In [21]:
# Pair column1 with column2 element by element, then preview 15 rows.
data1 = column1.zip(other=column2)
data1.take(num=15)

[(0, 100),
 (1, 101),
 (2, 102),
 (3, 103),
 (4, 104),
 (5, 105),
 (6, 106),
 (7, 107),
 (8, 108),
 (9, 109),
 (10, 110),
 (11, 111),
 (12, 112),
 (13, 113),
 (14, 114)]

In [22]:
# if we kind of "stack" the zipping together, this happens:

In [23]:
# Zipping a third column onto an already zipped RDD nests the earlier pair:
# each element becomes ((c1, c2), c3).
data2 = column1.zip(other=column2).zip(other=column3)
data2.take(num=15)

[((0, 100), 99),
 ((1, 101), 98),
 ((2, 102), 97),
 ((3, 103), 96),
 ((4, 104), 95),
 ((5, 105), 94),
 ((6, 106), 93),
 ((7, 107), 92),
 ((8, 108), 91),
 ((9, 109), 90),
 ((10, 110), 89),
 ((11, 111), 88),
 ((12, 112), 87),
 ((13, 113), 86),
 ((14, 114), 85)]

In [24]:
# Each extra zip wraps the previous element in another tuple, so the
# nesting gets one level deeper per column. That looks like it would be a
# problem, but keep going and add the fourth column anyway:

data3 = (
    column1.zip(column2)
    .zip(column3)
    .zip(column4)
)
data3.take(15)

[(((0, 100), 99), 63),
 (((1, 101), 98), 62),
 (((2, 102), 97), 51),
 (((3, 103), 96), 0),
 (((4, 104), 95), 94),
 (((5, 105), 94), 4),
 (((6, 106), 93), 30),
 (((7, 107), 92), 50),
 (((8, 108), 91), 14),
 (((9, 109), 90), 66),
 (((10, 110), 89), 88),
 (((11, 111), 88), 7),
 (((12, 112), 87), 24),
 (((13, 113), 86), 92),
 (((14, 114), 85), 39)]

In [25]:
#  Flatten each nested element (((c1, c2), c3), c4) into one flat tuple of
#  four elements with a lambda function.
#  The course was written for Python 2.*, so this Python 3.* version was
#  written by hand: the innermost (c1, c2) pair is extended with c3 and c4.

data4 = (
    column1.zip(column2)
    .zip(column3)
    .zip(column4)
    .map(lambda nested: nested[0][0] + (nested[0][1], nested[1]))
)
data4.take(15)

[(0, 100, 99, 63),
 (1, 101, 98, 62),
 (2, 102, 97, 51),
 (3, 103, 96, 0),
 (4, 104, 95, 94),
 (5, 105, 94, 4),
 (6, 106, 93, 30),
 (7, 107, 92, 50),
 (8, 108, 91, 14),
 (9, 109, 90, 66),
 (10, 110, 89, 88),
 (11, 111, 88, 7),
 (12, 112, 87, 24),
 (13, 113, 86, 92),
 (14, 114, 85, 39)]

In [26]:
# So amazingly, that worked

# Next we need to figure out how to transform this structure that contains these 4 datasets into a correlation matrix between
# the four datasets.

# but we still need one more transform:
# We need each of the tuples to be arrays instead, so that we have an RDD of arrays, with 4 elements in each array:

In [27]:
# Same flattening as in the previous step, followed by a second map that
# turns every 4-tuple into a 4-element list, giving an RDD of lists.
data = (
    column1.zip(column2)
    .zip(column3)
    .zip(column4)
    .map(lambda nested: nested[0][0] + (nested[0][1], nested[1]))
    .map(list)
)
data.take(15)

[[0, 100, 99, 63],
 [1, 101, 98, 62],
 [2, 102, 97, 51],
 [3, 103, 96, 0],
 [4, 104, 95, 94],
 [5, 105, 94, 4],
 [6, 106, 93, 30],
 [7, 107, 92, 50],
 [8, 108, 91, 14],
 [9, 109, 90, 66],
 [10, 110, 89, 88],
 [11, 111, 88, 7],
 [12, 112, 87, 24],
 [13, 113, 86, 92],
 [14, 114, 85, 39]]

In [28]:
# At this point the data really does look like a relational database table.

#   *** Data preparation is more than 80% of the work involved in data science.  ***

# With the rows in this format, generating the correlation matrix takes a
# single call. Pearson is requested by name here; it is also the method
# Statistics.corr falls back to when none is given.

Statistics.corr(data, method="pearson")

array([[ 1.        ,  1.        , -1.        , -0.03714371],
       [ 1.        ,  1.        , -1.        , -0.03714371],
       [-1.        , -1.        ,  1.        ,  0.03714371],
       [-0.03714371, -0.03714371,  0.03714371,  1.        ]])

In [29]:
# There is redundant information in this, as in all correlation matrices:
# the top-left to bottom-right diagonal holds the correlation of each
# dataset with itself, which is always 1, and the matrix is symmetric.
# All of the interesting information is contained in the upper triangular
# part of the matrix, not including the diagonal:

# 1          -1           -0.0371
#            -1           -0.0371
#                          0.0371

# (The values in the last column involve the randomly generated column4,
# so they will differ each time the notebook is run.)

# Both covariance and correlation measure the degree of linear relationship
# between two datasets.  Correlation is a normalized and unitless measure of
# the linearity, whereas covariance is not normalized and also has units.

In [30]:
# [end of notebook on covariance and correlation - FFH]