# PySpark Project: Get a handle on using Python with Spark through this hands-on PySpark data processing tutorial.

## This series of PySpark projects looks at installing Apache Spark on a cluster and explores data analysis tasks using PySpark for big data and data science applications.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
import os
import sys

In [4]:
# Root of the local Spark distribution. A raw string keeps the Windows
# backslashes literal instead of relying on "\s" not being a recognised escape.
spark_path = r'E:\spark-2.0.1-bin-hadoop2.7\spark-2.0.1-bin-hadoop2.7'

In [5]:
# Point both SPARK_HOME and HADOOP_HOME at the same Spark directory.
# NOTE(review): HADOOP_HOME presumably only needs to exist for the Windows
# Hadoop helpers; confirm that reusing the Spark path is intended.
for env_name in ('SPARK_HOME', 'HADOOP_HOME'):
    os.environ[env_name] = spark_path

In [6]:
# Make the PySpark sources that ship inside the Spark distribution importable.
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
# Fixed spelling: the package directory is "pyspark", not "pypark".
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
# The bundled Py4J sources are a zip archive under python/lib, so the entry
# needs the ".zip" suffix to name a path that can actually be imported from.
sys.path.append(spark_path + "/python/lib/py4j-0.10.3-src.zip")
               

In [7]:
from pyspark import SparkContext

In [8]:
from pyspark import SparkConf

In [9]:
# Start a throw-away local context; keyword arguments make the two values explicit.
sc = SparkContext(master="local", appName="test123")

In [10]:
# Echo the context object to confirm it was created.
sc

<pyspark.context.SparkContext at 0x9adf978>

In [12]:
# Stop this context before a new one is created further down.
sc.stop()

# What is an RDD (Resilient Distributed Dataset)?


In [13]:
# New context on the "local[2]" master (local mode, 2 worker threads).
sc = SparkContext(master='local[2]')

In [14]:
# Echo the new context object.
sc

<pyspark.context.SparkContext at 0x9b2d940>

In [15]:
# Small in-memory sample used for the RDD examples that follow.
data = [12,32,45,65,67,89]

In [16]:
# Function-call form prints the same text on Python 2 and also parses on
# Python 3; it matches the print(...) style used later in this notebook.
print(data)

[12, 32, 45, 65, 67, 89]


In [17]:
# Parallelization: turn the local Python list `data` into an RDD.
rdd1 = sc.parallelize(data)

In [18]:
# Printing an RDD shows its description, not its contents (use collect() for
# the elements). Function-call form works on both Python 2 and Python 3.
print(rdd1)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475


In [19]:
# Bring every element of the RDD back as a Python list.
rdd1.collect()

[12, 32, 45, 65, 67, 89]

In [20]:
# Number of elements in the RDD.
rdd1.count()

6

In [23]:
# Spark version this context is running on.
sc.version

u'2.0.1'

In [24]:
# Python major.minor version recorded by the context.
sc.pythonVer

'2.7'

In [25]:
# Master URL the context was started with.
sc.master

u'local[2]'

In [26]:
# How many partitions the RDD is split into.
# NOTE(review): the value 2 presumably follows from the local[2] master; confirm.
rdd1.getNumPartitions()

2

In [27]:
# Element count again; unchanged, since nothing has modified rdd1.
rdd1.count()

6

In [28]:
# An RDD built from an empty list has no elements.
sc.parallelize([]).isEmpty()

True

In [29]:
# [data] is a one-element list whose single element is the list `data`,
# so this RDD holds exactly one element and is not empty.
sc.parallelize([data]).isEmpty()

False

In [30]:
# Basic statistics: show the values the following summary calls operate on.
rdd1.collect()

[12, 32, 45, 65, 67, 89]

In [31]:
# Largest element.
rdd1.max()

89

In [32]:
# Smallest element.
rdd1.min()

12

In [33]:
# Population standard deviation (divides by N): the square root of the
# variance() value shown below.
rdd1.stdev()

25.203615260954571

In [34]:
# Arithmetic mean of the elements.
rdd1.mean()

51.666666666666664

In [35]:
# Population variance (sum of squared deviations divided by N = 6, not N - 1).
rdd1.variance()

635.2222222222222

In [36]:
# Count, mean, stdev, max and min reported together in one summary object.
rdd1.stats()

