In [3]:
from pyspark.sql import SparkSession

In [4]:
# Attach to the already-running SparkSession, or start one if none exists yet.
spark = (
    SparkSession.builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/28 09:45:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Display the session object to check that the kernel holds a live SparkSession.
spark

# ---DATA PREPARATION / PREPROCESSING---
## 1
Normalize: rescale each feature to the [0, 1] range with `MinMaxScaler`

In [4]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [5]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),)
],['id','features'])

In [6]:
features_df.show()

                                                                                

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [7]:
# Peek at the first Row to see that 'features' is stored as a DenseVector.
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [8]:
feature_scaler = MinMaxScaler(inputCol='features',outputCol='scaledfeatures')

In [9]:
scaledmodel = feature_scaler.fit(features_df)

                                                                                

In [10]:
scaledfeatures_df = scaledmodel.transform(features_df)

In [11]:
scaledfeatures_df.show()

                                                                                

+---+------------------+--------------------+
| id|          features|      scaledfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|           (3,[],[])|
|  2|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|  3|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+---+------------------+--------------------+



## 2
Standardize: shift each feature to zero mean and scale it to unit standard deviation with `StandardScaler`

In [13]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [14]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.00,1.0]),),
    (2, Vectors.dense([20.0,30000.00,2.0]),),
    (3, Vectors.dense([30.0,40000.00,3.0]),)
],['id','features'])

In [15]:
feature_stand_scaler = StandardScaler(inputCol='features', outputCol='sfeatures', withStd=True, withMean=True)

In [16]:
stand_smodel = feature_stand_scaler.fit(features_df)

In [17]:
stand_sfeatures_df = stand_smodel.transform(features_df)

In [28]:
stand_sfeatures_df.take(3)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0])),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]), sfeatures=DenseVector([0.0, 0.2182, 0.0])),
 Row(id=3, features=DenseVector([30.0, 40000.0, 3.0]), sfeatures=DenseVector([1.0, 0.8729, 1.0]))]

## 3
Bucketize: map continuous values to discrete buckets with `Bucketizer`

In [29]:
from pyspark.ml.feature import Bucketizer

In [30]:
splits = [-float('inf'), -10.0,0.0,10.0,float('inf')]

In [34]:
b_data = [(-800.0,),(-10.5,),(-1.7,),(0.0,),(8.2,),(90.1,)]

In [35]:
b_df = spark.createDataFrame(b_data, ['features'])

In [36]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [37]:
bucketizer = Bucketizer(splits=splits, inputCol='features' , outputCol='bfeatures')

In [39]:
bucketed_df = bucketizer.transform(b_df)

In [40]:
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



## 4
Text data: tokenize sentences and compute TF-IDF features

In [41]:
from pyspark.ml.feature import Tokenizer

In [52]:
sentence_df = spark.createDataFrame([(1,'This is a sentence'),(2,'MLlib is really neat'),(3,'I love it')], ['id','sentences'])

In [53]:
sentence_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  1|  This is a sentence|
|  2|MLlib is really neat|
|  3|           I love it|
+---+--------------------+



In [56]:
sent_token = Tokenizer(inputCol='sentences',outputCol='words')

In [57]:
sent_tokenized_df = sent_token.transform(sentence_df)

