## LinkedIn Learning
### Spark for Machine Learning & AI
Apache Spark is one of the most widely used and supported open-source tools for machine learning and big data. In this course, discover how to work with this powerful platform for machine learning. Instructor Dan Sullivan discusses MLlib, the Spark machine learning library, which provides tools for data scientists and analysts who would rather find solutions to business problems than code, test, and maintain their own machine learning libraries. He shows how to use DataFrames to organize structured data, and he covers data preparation and the most commonly used types of machine learning algorithms: clustering, classification, regression, and recommendations. By the end of the course, you will have experience loading data into Spark, preprocessing data as needed to apply MLlib algorithms, and applying those algorithms to a variety of machine learning problems.

#### Learning Objectives
- Machine learning workflows
- Organizing data in DataFrames
- Preprocessing and data preparation steps for machine learning
- Clustering data
- Classification algorithms
- Regression methods available in Spark MLlib
- Common approaches to designing recommendation systems

In [None]:
# Uncomment the lines below to install the libraries used in this tutorial
#import sys
#!{sys.executable} -m pip install pyspark

In [1]:
# Start a local SparkContext and wrap it in a SparkSession for the DataFrame API
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

### Chapter 1: Introduction to Spark and MLlib
#### Organizing data in DataFrames

In [3]:
# Importing the file
emp_df = spark.read.csv("E:/LinkedIn Learning/Spark for Machine Learning & AI/Exercise Files/Ch01/01_04/employee.txt", 
                        header=True)

In [5]:
# Evaluating the DataFrame displays its column names and types
emp_df

DataFrame[id: string, last_name: string, email: string, gender: string, department: string, start_date: string, salary: string, job_title: string, region_id: string]

In [8]:
# Multiple ways of seeing the column names
# Number 1
emp_df.schema

# Number 2
emp_df.printSchema()

# Number 3
emp_df.columns

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [9]:
# Counting the rows
emp_df.count()

1000

In [10]:
# Sampling roughly 10% of the rows without replacement (the exact count varies)
sample_df = emp_df.sample(False, 0.1)
sample_df.count()

105
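
The sample above returns 105 rows rather than exactly 100 because `sample` does not guarantee the requested fraction. As a plain-Python sketch of the idea (not Spark's implementation), assume each row is kept independently with probability equal to the fraction. The helper `bernoulli_sample` and the stand-in row list are hypothetical names used only for illustration.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


# Stand-in for the 1000 employee rows (hypothetical data, not the real file)
rows = list(range(1000))

# Repeat the sampling with a few different seeds and record the sample sizes
sizes = [len(bernoulli_sample(rows, 0.1, seed)) for seed in range(5)]
print(sizes)  # sizes land near 100 but differ from seed to seed
```

Passing a seed to `sample` makes a given sample repeatable, but the size still only approximates 10% of the rows.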

In [12]:
# Finding employees whose salary is greater than or equal to 100000
emp_mgrs_df = emp_df.filter("salary >= 100000")
emp_mgrs_df.count()

478

In [13]:
# Viewing the first 20 matching salaries (show() prints 20 rows by default; they are not sorted)
emp_mgrs_df.select("salary").show()

+------+
|salary|
+------+
|101768|
|118497|
|108657|
|108093|
|121966|
|141139|
|106659|
|148952|
|109890|
|115274|
|144724|
|126103|
|144965|
|113507|
|120579|
|107222|
|125668|
|113857|
|108378|
|133424|
+------+
only showing top 20 rows



### Chapter 2: Data Preparation and Transformation
#### Normalize Numeric Data

In [14]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [15]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),)], ["id","features"])

In [17]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [18]:
feature_scaler = MinMaxScaler(inputCol = 'features', outputCol = 'sfeatures')

In [19]:
smodel = feature_scaler.fit(features_df)

In [20]:
sfeatures_df = smodel.transform(features_df)

In [21]:
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {}))]

In [22]:
sfeatures_df.select("features", "sfeatures").show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|           (3,[],[])|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+
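
The scaled values above can be checked by hand. The following plain-Python sketch (an illustration, not Spark's code) applies the min-max formula `(x - min) / (max - min)` to each column of the same three rows. The function name `min_max_scale` is hypothetical.

```python
def min_max_scale(column, lo=0.0, hi=1.0):
    """Rescale one column linearly so its minimum maps to lo and its maximum to hi."""
    c_min, c_max = min(column), max(column)
    if c_max == c_min:
        # A constant column has no range; use the midpoint of the target interval
        return [0.5 * (lo + hi) for _ in column]
    return [(x - c_min) / (c_max - c_min) * (hi - lo) + lo for x in column]


rows = [
    [10.0, 10000.0, 1.0],
    [20.0, 30000.0, 2.0],
    [30.0, 40000.0, 3.0],
]

# Scale column by column, then turn the result back into rows
columns = list(zip(*rows))
scaled_columns = [min_max_scale(list(c)) for c in columns]
scaled_rows = [list(r) for r in zip(*scaled_columns)]

for original, scaled in zip(rows, scaled_rows):
    print(original, scaled)
```

The first row holds the minimum of every column, so all of its scaled values are 0. That is why Spark displays it as the empty sparse vector `(3,[],[])`.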



#### Standardize Numeric Data

In [23]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [24]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),)], ["id","features"])

In [25]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [27]:
# withMean centers each feature at 0; withStd scales it to unit standard deviation
feature_stand_scaler = StandardScaler(inputCol="features", outputCol="sfeatures", withStd=True, withMean=True)

In [28]:
stand_smodel = feature_stand_scaler.fit(features_df)

In [29]:
stand_sfeatures_df = stand_smodel.transform(features_df)

In [30]:
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]

In [31]:
stand_sfeatures_df.select("features", "sfeatures").show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+------------------+--------------------+
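
As a cross-check of the output above, here is a plain-Python sketch of standardization (an illustration, not Spark's code). It subtracts the column mean and divides by the sample standard deviation, the one with `n - 1` in the denominator, which is the choice that reproduces the values shown. The function name `standardize` is hypothetical.

```python
import statistics


def standardize(column):
    """Center a column at 0 and scale it by its sample standard deviation."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)  # sample standard deviation (n - 1)
    return [(x - mean) / std for x in column]


rows = [
    [10.0, 10000.0, 1.0],
    [20.0, 30000.0, 2.0],
    [30.0, 40000.0, 3.0],
]

# Standardize column by column, then turn the result back into rows
columns = list(zip(*rows))
z_columns = [standardize(list(c)) for c in columns]
z_rows = [list(r) for r in zip(*z_columns)]

for row in z_rows:
    print([round(v, 4) for v in row])
```

The second column gives about -1.0911, 0.2182, and 0.8729, matching the `sfeatures` column in the table.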



#### Bucketize Numeric Data

In [32]:
from pyspark.ml.feature import Bucketizer

In [33]:
# Bucket boundaries: four buckets, each covering [lower, upper)
splits = [-float("inf"), -10.0, 0.0, 10.0, float("inf")]

In [36]:
b_data = [(-800.0,),(-10.5,),(-1.7,),(0.0,),(8.2,),(90.1,)]

In [37]:
b_df = spark.createDataFrame(b_data, ["features"])

In [38]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [39]:
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bfeatures")

In [40]:
bucketed_df = bucketizer.transform(b_df)

In [41]:
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+
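
The bucket assignments above follow from treating each pair of neighboring split points as a half-open interval `[lower, upper)`. A minimal plain-Python sketch of that lookup, using the standard `bisect` module, is shown below. The helper `bucket_index` is a hypothetical name, and the sketch does not handle NaN values or the inclusive upper edge of the last bucket.

```python
from bisect import bisect_right

splits = [-float("inf"), -10.0, 0.0, 10.0, float("inf")]
values = [-800.0, -10.5, -1.7, 0.0, 8.2, 90.1]


def bucket_index(value, boundaries):
    """Return i such that boundaries[i] <= value < boundaries[i + 1]."""
    return float(bisect_right(boundaries, value) - 1)


buckets = [bucket_index(v, splits) for v in values]
print(buckets)  # [0.0, 0.0, 1.0, 2.0, 2.0, 3.0]
```

Note that 0.0 falls in bucket 2 rather than bucket 1, because a value equal to a split point belongs to the bucket that starts there.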



#### Tokenize Text Data

In [42]:
from pyspark.ml.feature import Tokenizer

In [43]:
sentences_df = spark.createDataFrame([
    (1, "This is an introduction to Spark MLlib"),
    (2, "MLlib includes the libraries for classification and regression"),
    (3, "It also contains supporting tools for pipelines")
], ["id", "sentences"])

In [44]:
sentences_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes th...|
|  3|It also contains ...|
+---+--------------------+



In [45]:
sent_token = Tokenizer(inputCol="sentences", outputCol="words")

In [46]:
sent_tokenized_df = sent_token.transform(sentences_df)

In [47]:
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|           sentences|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes th...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+
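
The `words` column shows what the tokenizer does: it lowercases each sentence and splits it on whitespace. For these three sentences, the plain-Python sketch below gives the same tokens. It is only an approximation, since Python's `split()` collapses runs of whitespace, which may differ from Spark on irregular input. The function name `tokenize` is hypothetical.

```python
def tokenize(sentence):
    """Lowercase a sentence and split it into whitespace-separated words."""
    return sentence.lower().split()


sentences = [
    "This is an introduction to Spark MLlib",
    "MLlib includes the libraries for classification and regression",
    "It also contains supporting tools for pipelines",
]

tokenized = [tokenize(s) for s in sentences]
for words in tokenized:
    print(words)
```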



#### TF-IDF

In [48]:
from pyspark.ml.feature import HashingTF, IDF

In [49]:
sentences_df

DataFrame[id: bigint, sentences: string]

In [50]:
sentences_df.take(1)

[Row(id=1, sentences='This is an introduction to Spark MLlib')]

In [51]:
sent_tokenized_df.take(1)

[Row(id=1, sentences='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [55]:
# Hash each word into one of 20 feature indices and count the occurrences per index
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)

In [56]:
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)

In [57]:
sent_hfTF_df.take(1)

[Row(id=1, sentences='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [58]:
idf = IDF(inputCol="rawFeatures", outputCol="idf_features")

In [59]:
idfModel = idf.fit(sent_hfTF_df)

In [60]:
tf_idf_df = idfModel.transform(sent_hfTF_df)

In [61]:
tf_idf_df.take(1)

[Row(id=1, sentences='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idf_features=SparseVector(20, {6: 0.5754, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.2877}))]
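
The `idf_features` values above are consistent with the smoothed formula `idf = ln((m + 1) / (df + 1))`, where `m` is the number of documents (3 here) and `df` is the number of documents containing a given hash index. The plain-Python sketch below reproduces the first row under that formula. The document frequencies are not printed anywhere in the notebook; they are inferred from the output (an assumption), and the names `idf` and `doc_freq` are hypothetical.

```python
import math


def idf(num_docs, doc_freq):
    """Smoothed inverse document frequency."""
    return math.log((num_docs + 1) / (doc_freq + 1))


num_docs = 3

# Term frequencies of the first sentence, copied from rawFeatures above
raw_features = {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}

# Assumed document frequencies per hash index, inferred from the output
doc_freq = {6: 2, 8: 2, 9: 1, 10: 1, 13: 1, 15: 2}

# TF-IDF is the term frequency multiplied by the index's IDF weight
tf_idf = {
    index: round(tf * idf(num_docs, doc_freq[index]), 4)
    for index, tf in raw_features.items()
}
print(tf_idf)
```

An index that appears in fewer documents gets a larger weight (0.6931 for one document against 0.2877 for two), and index 6 is doubled because two words of the sentence hash to it.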

### Chapter 3: Clustering
#### K-means Clustering

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [3]:
cluster_df = spark.read.csv("E:/LinkedIn Learning/Spark for Machine Learning & AI/Exercise Files/Ch03/03_02/clustering_dataset.csv", 
                        header=True,inferSchema = True)

In [4]:
cluster_df

DataFrame[col1: int, col2: int, col3: int]

In [5]:
cluster_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [6]:
cluster_df.show(75)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
|   4|   8|   5|
|   6|   6|   7|
|   4|   6|   2|
|   8|   1|   1|
|   7|   5|  10|
|  17|  25|  21|
|  15|  23|  32|
|  42|  25|  45|
|  41|  47|  21|
|  37|  20|  27|
|  40|  18|  26|
|  41|  28|  50|
|  32|  25|  40|
|  24|  29|  35|
|  47|  18|  47|
|  36|  42|  45|
|  49|  29|  15|
|  47|  39|  22|
|  38|  27|  25|
|  45|  23|  40|
|  23|  36|  19|
|  47|  40|  50|
|  37|  30|  40|
|  42|  48|  41|
|  29|  31|  21|
|  36|  39|  48|
|  50|  24|  31|
|  42|  44|  37|
|  37|  39|  46|
|  22|  40|  30|
|  17|  29|  41|
|  85| 100|  69|
|  68|  76|  67|
|  76|  70|  93|
|  62|  66|  91|
|  83|  93|  7

In [7]:
vectorAssembler = VectorAssembler(inputCols=["col1","col2","col3"], outputCol="features")
vcluster_df = vectorAssembler.transform(cluster_df)

In [9]:
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [10]:
# Ask k-means for 3 clusters
kmeans = KMeans().setK(3)

In [11]:
kmeans = kmeans.setSeed(1)

In [12]:
kmodel = kmeans.fit(vcluster_df)

In [13]:
centers = kmodel.clusterCenters()

In [14]:
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

#### Hierarchical Clustering

In [15]:
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [16]:
from pyspark.ml.clustering import BisectingKMeans

In [17]:
bkmeans = BisectingKMeans().setK(3)

In [18]:
bkmeans = bkmeans.setSeed(1)

In [19]:
bkmodel = bkmeans.fit(vcluster_df)

In [20]:
bkcenters = bkmodel.clusterCenters()

In [22]:
bkcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [23]:
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]
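
The two outputs above contain the same three centers in a different order, so the cluster index assigned by each model is arbitrary. A short plain-Python check, with the center values copied from the outputs, makes the correspondence explicit. The variable names are hypothetical.

```python
# Centers copied from the k-means output
kmeans_centers = [
    [35.88461538, 31.46153846, 34.42307692],
    [80.0, 79.20833333, 78.29166667],
    [5.12, 5.84, 4.84],
]

# Centers copied from the bisecting k-means output
bisecting_centers = [
    [5.12, 5.84, 4.84],
    [35.88461538, 31.46153846, 34.42307692],
    [80.0, 79.20833333, 78.29166667],
]

# Same centers once the ordering is ignored
same_centers = sorted(kmeans_centers) == sorted(bisecting_centers)

# Map each k-means cluster index to the matching bisecting k-means index
index_map = {
    i: bisecting_centers.index(center)
    for i, center in enumerate(kmeans_centers)
}
print(same_centers, index_map)  # True {0: 1, 1: 2, 2: 0}
```

When comparing cluster assignments from two models, match clusters by their centers rather than by index.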

### Chapter 4: Classification
#### Preprocessing the Iris Dataset

In [24]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [52]:
iris_df = spark.read.csv("E:/LinkedIn Learning/Spark for Machine Learning & AI/iris.txt", inferSchema=True)

In [53]:
iris_df.take(1)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [54]:
iris_df = iris_df.select(col("_c0").alias("sepal_length"),
                        col("_c1").alias("sepal_width"),
                        col("_c2").alias("petal_length"),
                        col("_c3").alias("petal_width"),
                        col("_c4").alias("species"))

In [55]:
vector_assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], 
                                   outputCol="features")

In [57]:
viris_df = vector_assembler.transform(iris_df)

In [58]:
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [59]:
indexer = StringIndexer(inputCol="species", outputCol="label")

In [60]:
iviris_df = indexer.fit(viris_df).transform(viris_df)

In [61]:
iviris_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
|         4.4|        2.9|      

#### Naive Bayes Classification

In [62]:
iviris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [63]:
iviris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [64]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [65]:
# 60/40 train/test split with a fixed seed so the split is repeatable
splits = iviris_df.randomSplit([0.6, 0.4], seed=1)

In [66]:
train_df = splits[0]

In [67]:
test_df = splits[1]

In [68]:
train_df.count()

98

In [69]:
test_df.count()

52

In [70]:
nb = NaiveBayes(modelType="multinomial")

In [71]:
nbmodel = nb.fit(train_df)

In [72]:
predictions_df = nbmodel.transform(test_df)

In [73]:
predictions_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [75]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol = "prediction", metricName="accuracy")

In [76]:
nbaccuracy = evaluator.evaluate(predictions_df)

In [77]:
nbaccuracy

0.9807692307692307
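
Accuracy is the fraction of test rows whose predicted label equals the true label. The plain-Python sketch below shows the computation on a tiny made-up example (hypothetical labels, not the Iris predictions) and then works backward from the reported value: with 52 test rows, an accuracy of about 0.9808 corresponds to 51 correct predictions.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    matches = sum(1 for y, p in zip(labels, predictions) if y == p)
    return matches / len(labels)


# Hypothetical toy example: 3 of 4 predictions are correct
toy_accuracy = accuracy([0.0, 0.0, 1.0, 2.0], [0.0, 1.0, 1.0, 2.0])
print(toy_accuracy)  # 0.75

# Values reported in the notebook output
reported_accuracy = 0.9807692307692307
test_rows = 52

# Number of correct predictions implied by the reported accuracy
correct_rows = round(reported_accuracy * test_rows)
print(correct_rows, test_rows - correct_rows)  # 51 1
```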

#### Multi-Layer Perceptron

In [88]:
iviris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [89]:
iviris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [90]:
train_df.count()

98

In [91]:
test_df.count()

52

In [92]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [93]:
# 4 input features, two hidden layers of 5 units each, 3 output classes
layers = [4, 5, 5, 3]

In [94]:
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

In [95]:
mlp_model = mlp.fit(train_df)

In [98]:
mlp_predictions = mlp_model.transform(test_df)

Py4JJavaError: An error occurred while calling o572.getParam.
: java.util.NoSuchElementException: Param blockSize does not exist.
	at org.apache.spark.ml.param.Params.$anonfun$getParam$2(params.scala:729)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.ml.param.Params.getParam(params.scala:729)
	at org.apache.spark.ml.param.Params.getParam$(params.scala:727)
	at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:43)
	at jdk.internal.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:830)


In [None]:
mlp_evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

#### Decision Trees Classification

In [99]:
iviris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [100]:
iviris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [101]:
from pyspark.ml.classification import DecisionTreeClassifier

In [102]:
dt = DecisionTreeClassifier(labelCol= "label", featuresCol="features")

In [103]:
dt_model = dt.fit(train_df)

In [104]:
dt_predictions = dt_model.transform(test_df)

In [107]:
dt_evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol="prediction",metricName = "accuracy")

In [109]:
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

In [110]:
dt_accuracy

0.9423076923076923

### Chapter 5: Regression
#### Preprocessing Regression Data

In [111]:
from pyspark.ml.regression import LinearRegression

In [115]:
pp_df = spark.read.csv("E:/LinkedIn Learning/Spark for Machine Learning & AI/CCPP/power_plant.csv", header=True,
                   inferSchema=True)

In [116]:
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [117]:
from pyspark.ml.feature import VectorAssembler

In [120]:
vectorAssembler = VectorAssembler(inputCols = ["AT","V","AP","RH"],
                                 outputCol = "features")

In [121]:
vpp_df = vectorAssembler.transform(pp_df)

In [122]:
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [123]:
lr = LinearRegression(featuresCol = "features", labelCol="PE")

In [124]:
lr_model = lr.fit(vpp_df)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "C:\Users\alina\Anaconda3\envs\myenv\lib\site-packages\pyspark\ml\wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


In [125]:
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [126]:
lr_model.intercept

454.6092744523414

In [127]:
lr_model.summary.rootMeanSquaredError

4.557126016749488
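
A fitted linear model predicts with `intercept + sum(coefficient * feature)`. As a sanity check, the plain-Python sketch below applies the coefficients and intercept shown above to the first row of the data. The coefficients are the rounded values as displayed, so the result is approximate.

```python
# Coefficients as displayed above (rounded to four decimals), in the order AT, V, AP, RH
coefficients = [-1.9775, -0.2339, 0.0621, -0.1581]
intercept = 454.6092744523414

# First row of the power plant data, from the take(1) output
features = [14.96, 41.76, 1024.07, 73.17]
actual_pe = 463.26

# Linear prediction: intercept plus the weighted sum of the features
predicted_pe = intercept + sum(w * x for w, x in zip(coefficients, features))
residual = actual_pe - predicted_pe

print(round(predicted_pe, 2), round(residual, 2))  # approximately 467.28 and -4.02
```

The residual of about 4 for this row is the same order of magnitude as the root mean squared error reported above.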

In [128]:
lr_model.save("E:/LinkedIn Learning/Spark for Machine Learning & AI/CCPP/lr1.model")

#### Decision Tree Regression

In [130]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [131]:
pp_df = spark.read.csv("E:/LinkedIn Learning/Spark for Machine Learning & AI/CCPP/power_plant.csv", header=True,
                   inferSchema=True)

In [132]:
pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)]

In [133]:
vectorAssembler = VectorAssembler(inputCols = ["AT","V","AP","RH"],
                                 outputCol = "features")

In [134]:
vpp_df = vectorAssembler.transform(pp_df)

In [135]:
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [136]:
# No seed is set, so the split (and the counts and RMSE values below) can change between runs
splits = vpp_df.randomSplit([0.70, 0.30])

In [137]:
train_df = splits[0]

In [138]:
test_df = splits[1]

In [139]:
train_df.count()

6656

In [140]:
test_df.count()

2912

In [141]:
dt = DecisionTreeRegressor(featuresCol = "features",
                          labelCol = "PE")

In [142]:
dt_model = dt.fit(train_df)

In [143]:
dt_predictions = dt_model.transform(test_df)

In [144]:
dt_evaluator = RegressionEvaluator(labelCol = "PE",
                                  predictionCol = "prediction",
                                  metricName = "rmse")

In [145]:
rmse = dt_evaluator.evaluate(dt_predictions)

In [146]:
rmse

4.569008006634209

#### Gradient-Boosted Tree Regression

In [147]:
from pyspark.ml.regression import GBTRegressor

In [148]:
gbt = GBTRegressor(featuresCol = "features",
                  labelCol = "PE")

In [149]:
gbt_model = gbt.fit(train_df)

In [150]:
gbt_predictions = gbt_model.transform(test_df)

In [151]:
gbt_evaluator = RegressionEvaluator(labelCol = "PE",
                                  predictionCol = "prediction",
                                  metricName = "rmse")

In [152]:
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)

In [153]:
gbt_rmse

4.102561053094981
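
The `rmse` metric used by both evaluators is the square root of the mean squared difference between labels and predictions, so lower is better. The plain-Python sketch below computes it for a few made-up values (hypothetical, not taken from the power plant data) and then compares the two test-split results reported above. The linear regression figure of about 4.557 comes from `lr_model.summary`, which describes the training data, so it is not directly comparable and is left out.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical toy values: errors of -3, 4 and 0
toy_labels = [460.0, 450.0, 470.0]
toy_predictions = [463.0, 446.0, 470.0]
toy_rmse = rmse(toy_labels, toy_predictions)  # sqrt((9 + 16 + 0) / 3)
print(round(toy_rmse, 4))

# Test-split RMSE values reported in the notebook output
reported = {
    "decision tree": 4.569008006634209,
    "gradient-boosted trees": 4.102561053094981,
}
best_model = min(reported, key=reported.get)
print(best_model)  # gradient-boosted trees
```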

### Chapter 6: Recommendations
#### Collaborative Filtering


In [154]:
from pyspark.ml.recommendation import ALS