In [1]:
#!java -version

- wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
- tar xf spark-*
- sudo mv spark-3.2.0-bin-hadoop3.2 /opt/spark

In [1]:
#!pip install findspark


## There are two solutions

 Copy the downloaded Spark folder somewhere on the C: drive and pass its path to `findspark.init`, as shown below:

- import findspark
- findspark.init('C:/spark')
- Alternatively, use findspark's own lookup function to locate the Spark folder automatically:
#### 

- import findspark
- findspark.find()

https://stackoverflow.com/questions/38411914/the-spark-home-env-variable-is-set-but-jupyter-notebook-doesnt-see-it-windows?rq=1

In [1]:
import findspark
findspark.init("spark-3.2.0-bin-hadoop3.2")

import pyspark
import random

# Smoke test: check that Spark is installed and works properly by running
# a small job that estimates pi from random samples.
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100_000_000

def inside(p):
    """Return True when a random point of the unit square lies inside the unit circle.

    ``p`` is the element handed over by ``filter`` and is ignored: every
    call draws a fresh pair of coordinates from ``random.random()``.
    """
    x = random.random()
    y = random.random()
    return x * x + y * y < 1

# Count how many sampled points land inside the unit circle; that fraction
# approximates pi / 4. The context is stopped in ``finally`` so that a failed
# job does not leave a live SparkContext behind (a new one cannot be created
# while the old one is still running).
try:
    count = sc.parallelize(range(num_samples)).filter(inside).count()
    pi = 4 * count / num_samples
    print(pi)
finally:
    sc.stop()

3.14180084


In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Build (or reuse) the local session through the builder so this cell can be
# re-run safely: calling SparkContext('local') a second time raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

# https://stackoverflow.com/questions/39541204/pyspark-nameerror-name-spark-is-not-defined

# Data Preparation and Transformation

- ### Numeric
    - MinMaxScaler
    - StandardScaler
    - Bucketizer
- ### Text
    - Tokenizer
    - HashingTF

### Normalize Numeric Data

In [4]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [5]:
# Three sample rows: an integer id plus a dense vector whose three
# components sit on very different scales.
feature_rows = [
    (1, Vectors.dense([10.0, 10000.0, 1.0])),
    (2, Vectors.dense([20.0, 30000.0, 2.0])),
    (3, Vectors.dense([30.0, 40000.0, 3.0])),
]
features_df = spark.createDataFrame(feature_rows, ["id", "features"])

In [6]:
# Peek at the first row to confirm the DataFrame was built as intended.
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [7]:
# Learn the per-component min/max of "features", then write the rescaled
# vectors to a new "sfeatures" column.
feature_scaler = MinMaxScaler(
    inputCol="features",
    outputCol="sfeatures",
)
smodel = feature_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)

In [8]:
# First scaled row. Row 1 holds the minimum of every component, so it scales
# to all zeros and is displayed as an empty SparseVector.
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {}))]

In [9]:
# All three rows, original and scaled vectors side by side.
sfeatures_df.take(3)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {})),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]), sfeatures=DenseVector([0.5, 0.6667, 0.5])),
 Row(id=3, features=DenseVector([30.0, 40000.0, 3.0]), sfeatures=DenseVector([1.0, 1.0, 1.0]))]

In [10]:
# Tabular view of just the original and scaled columns.
sfeatures_df.select("features", "sfeatures").show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|           (3,[],[])|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



### Standardize numeric data

In [11]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [12]:
# Rebuild the same three-row sample frame (id + dense feature vector) for
# the standardization example.
feature_rows = [
    (1, Vectors.dense([10.0, 10000.0, 1.0])),
    (2, Vectors.dense([20.0, 30000.0, 2.0])),
    (3, Vectors.dense([30.0, 40000.0, 3.0])),
]
features_df = spark.createDataFrame(feature_rows, ["id", "features"])

In [13]:
# Peek at the first row of the rebuilt sample frame.
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [14]:
# Standardize each component of "features": centre on the mean (withMean)
# and scale by the standard deviation (withStd), writing to "sfeatures".
feature_stand_scaler = StandardScaler(
    inputCol="features",
    outputCol="sfeatures",
    withStd=True,
    withMean=True,
)
stand_smodel = feature_stand_scaler.fit(features_df)
stand_sfeatures_df = stand_smodel.transform(features_df)

In [15]:
# First standardized row.
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]

In [16]:
# All three rows, original and standardized vectors side by side.
stand_sfeatures_df.take(3)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0])),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]), sfeatures=DenseVector([0.0, 0.2182, 0.0])),
 Row(id=3, features=DenseVector([30.0, 40000.0, 3.0]), sfeatures=DenseVector([1.0, 0.8729, 1.0]))]

In [17]:
# Tabular view of the standardized frame (long vectors are truncated).
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+



### Bucketize Numeric Data

In [18]:
from pyspark.ml.feature import Bucketizer  # groups values into buckets defined by boundaries