In [58]:
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|           sentences|               words|
+---+--------------------+--------------------+
|  1|  This is a sentence|[this, is, a, sen...|
|  2|MLlib is really neat|[mllib, is, reall...|
|  3|           I love it|       [i, love, it]|
+---+--------------------+--------------------+



In [59]:
from pyspark.ml.feature import HashingTF, IDF

In [63]:
# Inspect the schema of the raw sentence DataFrame before hashing.
sentence_df

DataFrame[id: bigint, sentences: string]

In [64]:
hashingTF = HashingTF(inputCol='words',outputCol='rawFeatures',numFeatures=20)

In [66]:
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)
sent_hfTF_df.show()

+---+--------------------+--------------------+--------------------+
| id|           sentences|               words|         rawFeatures|
+---+--------------------+--------------------+--------------------+
|  1|  This is a sentence|[this, is, a, sen...|(20,[7,9,12,13],[...|
|  2|MLlib is really neat|[mllib, is, reall...|(20,[6,9,12,15],[...|
|  3|           I love it|       [i, love, it]|(20,[0,6,16],[1.0...|
+---+--------------------+--------------------+--------------------+



In [67]:
# Full rows: rawFeatures is a SparseVector(size, {bucket_index: count}) per sentence.
sent_hfTF_df.take(3)

[Row(id=1, sentences='This is a sentence', words=['this', 'is', 'a', 'sentence'], rawFeatures=SparseVector(20, {7: 1.0, 9: 1.0, 12: 1.0, 13: 1.0})),
 Row(id=2, sentences='MLlib is really neat', words=['mllib', 'is', 'really', 'neat'], rawFeatures=SparseVector(20, {6: 1.0, 9: 1.0, 12: 1.0, 15: 1.0})),
 Row(id=3, sentences='I love it', words=['i', 'love', 'it'], rawFeatures=SparseVector(20, {0: 1.0, 6: 1.0, 16: 1.0}))]

In [68]:
idf = IDF(inputCol='rawFeatures',outputCol='idf_features')

In [69]:
idfModel = idf.fit(sent_hfTF_df)

                                                                                

In [72]:
# Reweight the raw term frequencies by the learned inverse document frequencies.
tfidf_df = idfModel.transform(dataset=sent_hfTF_df)
tfidf_df.show()

+---+--------------------+--------------------+--------------------+--------------------+
| id|           sentences|               words|         rawFeatures|        idf_features|
+---+--------------------+--------------------+--------------------+--------------------+
|  1|  This is a sentence|[this, is, a, sen...|(20,[7,9,12,13],[...|(20,[7,9,12,13],[...|
|  2|MLlib is really neat|[mllib, is, reall...|(20,[6,9,12,15],[...|(20,[6,9,12,15],[...|
|  3|           I love it|       [i, love, it]|(20,[0,6,16],[1.0...|(20,[0,6,16],[0.6...|
+---+--------------------+--------------------+--------------------+--------------------+



In [73]:
# Buckets seen in more sentences receive smaller TF-IDF weights than buckets seen in one.
tfidf_df.take(1)

[Row(id=1, sentences='This is a sentence', words=['this', 'is', 'a', 'sentence'], rawFeatures=SparseVector(20, {7: 1.0, 9: 1.0, 12: 1.0, 13: 1.0}), idf_features=SparseVector(20, {7: 0.6931, 9: 0.2877, 12: 0.2877, 13: 0.6931}))]

### To sum it up!
MinMaxScaler rescales each feature to a fixed range ([0, 1] by default): normalization

StandardScaler centers each feature on its mean (withMean=True) and divides by its standard deviation (withStd=True): standardization

Bucketizer maps continuous values to bucket indices defined by a list of split points

Tokenizer splits strings to words

HashingTF hashes each word to a vector index and counts term frequencies; IDF then down-weights terms that appear in many documents (TF-IDF)

# ---Clustering---
## 1
K-means

In [75]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [89]:
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
cluster_df = spark.read.csv(
    '/Users/aaron/Downloads/Ex_Files_Spark_ML_AI/Exercise Files/Ch03/03_02/clustering_dataset.csv',
    header=True,
    inferSchema=True,
)
cluster_df.show(n=2)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
+----+----+----+
only showing top 2 rows



In [90]:
vectorAssembler = VectorAssembler(inputCols=['col1','col2','col3'],outputCol='features')

In [91]:
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [116]:
kmeans = KMeans().setK(3).setSeed(1)

In [117]:
kmodel = kmeans.fit(vcluster_df)

In [118]:
centers = kmodel.clusterCenters()
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

## 2
Hierarchical clustering (bisecting k-means)

In [106]:
# Re-check the assembled input before fitting the bisecting variant.
vcluster_df.show(2)

+----+----+----+-------------+
|col1|col2|col3|     features|
+----+----+----+-------------+
|   7|   4|   1|[7.0,4.0,1.0]|
|   7|   7|   9|[7.0,7.0,9.0]|
+----+----+----+-------------+
only showing top 2 rows



In [107]:
from pyspark.ml.clustering import BisectingKMeans

In [119]:
bkmeans = BisectingKMeans().setK(3).setSeed(1)

In [120]:
bkmodel = bkmeans.fit(vcluster_df)

In [121]:
bkcenters = bkmodel.clusterCenters()
bkcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

# ---Classification---
## 1
Preprocessing

In [1]:
# Import only what is used. A wildcard import from pyspark.sql.functions
# shadows Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [11]:
iris_df = spark.read.csv('/Users/aaron/Downloads/iris.data', inferSchema=True)

In [15]:
iris_df.show(2)

+------------+-----------+------------+-----------+-----------+
|sepal_lenght|sepal_width|petal_lenght|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 2 rows



In [13]:
# Give the five header-less columns descriptive names.
# toDF renames positionally, so this cell is idempotent: re-running it after the
# columns are already renamed still works, whereas col('_c0') would then fail.
# The 'lenght' spellings are kept because later cells reference these exact names.
iris_df = iris_df.toDF(
    'sepal_lenght',
    'sepal_width',
    'petal_lenght',
    'petal_width',
    'species',
)

In [14]:
# Confirm the renamed columns.
iris_df.show(2)

+------------+-----------+------------+-----------+-----------+
|sepal_lenght|sepal_width|petal_lenght|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 2 rows



In [16]:
vectorAssembler = VectorAssembler(inputCols=['sepal_lenght','sepal_width','petal_lenght','petal_width'],outputCol='features')

In [17]:
viris_df = vectorAssembler.transform(iris_df)

In [20]:
viris_df.show(2)

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_lenght|sepal_width|petal_lenght|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
+------------+-----------+------------+-----------+-----------+-----------------+
only showing top 2 rows



In [21]:
indexer = StringIndexer(inputCol='species',outputCol='label')

In [22]:
iviris_df = indexer.fit(viris_df).transform(viris_df)

                                                                                

In [25]:
# Each row now carries both the 'features' vector and a numeric 'label'.
iviris_df.show(2)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_lenght|sepal_width|petal_lenght|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 2 rows



## 2
Naive Bayes

In [27]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [40]:
splits = iviris_df.randomSplit([0.6,0.4],1)

In [41]:
train_df = splits[0]
train_df.count()

98

In [42]:
# The second (40%) part of the split is held out for evaluation.
test_df = splits[1]
test_df.count()

52

In [39]:
nb = NaiveBayes(modelType='multinomial')

In [43]:
nb_model = nb.fit(train_df)

                                                                                

In [44]:
predictions_df = nb_model.transform(test_df)

In [47]:
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

In [48]:
nbaccuracy = evaluator.evaluate(predictions_df)

                                                                                

In [49]:
# Naive Bayes accuracy on the held-out test split.
nbaccuracy

0.9807692307692307

## 3
Multilayer Perceptron

In [51]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [53]:
layers = [4,5,5,3]
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

In [54]:
mlp_model = mlp.fit(train_df)

In [55]:
mlp_predictions = mlp_model.transform(test_df)

In [57]:
mlp_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')

In [58]:
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)

In [59]:
# MLP accuracy on the same test split.
# NOTE(review): noticeably lower than Naive Bayes / decision tree; possibly undertrained
# (default maxIter) or sensitive to unscaled features — TODO confirm.
mlp_accuracy

0.6923076923076923

## 4
Decision trees

In [60]:
from pyspark.ml.classification import DecisionTreeClassifier

In [61]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')

In [62]:
dt_model = dt.fit(train_df)

In [63]:
dt_predictions = dt_model.transform(test_df)

In [66]:
dt_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

In [67]:
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

In [68]:
# Decision-tree classifier accuracy on the held-out test split.
dt_accuracy

0.9423076923076923

# ---Regression---
## 1
Linear regression

In [69]:
from pyspark.ml.regression import LinearRegression

In [70]:
pp_df = spark.read.csv('/Users/aaron/Downloads/CCPP/Folds5x2_pp.csv',header='True',inferSchema='True')

In [71]:
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [72]:
from pyspark.ml.feature import VectorAssembler

In [73]:
vectorAssembler = VectorAssembler(inputCols=['AT','V','AP','RH'],outputCol='features')

In [74]:
vpp_df = vectorAssembler.transform(pp_df)

In [76]:
vpp_df.show(2)

+-----+-----+-------+-----+------+--------------------+
|   AT|    V|     AP|   RH|    PE|            features|
+-----+-----+-------+-----+------+--------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|
+-----+-----+-------+-----+------+--------------------+
only showing top 2 rows



In [77]:
lr = LinearRegression(featuresCol='features', labelCol='PE')

In [78]:
lr_model = lr.fit(vpp_df)

22/07/28 11:24:22 WARN Instrumentation: [86a35f35] regParam is zero, which might cause numerical instability and overfitting.
22/07/28 11:24:23 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [79]:
# Fitted weights, one per input column in assembler order: AT, V, AP, RH.
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [80]:
# Fitted intercept (bias) term.
lr_model.intercept

454.6092744523414

In [84]:
# RMSE on the data the model was fit on (all of vpp_df), i.e. a training-set error.
# It is not directly comparable to the test-set RMSEs of the tree models below.
lr_model.summary.rootMeanSquaredError

4.557126016749488

In [85]:
# overwrite() lets this cell be re-run; a plain save() raises if 'lr1.model' already exists.
lr_model.write().overwrite().save('lr1.model')

                                                                                

## 2
Decision tree regression

In [87]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [91]:
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]

In [92]:
train_df.count()

6654

In [99]:
# Peek at one held-out row, including the assembled 'features' column.
test_df.show(1)

+----+-----+-------+-----+-----+--------------------+
|  AT|    V|     AP|   RH|   PE|            features|
+----+-----+-------+-----+-----+--------------------+
|2.71|39.42|1026.66|81.11|489.3|[2.71,39.42,1026....|
+----+-----+-------+-----+-----+--------------------+
only showing top 1 row



In [94]:
dt = DecisionTreeRegressor(featuresCol='features', labelCol='PE')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

In [95]:
dt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

In [96]:
rmse = dt_evaluator.evaluate(dt_predictions)

In [97]:
rmse

4.521986374989069

## 3
Gradient-boosted tree regression

In [98]:
from pyspark.ml.regression import GBTRegressor

In [103]:
gbt = GBTRegressor(featuresCol='features', labelCol='PE')
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)

In [104]:
gbt_rmse

4.077615442165395