# Intro to Spark

In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Import the Spark entry points used by every later cell.
# Directly after `%pip install`, the running kernel may not see the freshly
# installed package yet. Print the hint, then re-raise so execution stops here
# instead of continuing into later cells that would fail with a NameError.
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError:
    print("Restart kernel")
    raise

In [3]:
# Start (or reuse) a SparkContext whose master is "local[*]".
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)

# Only show log messages at ERROR level.
sc.setLogLevel("ERROR")

# Obtain (or reuse) the SparkSession used for the DataFrame/SQL examples.
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/07 18:54:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Variance, Covariance

In [4]:
import random

# Two separately drawn random orderings of the integers 0..99,
# each handed to Spark as an RDD.
rddX = sc.parallelize(random.sample(range(100), 100))
rddY = sc.parallelize(random.sample(range(100), 100))

In [5]:
# Mean of each RDD: the sum of its elements divided by the element count.
meanX, meanY = (rdd.sum() / float(rdd.count()) for rdd in (rddX, rddY))
print(meanX, meanY, sep="\n")

                                                                                

49.5
49.5


Covariance

In [6]:
# Pair the elements of rddX and rddY position by position.
rddXY = rddX.zip(rddY)

# Covariance: average over all pairs of (x - meanX) * (y - meanY).
deviation_products = rddXY.map(
    lambda pair: (pair[0] - meanX) * (pair[1] - meanY)
)
covXY = deviation_products.sum() / rddXY.count()
covXY

-80.94

Correlation

In [7]:
from math import sqrt

# Standard deviation of each RDD: square root of the mean squared deviation
# from the mean. The divisor is n (the number of pairs), not n - 1.
n = rddXY.count()
sdX = sqrt(rddX.map(lambda v: (v - meanX) ** 2).sum() / n)
sdY = sqrt(rddY.map(lambda v: (v - meanY) ** 2).sum() / n)
print(sdX, sdY, sep="\n")

28.86607004772212
28.86607004772212


In [8]:
# Correlation: the covariance divided by the product of the two
# standard deviations.
sd_product = sdX * sdY
corrXY = covXY / sd_product
corrXY

-0.09713771377137713

## Correlation Matrix


In [9]:
from pyspark.mllib.stat import Statistics

COUNT = 100

# Each row holds four values derived from i in 0..COUNT-1:
#   i, i + COUNT, COUNT - i, and a random integer drawn from [0, COUNT].
data = sc.range(COUNT).map(
    lambda i: [i, i + COUNT, COUNT - i, random.randint(0, COUNT)]
)

# Pairwise correlation between the four columns, printed as a matrix.
corr_matrix = Statistics.corr(data)
print(corr_matrix)

[[ 1.          1.         -1.          0.02844611]
 [ 1.          1.         -1.          0.02844611]
 [-1.         -1.          1.         -0.02844611]
 [ 0.02844611  0.02844611 -0.02844611  1.        ]]


In [21]:
# with SQL
from pyspark.sql.functions import col

# Single-column DataFrame of ids 0..COUNT-1, with `id` cast to int.
data = spark.range(COUNT).select(col("id").cast("int"))
data.printSchema()

# Show each id next to COUNT - id, converted to a pandas DataFrame for display.
id_col = data["id"]
data.select(id_col, COUNT - id_col).toPandas()


root
 |-- id: integer (nullable = false)



Unnamed: 0,id,(100 - id)
0,0,100
1,1,99
2,2,98
3,3,97
4,4,96
...,...,...
95,95,5
96,96,4
97,97,3
98,98,2