# Four buckets: below -10, -10 up to 0, 0 up to 10, and 10 and above.
splits = [float("-inf"), -10.0, 0.0, 10.0, float("inf")]

In [20]:
# Single-column sample frame with values spread across all four buckets.
b_data = [(value,) for value in (-800.0, -10.5, -1.7, 0.0, 8.2, 90.1)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [21]:
# Map every value of "features" to the index of the bucket it falls into
# and store that index in "bfeatures".
bucketizer = Bucketizer(
    splits=splits,
    inputCol="features",
    outputCol="bfeatures",
)
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



### Tokenize text data

In [32]:
from pyspark.ml.feature import Tokenizer


In [22]:
# Small text corpus: one (id, sentence) pair per row.
sentence_rows = [
    (1, "This is an introduction to Spark MLlib"),
    (2, "MLlib includes libraries for classification and regression"),
    (3, "It also contains supporting tools for pipelines"),
]
sentences_df = spark.createDataFrame(sentence_rows, ["id", "sentence"])

In [23]:
# Display the corpus (long sentences are truncated in the table view).
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It also contains ...|
+---+--------------------+



In [24]:
# Split each sentence into a list of lower-cased words stored in "words".
sent_token = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
sent_tokenized_df = sent_token.transform(sentences_df)
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



### TF - IDF

In [31]:
from pyspark.ml.feature import HashingTF, IDF

In [33]:
# Show the column names and types of the original (untokenized) frame.
sentences_df

DataFrame[id: bigint, sentence: string]

In [34]:
# First raw sentence, untruncated.
sentences_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib')]

In [35]:
# First sentence together with its token list; this is the input to HashingTF.
sent_tokenized_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [36]:
# Hash every word into one of 20 slots and count occurrences per slot,
# giving a sparse term-frequency vector in "rawFeatures".
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)
sent_hfTf_df = hashingTF.transform(sent_tokenized_df)
sent_hfTf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [38]:
# Fit inverse-document-frequency weights on the term-frequency vectors and
# apply them, storing the weighted vectors in "idf_Features".
idf = IDF(
    inputCol="rawFeatures",
    outputCol="idf_Features",
)
idfModel = idf.fit(sent_hfTf_df)
tfidf_df = idfModel.transform(sent_hfTf_df)
tfidf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idf_Features=SparseVector(20, {6: 0.5754, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.2877}))]

# Clustering

- ### K-means - useful for data exploration of small and mid-sized data sets
- ### Hierarchical clustering - useful for clustering large data sets
- ### Different algorithms may find different cluster centers

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [11]:
# Load the clustering sample data, taking column names from the header row
# and letting Spark infer the column types.
# The directory name contains a space; inside a Python string it needs no
# backslash ("\ " is an invalid escape sequence and triggers a warning).
cluster_df = spark.read.csv("Exercise Files/Ch03/03_02", header=True, inferSchema=True)
cluster_df


DataFrame[col1: int, col2: int, col3: int]

In [13]:
# Preview the data (show() prints only the first 20 rows by default).
cluster_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [15]:
# cluster_df.show(75)  # The data contain 3 clusters; you can spot them by comparing the value ranges.

In [18]:
# Pack the three numeric columns into a single "features" vector column,
# which is the input format the clustering estimators expect.
vectorAssembler = VectorAssembler(
    inputCols=["col1", "col2", "col3"],
    outputCol="features",
)
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [20]:
# K-means with 3 clusters; the fixed seed makes the initialization repeatable.
kmeans = KMeans(k=3, seed=1)
kmodel = kmeans.fit(vcluster_df)

In [21]:
centers = kmodel.clusterCenters()
centers # The centers are not sorted by value: here the mid-range cluster is listed first, then the high-range one, then the low-range one (see the value ranges in the output).

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

In [23]:
# vcluster_df.show() # to remember again

In [24]:
from pyspark.ml.clustering import BisectingKMeans

# Bisecting (hierarchical) K-means with the same k and seed as the K-means run.
bkmeans = BisectingKMeans(k=3, seed=1)

In [25]:
# Fit bisecting K-means on the same assembled features and list its centers.
bkmodel = bkmeans.fit(vcluster_df)
bkcenters = bkmodel.clusterCenters()
bkcenters # Same values as 'centers' here (in a different order); however, the two algorithms do not have to agree.

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

# Classification

- ### Naive Bayes
- ### Multilayer Perceptron
- ### Decision Trees

In [27]:
# Import only what is used: a wildcard import of pyspark.sql.functions
# shadows Python builtins such as sum, min, max and round.
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [28]:
# Load the iris data set; the file is read without a header, so Spark names
# the columns _c0.._c4 (see the output below).
# The directory name contains a space; inside a Python string it needs no
# backslash ("\ " is an invalid escape sequence and triggers a warning).
iris_df = spark.read.csv("Exercise Files/Ch04/iris.data", inferSchema=True)
iris_df.take(1)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [29]:
# Give the five positional columns (_c0.._c4) meaningful names.
# toDF renames by position, so this cell can be re-run safely; selecting
# col("_c0") again after iris_df has been overwritten would fail because the
# _c* columns no longer exist.
iris_df = iris_df.toDF(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "species",
)
iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [31]:
# Combine the four measurements into a single "features" vector column.
vectorAssembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features",
)
viris_df = vectorAssembler.transform(iris_df)
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [32]:
# Encode the species name as a numeric "label" column for the classifiers.
indexer = StringIndexer(inputCol="species", outputCol="label")
indexer_model = indexer.fit(viris_df)
iviris_df = indexer_model.transform(viris_df)
iviris_df.show(1)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 1 row



