In [2]:
!pip install pyspark



In [3]:
!pip install findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark
findspark.init()

In [6]:
from pyspark import SparkContext
from pyspark import SparkConf

In [7]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello ")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [4]:
#read csv file with spark

In [5]:
df = spark.read.csv("C:\\Users\\randas\\Desktop\\SparkForML\\Exercise Files\\Ch01\\01_04\\employee.txt",header=True)

In [6]:
df.show(4)

+---+-----------+--------------------+--------+------------+------------+------+--------------------+---------+
| id|  last_name|               email|  gender|  department|  start_date|salary|           job_title|region_id|
+---+-----------+--------------------+--------+------------+------------+------+--------------------+---------+
|  1|   'Kelley'|'rkelley0@soundcl...|'Female'| 'Computers'| '10/2/2009'| 67470|'Structural Engin...|        2|
|  2|'Armstrong'|'sarmstrong1@info...|  'Male'|    'Sports'| '3/31/2008'| 71869| 'Financial Advisor'|        2|
|  3|     'Carr'|'fcarr2@woothemes...|  'Male'|'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  4|   'Murray'|   'jmurray3@gov.uk'|'Female'|  'Jewelery'|'12/25/2014'| 96897|'Desktop Support ...|        3|
+---+-----------+--------------------+--------+------------+------------+------+--------------------+---------+
only showing top 4 rows



In [13]:
df.schema

StructType(List(StructField(id,StringType,true),StructField(last_name,StringType,true),StructField(email,StringType,true),StructField(gender,StringType,true),StructField(department,StringType,true),StructField(start_date,StringType,true),StructField(salary,StringType,true),StructField(job_title,StringType,true),StructField(region_id,StringType,true)))

In [15]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



In [16]:
df.columns

['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [20]:
sample_df = df.sample(False,.10)

In [21]:
sample_df.count()

94

In [26]:
sample_df.filter("salary >= 100000").count()

38

In [12]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [28]:
# standarize numeric data 

In [20]:
features_df =spark.createDataFrame([(1,Vectors.dense([10.0,10000.00,1.0])),(2,Vectors.dense([20.0,30000.00,2.0])),(3,Vectors.dense([30.0,40000.00,3.0]))],["id","features"])

In [21]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [22]:
features_df.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [23]:
feature_stand_scaler = StandardScaler(inputCol="features",outputCol="sfeatures",withStd=True,withMean=True)

In [24]:
stand_model = feature_stand_scaler.fit(features_df)

In [25]:
stand_sfeatures_df = stand_model.transform(features_df)

In [26]:
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]

In [27]:
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+



In [30]:
#Bucketize numeric data :: group data based on boudaries

In [31]:
from pyspark.ml.feature import Bucketizer

In [33]:
splits =[-float("inf"),-10.0,0.0,10.0,float("inf")]

In [39]:
b_data = [(-800.0,),(-10.5,),(-1.7,),(0.0,),(8.2,),(90.1,)]

In [40]:
b_df = spark.createDataFrame(b_data,["features"])

In [41]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [42]:
bucketizer =Bucketizer(splits=splits,inputCol="features",outputCol="bfeatures")

In [43]:
bucketizer_df = bucketizer.transform(b_df)

In [45]:
bucketizer_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



In [52]:
### Tokenize text data 
from pyspark.ml.feature import Tokenizer

In [53]:
sentences_df = spark.createDataFrame([(1,"test test"),(2,"ML ML"),(3,"Spark Spark")],["id","sentence"])

In [54]:
sentences_df.show()

+---+-----------+
| id|   sentence|
+---+-----------+
|  1|  test test|
|  2|      ML ML|
|  3|Spark Spark|
+---+-----------+



In [56]:
sen_token =Tokenizer(inputCol="sentence",outputCol="words")

In [57]:
sent_df = sen_token.transform(sentences_df)

In [59]:
sent_df.show()

+---+-----------+--------------+
| id|   sentence|         words|
+---+-----------+--------------+
|  1|  test test|  [test, test]|
|  2|      ML ML|      [ml, ml]|
|  3|Spark Spark|[spark, spark]|
+---+-----------+--------------+