(count: 6, mean: 51.6666666667, stdev: 25.203615261, max: 89.0, min: 12.0)

In [21]:
import numpy as np
from pyspark.mllib.stat import Statistics

In [25]:
# Three rows of five measurements each; every row becomes one element of the RDD.
observations = [
    np.array([10.1, 12.4, 14.5, 16.8, 21]),
    np.array([21.3, 24.2, 35.4, 36.4, 31.7]),
    np.array([21.1, 23., 54., 65., 71.]),
]
mat = sc.parallelize(observations)

In [26]:
# Shows the RDD description only. Function-call form works on both
# Python 2 and Python 3.
print(mat)

ParallelCollectionRDD[6] at parallelize at PythonRDD.scala:475


In [28]:
# Column-wise summary statistics over the rows of `mat`.
summary=Statistics.colStats(mat)

In [29]:
# Mean of each of the five columns.
summary.mean()

array([ 17.5       ,  19.86666667,  34.63333333,  39.4       ,  41.23333333])

In [30]:
# Per-column variance. The values are sample variances (divided by N - 1),
# unlike RDD.variance() above, which divides by N.
summary.variance()

array([  41.08      ,   42.17333333,  390.50333333,  587.56      ,
        693.16333333])

In [31]:
# Number of non-zero entries per column (all 3 rows are non-zero in every column).
summary.numNonzeros()

array([ 3.,  3.,  3.,  3.,  3.])

In [32]:
# Correlation example: two series of five values each, one RDD per series.
X = sc.parallelize([10.1,12.4,14.5,16.8,21])
Y = sc.parallelize([21.3,24.2,35.4,36.4,31.7])

In [33]:
# Pearson correlation coefficient between the X and Y series.
corr = Statistics.corr(X,Y,method='pearson')

In [34]:
# Echo the coefficient (a single float).
corr

0.6779641435411099

In [35]:
from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

In [36]:
# Dense vector of ten observed values: the X series followed by the Y series
# from the correlation example.
vec = Vectors.dense(10.1,12.4,14.5,16.8,21,21.3,24.2,35.4,36.4,31.7)

In [37]:
# Echo the vector; integer inputs are shown as floats (21 -> 21.0).
vec

DenseVector([10.1, 12.4, 14.5, 16.8, 21.0, 21.3, 24.2, 35.4, 36.4, 31.7])

In [38]:
# Pearson chi-squared goodness-of-fit test on the observed vector. No expected
# distribution is passed; MLlib documents a uniform expected distribution as
# the default in that case.
goodnestest = Statistics.chiSqTest(vec)

In [40]:
# Print the test summary: method, degrees of freedom, statistic and p-value.
print (goodnestest)

Chi squared test summary:
method: pearson
degrees of freedom = 9 
statistic = 35.878284182305634 
pValue = 4.166733496191455E-5 
Very strong presumption against null hypothesis: observed follows the same distribution as expected..


In [41]:
# Load the UCI Wine dataset (wine/wine.data, not the separate Wine Quality
# set) into a pandas DataFrame. header=None because the file has no header row.
# Note: this rebinds `data`, which previously held the small Python list.
data = pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data",header=None, sep=',')

In [42]:
# Preview the first five rows; columns are numbered 0-13 since no header was read.
data.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
1,1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050
2,1,13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185
3,1,14.37,1.95,2.5,16.8,113,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480
4,1,13.24,2.59,2.87,21.0,118,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735


In [43]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD

In [44]:
# Stop the local[2] context so a fresh one can be created in the next cell.
sc.stop()

In [45]:
# New context named 'MLAlgo'. No master is passed here.
# NOTE(review): confirm which master Spark resolves when none is given.
sc = SparkContext(appName='MLAlgo')

In [46]:
# Read the CSV as lines of text, split each line on commas, drop lines that
# yield a single field (e.g. blank lines), and keep columns 0, 3 and 2, in
# that order.
# collect() pulls every row back to the driver, so `data` ends up as a plain
# Python list of string tuples, not an RDD.
# The path is a raw string: "\U" in a normal literal is a SyntaxError on
# Python 3, and raw strings keep the Windows backslashes literal everywhere.
data = (
    sc.textFile(r"C:\Users\Dell\Documents\winequality.csv")
    .map(lambda line: line.split(","))
    .filter(lambda fields: len(fields) > 1)
    .map(lambda fields: (fields[0], fields[3], fields[2]))
    .collect()
)