### Naive Bayes Classification

In [33]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [34]:
# 60% / 40% train/test split with a fixed seed (1) so it is repeatable.
splits = iviris_df.randomSplit([0.6, 0.4], seed=1)
train_df, test_df = splits

In [36]:
# Row counts: training split, test split, then the full data set.
for frame in (train_df, test_df, iviris_df):
    print(frame.count())

98
52
150


In [38]:
# Train a multinomial Naive Bayes model on the training split and score the
# test split; transform() adds rawPrediction, probability and prediction.
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)
predictions_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [45]:
# Fraction of test rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

0.9807692307692307

### Multilayer Perceptron Classification

In [48]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Network shape: 4 input features, two hidden layers of 5 units each,
# and 3 output classes.
layers = [4, 5, 5, 3]
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

# Accuracy on the held-out test split (the evaluator's default label and
# prediction columns are used).
mlp_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

0.6923076923076923

### Decision Trees Classification



In [49]:
from pyspark.ml.classification import DecisionTreeClassifier


In [50]:
# Train a decision-tree classifier on the training split and score the
# test split.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

# Accuracy on the held-out test split.
dt_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9423076923076923

# Regression

- ### Linear Regression
- ### Decision Tree Regression
- ### Gradient-boosted Tree Regression

About the data (source: https://archive.ics.uci.edu/ml/machine-learning-databases/00294/):
- Column AT = Ambient Temperature
- V = Vacuum
- AP = Ambient Pressure
- RH = Relative Humidity
- PE = Measure of how much power was generated

### Linear Regression

In [4]:
from pyspark.ml.regression import LinearRegression

# Load the power-plant data, taking column names from the header row and
# letting Spark infer the column types.
# The directory name contains a space; inside a Python string it needs no
# backslash ("\ " is an invalid escape sequence and triggers a warning).
pp_df = spark.read.csv("Exercise Files/power_plant.csv", header=True, inferSchema=True)
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [6]:
from pyspark.ml.feature import VectorAssembler

# Combine the four predictor columns into a single "features" vector; PE
# stays a separate column and is used as the regression label.
vectorAssembler = VectorAssembler(
    inputCols=["AT", "V", "AP", "RH"],
    outputCol="features",
)
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [7]:
# Fit a linear regression of PE on the assembled feature vector, using the
# whole data set (no train/test split at this point).
lr = LinearRegression(featuresCol = "features", labelCol = "PE")
lr_model = lr.fit(vpp_df)
lr_model.coefficients # one coefficient for each of the variables
                      # used to build the model, in the order in which
                      # they were assembled (AT, V, AP, RH)

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [8]:
lr_model.intercept # the point where the fitted line crosses the Y axis, i.e. the predicted PE when every feature is zero

454.6092744523414

In [None]:
# So basically, what we've done is fit a line to our data,
# and that is what linear regression is.

In [9]:
# RMSE from the training summary: the model was fit on all of vpp_df, so
# this is an error on the training data, not a held-out test score.
lr_model.summary.rootMeanSquaredError

4.557126016749488

In [11]:
# Persist the fitted model. overwrite() lets this cell be re-run: a plain
# save() raises an error when the target path already exists.
lr_model.write().overwrite().save("lr1.model")

### Decision Tree Regression

In [13]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [16]:
# splitting our data into train and test sets
splits = vpp_df.randomSplit([0.7,0.3]) #70% and 30%
train_df = splits[0]
test_df = splits[1]
print(train_df.count())
print(test_df.count())
print(vpp_df.count())

6701
2867
9568


In [18]:
# Train a decision-tree regressor for PE on the training split and score
# the test split.
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

# Root-mean-squared error on the held-out test split.
dt_evaluator = RegressionEvaluator(
    labelCol="PE",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = dt_evaluator.evaluate(dt_predictions)
rmse

4.509122685272387

### Gradient-boosted Tree Regression

In [20]:
from pyspark.ml.regression import GBTRegressor

# Train a gradient-boosted tree regressor for PE on the training split and
# score the test split.
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

# Root-mean-squared error on the held-out test split.
gbt_evaluator = RegressionEvaluator(
    labelCol="PE",
    predictionCol="prediction",
    metricName="rmse",
)
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
# A little lower than the decision tree's test RMSE. The linear-regression
# RMSE shown earlier was measured on the training data, so that comparison
# is only indicative.
gbt_rmse

4.088762719816632