In [64]:
### clustering 

In [61]:
from pyspark.ml.linalg import Vectors

In [62]:
from pyspark.ml.feature import VectorAssembler

In [63]:
from pyspark.ml.clustering import KMeans

In [66]:
df_clustering = spark.read.csv("C:\\Users\\randas\\Desktop\\SparkForML\\Exercise Files\\Ch03\\03_02\\clustering_dataset.csv",header=True)

In [67]:
df_clustering.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [69]:
## Try vector assembler 
# What is it : A feature transformer that merges multiple columns into a vector column. 
# do some transformation  : put the feature into vector 

In [70]:
kmeans = KMeans().setK(3)

In [71]:
kmeans = kmeans.setSeed(1)

In [74]:
vectorAssembler = VectorAssembler(inputCols=["col1","col2","col3"],outputCol="features")

In [76]:
df_clustering

DataFrame[col1: string, col2: string, col3: string]

In [1]:
############### Working with Iris dataset 

In [53]:
from pyspark.sql.functions import * 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [54]:
iris_df = spark.read.csv("C:\\Users\\randas\\Desktop\\SparkForML\\Exercise Files\\iris.data")

In [55]:
iris_df.show()

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
|4.6|3.4|1.4|0.3|Iris-setosa|
|5.0|3.4|1.5|0.2|Iris-setosa|
|4.4|2.9|1.4|0.2|Iris-setosa|
|4.9|3.1|1.5|0.1|Iris-setosa|
|5.4|3.7|1.5|0.2|Iris-setosa|
|4.8|3.4|1.6|0.2|Iris-setosa|
|4.8|3.0|1.4|0.1|Iris-setosa|
|4.3|3.0|1.1|0.1|Iris-setosa|
|5.8|4.0|1.2|0.2|Iris-setosa|
|5.7|4.4|1.5|0.4|Iris-setosa|
|5.4|3.9|1.3|0.4|Iris-setosa|
|5.1|3.5|1.4|0.3|Iris-setosa|
|5.7|3.8|1.7|0.3|Iris-setosa|
|5.1|3.8|1.5|0.3|Iris-setosa|
+---+---+---+---+-----------+
only showing top 20 rows



In [56]:
iris_df = iris_df.select(col("_c0").cast('double').alias("sepal_length"),
                         col("_c1").cast('double').alias("sepal_width"),
                         col("_c2").cast('double').alias("petal_length"),
                         col("_c3").cast('double').alias("petal_width"),
                         col("_c4").alias("species"))

In [57]:
iris_df.schema

StructType(List(StructField(sepal_length,DoubleType,true),StructField(sepal_width,DoubleType,true),StructField(petal_length,DoubleType,true),StructField(petal_width,DoubleType,true),StructField(species,StringType,true)))

In [58]:
iris_df.show()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [59]:
iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [60]:
vectorAssembler = VectorAssembler(inputCols=["sepal_length","sepal_width","petal_length","petal_width"],outputCol="features")

In [61]:
viris_df = vectorAssembler.transform(iris_df)

In [62]:
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [63]:
## use String Inder, which is mapping the textual data to numerical value with keeping the categoricat context.

In [64]:
indexer = StringIndexer(inputCol="species",outputCol="label")
iviris = indexer.fit(viris_df).transform(viris_df)

In [67]:
iviris.show()

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
|         4.4|        2.9|      

In [68]:
#### Naive Bayes classification 

In [87]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [88]:
splits = iviris.randomSplit([0.6,.4],1)

In [89]:
train_df = splits[0]

In [90]:
test_df = splits[1]

In [91]:
train_df.count()

98

In [92]:
test_df.count()

52

In [93]:
iviris.count()

150

In [94]:
nb  = NaiveBayes(modelType="multinomial")

In [95]:
nbmodel = nb.fit(train_df)

In [96]:
predections_df = nbmodel.transform(test_df)

In [97]:
predections_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [98]:
evaluator = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")