In [47]:
# `data` is now the collected list of (col 0, col 3, col 2) string tuples,
# no longer the pandas DataFrame loaded earlier.
print (data)

[(u'1', u'2.43', u'1.71'), (u'1', u'2.14', u'1.78'), (u'1', u'2.67', u'2.36'), (u'1', u'2.5', u'1.95'), (u'1', u'2.87', u'2.59'), (u'1', u'2.45', u'1.76'), (u'1', u'2.45', u'1.87'), (u'1', u'2.61', u'2.15'), (u'1', u'2.17', u'1.64'), (u'1', u'2.27', u'1.35'), (u'1', u'2.3', u'2.16'), (u'1', u'2.32', u'1.48'), (u'1', u'2.41', u'1.73'), (u'1', u'2.39', u'1.73'), (u'1', u'2.38', u'1.87'), (u'1', u'2.7', u'1.81'), (u'1', u'2.72', u'1.92'), (u'1', u'2.62', u'1.57'), (u'1', u'2.48', u'1.59'), (u'1', u'2.56', u'3.1'), (u'1', u'2.28', u'1.63'), (u'1', u'2.65', u'3.8'), (u'1', u'2.36', u'1.86'), (u'1', u'2.52', u'1.6'), (u'1', u'2.61', u'1.81'), (u'1', u'3.22', u'2.05'), (u'1', u'2.62', u'1.77'), (u'1', u'2.14', u'1.72'), (u'1', u'2.8', u'1.9'), (u'1', u'2.21', u'1.68'), (u'1', u'2.7', u'1.5'), (u'1', u'2.36', u'1.66'), (u'1', u'2.36', u'1.83'), (u'1', u'2.7', u'1.53'), (u'1', u'2.65', u'1.8'), (u'1', u'2.41', u'1.81'), (u'1', u'2.84', u'1.64'), (u'1', u'2.55', u'1.65'), (u'1', u'2.1', u'1.5'),

In [52]:
# Training data for the MLlib logistic regression demo below:
# three class labels (0.0, 1.0, 2.0), two samples per class, four features each.
labelled_rows = (
    (0.0, [14.23, 1.71, 2.43, 15.6]),
    (0.0, [13.2, 1.78, 2.14, 11.2]),
    (1.0, [21.3, 32.4, 3.5, 21.4]),
    (1.0, [12.4, 21.4, 21.7, 32.8]),
    (2.0, [21, 65, 45, 21]),
    (2.0, [21.5, 76.8, 54.6, 54.9]),
)
parsed_data = [LabeledPoint(label, features) for label, features in labelled_rows]

In [53]:
# Echo the labelled points; integer features are shown as floats.
parsed_data

[LabeledPoint(0.0, [14.23,1.71,2.43,15.6]),
 LabeledPoint(0.0, [13.2,1.78,2.14,11.2]),
 LabeledPoint(1.0, [21.3,32.4,3.5,21.4]),
 LabeledPoint(1.0, [12.4,21.4,21.7,32.8]),
 LabeledPoint(2.0, [21.0,65.0,45.0,21.0]),
 LabeledPoint(2.0, [21.5,76.8,54.6,54.9])]

In [54]:
from pyspark.mllib.classification import LogisticRegressionModel, LogisticRegressionWithLBFGS, LogisticRegressionWithSGD

In [55]:
# Distribute the labelled points, then fit a 3-class logistic regression with L-BFGS.
training_rdd = sc.parallelize(parsed_data)
model = LogisticRegressionWithLBFGS.train(training_rdd, numClasses=3)

In [56]:
# Intercept of the fitted model.
# NOTE(review): 0.0 presumably because train() was not asked to fit an
# intercept; confirm the default of the `intercept` argument.
model.intercept

0.0

In [57]:
# Fitted weights as one flat vector: 8 values for 4 features and 3 classes.
# NOTE(review): consistent with a (numClasses - 1) * numFeatures layout for
# multinomial models; confirm against the MLlib docs.
model.weights

DenseVector([-3.0592, 1.5696, -0.5426, 1.5941, -6.3434, 1.8031, 2.3828, 0.0215])