In [99]:
nbaccuracy = evaluator.evaluate(predections_df)

In [100]:
nbaccuracy

0.9807692307692307

In [101]:
########## multilaier perceptron 

In [102]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [103]:
layers = [4,5,5,3]

In [105]:
mlp =MultilayerPerceptronClassifier(layers=layers,seed=1)

In [106]:
mlp_model = mlp.fit(train_df) ## building the model

In [107]:
predections = mlp_model.transform(test_df)

In [109]:
mlp_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

In [110]:
mlp_accuracy = mlp_evaluator.evaluate(predections)

In [111]:
mlp_accuracy

0.6923076923076923

In [112]:
###Decision Tree classifications 

In [113]:
from pyspark.ml.classification import DecisionTreeClassifier

In [114]:
dt = DecisionTreeClassifier(labelCol="label",featuresCol="features")

In [115]:
dt_model = dt.fit(train_df)

In [116]:
predictions = dt_model.transform(test_df)

In [117]:
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")

In [118]:
dt_accuracy = dt_evaluator.evaluate(predections)

In [119]:
dt_accuracy

0.6923076923076923

In [120]:
#regression 

In [121]:
## linear regression 

In [122]:
from pyspark.ml.regression import LinearRegression

In [127]:
pp_df = spark.read.csv("C:\\Users\\randas\\Desktop\\SparkForML\\Exercise Files\\CCPP\\CCPP\\power_plant.csv",header=True,inferSchema=True)

In [128]:
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [129]:
pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)]

In [131]:
vectorAssembler =VectorAssembler(inputCols=["AT","V","AP","RH"],outputCol="features")

In [132]:
vpp_df = vectorAssembler.transform(pp_df)

In [133]:
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [134]:
## create linear regression model 

In [135]:
lr = LinearRegression(featuresCol="features",labelCol="PE")

In [136]:
lr_model = lr.fit(vpp_df)

In [137]:
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [138]:
lr_model.intercept

454.6092744523414

In [139]:
lr_model.summary.rootMeanSquaredError # how much error 

4.557126016749488

In [143]:
###  Decision Tree Regression 

In [144]:
from pyspark.ml.regression import DecisionTreeRegressor

In [145]:
from pyspark.ml.evaluation import RegressionEvaluator

In [146]:
from pyspark.ml.feature import VectorAssembler

In [147]:
pp_df = spark.read.csv("C:\\Users\\randas\\Desktop\\SparkForML\\Exercise Files\\CCPP\\CCPP\\power_plant.csv",header=True,inferSchema=True)

In [148]:
pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)]

In [159]:
vectorAssembler = VectorAssembler(inputCols=["AT","V","AP","RH"],outputCol="features")

In [160]:
vpp_df = vectorAssembler.transform(pp_df)

In [161]:
splits = vpp_df.randomSplit([.7,.3])

In [162]:
train_df = splits[0]

In [163]:
test_df = splits[1]

In [164]:
train_df.count()

6703

In [165]:
test_df.count()

2865

In [175]:
vpp_df.count()

9568

In [176]:
dt = DecisionTreeRegressor(featuresCol="features",labelCol="PE")

In [177]:
dt_model = dt.fit(train_df)

In [186]:
prediction = dt_model.transform(test_df)

In [189]:
dt_evaluator = RegressionEvaluator(labelCol="PE",predictionCol="prediction",metricName="rmse")

rmse

In [190]:
rmse = dt_evaluator.evaluate(prediction)

In [191]:
rmse

4.533151006288871

In [192]:
############# Gradiant boosted tree regression 

In [193]:
from pyspark.ml.regression import GBTRegressor

In [194]:
GBT = GBTRegressor(featuresCol="features",labelCol="PE")

In [196]:
gbt_model = GBT.fit(train_df)

In [197]:
gbt_predctions = gbt_model.transform(test_df)

In [198]:
gbt_evaluator = RegressionEvaluator(labelCol="PE",predictionCol="prediction",metricName="rmse")

In [199]:
gbt_evaluator.evaluate(gbt_predctions)

4.030501155333614