# 3 - Apache Spark ML- Create machine learning models

In this chapter, you will:

• Build your first ML model with Spark ML

• Learn more Spark functionality and how to use it

In [1]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("ApacheSparkML") \
    .getOrCreate()

df_train = spark.read.parquet("classified_train_data")
df_test = spark.read.parquet("classified_test_data")



In [2]:
df_train.limit(5) .toPandas ()

Unnamed: 0,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,1,0,"[, @inky, BIG, DATA]",8,1,7,0,0,258,1,0,1,0
1,1,0,"[, LP;, intelligence, natural, Enterprises, mo...",328658,110,4381,3,1,319,1,0,1,0
2,1,0,"[, by, Created, http:tcoWKb4iyqL9H, Holidays, ...",43,1,19,0,0,3846,1,0,1,0
3,1,0,"[, by, Created, much, ‰Ï, hour, as, gossip, U...",7,1,8,0,0,5856,1,0,1,0
4,1,0,"[, curated, by, papers, twitterbot, #PubMed, f...",49,12,7,1,0,831,1,0,1,1


✅ Task :

### Frequent Patterns Growth algorithm - FPGrowth()

Your data is clean and organized. It's your turn to create your **first** ML model with Spark.

Run the next code; it runs the **Frequent Patterns Growth** algorithm to extract patterns if those exist.

Tweak the minSupport and minCondifence.


<details><summary>Hint</summary>
<p>

Change the minSupport and minCondifence to 0.1 and see what happens.

</p>
</details>



In [3]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="description", minSupport=0.09, minConfidence=0.09)
fpGrowth_model = fpGrowth.fit(df_train)

# Display frequent itemsets.
fpGrowth_model.freqItemsets.show()

# Display generated association rules.
fpGrowth_model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
fpGrowth_model.transform(df_train).toPandas().transpose()


+---------+----+
|    items|freq|
+---------+----+
|    [for]| 180|
|      [a]| 267|
|    [and]| 410|
|       []| 231|
|     [of]| 380|
|      [I]| 170|
|    [the]| 365|
|[the, of]| 166|
|    [bot]| 159|
|     [to]| 221|
|     [in]| 210|
|     [by]| 326|
+---------+----+

+----------+----------+------------------+------------------+
|antecedent|consequent|        confidence|              lift|
+----------+----------+------------------+------------------+
|      [of]|     [the]|0.4368421052631579|2.0884643114635906|
|     [the]|      [of]|0.4547945205479452|2.0884643114635906|
+----------+----------+------------------+------------------+



Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,1735,1736,1737,1738,1739,1740,1741,1742,1743,1744
screen_name,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
location,0,0,0,0,0,0,0,0,0,0,...,1,1,1,1,1,1,1,1,1,1
description,"[, @inky, BIG, DATA]","[, LP;, intelligence, natural, Enterprises, mo...","[, by, Created, http:tcoWKb4iyqL9H, Holidays, ...","[, by, Created, much, ‰Ï, hour, as, gossip, U...","[, curated, by, papers, twitterbot, #PubMed, f...","[, follow, _Ùªâ, @jalfiebutler, for, back, Fol...","[, mass, by, completely, against, fight, @thri...","[, that, for, TV, past, the, it, Tweeting, sta...","[, to, feedback, the, @beaugunderson, threedes...","[, what, for, but, Outlines, Producer, of, it,...",...,"[https:tcohf0W3M7i2S, event, Violet, 100, link:]","[lack, my, Opinions, books, Anyone, of, own, f...","[name, as, my, york, motors, is, timmy, aka, H...","[naughty, """"you, bad, """"""20, me, made, and]","[never, myself, depend, but, on, nobody]","[sometimes, behalf, company, not, tweets, usua...","[source, for, to, trends, those, about, the, e...","[that, affiliated, not, with, the, Museum, obj...","[throam, 30, a, 1, tweets, volume, the, in, lo...","[to, with, general, #Neurogenetics, #Parkinson..."
followers_count,8,328658,43,7,49,34,213,6,204,31,...,1878,348,20,,62647,899271,3,903,293,601
friends_count,1,110,1,1,12,787,0,0,1,0,...,262,302,75,1391,277,202,343,1,344,0
listed_count,7,4381,19,8,7,0,23,6,34,6,...,64,10,1,477,1197,3228,0,50,2,53
favourites_count,0,3,0,0,1,7,0,0,0,0,...,109,8413,59,,431,103,37,0,0,0
verified,0,1,0,0,0,0,0,0,0,0,...,0,0,0,0,1,1,0,0,0,0
statuses_count,258,319,3846,5856,831,2,5685,1066,7491,975,...,1718,2578,294,,11885,2204,41,3654,8638,24135
status,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1


What did you get?



<details><summary>Check me after running the previous code multipale times</summary>
<p>

When tweaking the minSupport=0.21 and minConfidence=0.1

You get:

|items|freq|
|----|:----:|
|[and]| 389|


</p>
</details>

---

✅ **Task :**

### LinearRegression functionality

It's your turn to create your **second** ML model with Spark - Linear Regression.




<details><summary>What is linear regression used for?</summary>
<p>
Linear regression is a common Statistical Data Analysis technique. It is used to determine the extent to which there is a linear relationship between a dependent variable and one or more independent variables.
</p>
</details>


**But**, before jumping right into it, you should know:

Spark ML Linear Regression **input** is:
1. `label` of type Double - our classification
2. `features` of type - `Vector[Double]` - Vector of Double, turn all columns into one column named features.
Hence, you will transform all _numeric_ columns into one Vector.

Leave the `description` out as it is not relevant for your next task.
    
For creating `features` column we use Vector Assembler
``` python
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features",handleInvalid = "skip")
new_df = vecAssembler.transform(dataFrame)
new_df.show()

```
<details><summary><b>Did you know!</b></summary>
<p>

Vector has two types in Spark:
    
    1. Dense Vector
    2. Sparse Vector
    
_Sparse vector_ is when you have many values in the vector as zero or null.

_Dense vector_ is when most of the values in the vector are non zero or non-null.

</p>
</details>


🤔 **Question**

Have you noticed `handleInvalid` param?  
```python
handleInvalid = "skip"
```




What happens if you remove it?

Validate yourself with `vecAssembler.show()`

Notice! You have a new DataFrame now. 

Remember to check yourself and work with the new DataFrame

In [4]:
from pyspark.ml.feature import VectorAssembler

train = df_train.drop('description')
vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
df_train = vecAssembler.transform(train)
df_train.toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,1659,1660,1661,1662,1663,1664,1665,1666,1667,1668
screen_name,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
location,0,0,0,0,0,0,0,0,0,0,...,1,1,1,1,1,1,1,1,1,1
followers_count,8,328658,43,7,49,34,213,6,204,31,...,141730,1878,348,20,62647,899271,3,903,293,601
friends_count,1,110,1,1,12,787,0,0,1,0,...,582,262,302,75,277,202,343,1,344,0
listed_count,7,4381,19,8,7,0,23,6,34,6,...,2386,64,10,1,1197,3228,0,50,2,53
favourites_count,0,3,0,0,1,7,0,0,0,0,...,24,109,8413,59,431,103,37,0,0,0
verified,0,1,0,0,0,0,0,0,0,0,...,1,0,0,0,1,1,0,0,0,0
statuses_count,258,319,3846,5856,831,2,5685,1066,7491,975,...,1281,1718,2578,294,11885,2204,41,3654,8638,24135
status,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
default_profile,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0


---


✅ **Task :** 

To run ML training phase on the scalar vector you need to create DataFrame out of it.

bot and features are the **only** columns that we care about.

Do it with drop function:
    
```python

output = df_train.drop("val1","val2")
```



In [5]:
output_train = df_train.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')
output_train.show()

+---+--------------------+
|bot|            features|
+---+--------------------+
|  0|[1.0,0.0,8.0,1.0,...|
|  0|[1.0,0.0,328658.0...|
|  0|[1.0,0.0,43.0,1.0...|
|  0|[1.0,0.0,7.0,1.0,...|
|  1|[1.0,0.0,49.0,12....|
|  0|[1.0,0.0,34.0,787...|
|  0|(11,[0,2,4,7,8,10...|
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,204.0,1....|
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,2.329065...|
|  0|[1.0,0.0,52.0,1.0...|
|  0|[1.0,0.0,873.0,8....|
|  0|[1.0,0.0,48641.0,...|
|  0|[1.0,0.0,1825704....|
|  0|[1.0,0.0,9.632156...|
|  0|[1.0,0.0,620714.0...|
|  0|(11,[0,3,5,7,8,10...|
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,358.0,50...|
+---+--------------------+
only showing top 20 rows



After turning _numeric_ columns into one `features` column and dropping `description`:

We got left with creating `label` column:


Create a new DataFrame with `label` column:
Code sample:
```python
df_for_lr  = output_train.selectExpr("features", "bot as label")
df_for_lr.show()

df_for_lr.toPandas().transpose()
```


Notice that now `df_for_lr` is your new DataFrame for creating `LinearRegression` model

In [6]:
df_for_lr  = output_train.selectExpr("features", "bot as label")
df_for_lr.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,0.0,8.0,1.0,...|    0|
|[1.0,0.0,328658.0...|    0|
|[1.0,0.0,43.0,1.0...|    0|
|[1.0,0.0,7.0,1.0,...|    0|
|[1.0,0.0,49.0,12....|    1|
|[1.0,0.0,34.0,787...|    0|
|(11,[0,2,4,7,8,10...|    0|
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,204.0,1....|    0|
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,2.329065...|    0|
|[1.0,0.0,52.0,1.0...|    0|
|[1.0,0.0,873.0,8....|    0|
|[1.0,0.0,48641.0,...|    0|
|[1.0,0.0,1825704....|    0|
|[1.0,0.0,9.632156...|    0|
|[1.0,0.0,620714.0...|    0|
|(11,[0,3,5,7,8,10...|    0|
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,358.0,50...|    0|
+--------------------+-----+
only showing top 20 rows



Run the next code.
Check out where you use the new DataFrame - `df_for_lr`

It creates a machine learning model out of linear regression.

Tweak the `maxIter`,`regParam` and `elasticNetParam` !

In [7]:
from pyspark.ml.regression import LinearRegression


lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(df_for_lr)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 0.12103055721989214
numIterations: 1
objectiveHistory: [0.49999999999999994]
+--------------------+
|           residuals|
+--------------------+
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|  0.8789694427801078|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
|-0.12103055721989214|
+--------------------+
only showing top 20 rows

RMSE: 0.564345
r2: -0.000000


What did you get?

How does it look like?

What is `r2`? `r2` is a shortcut for R Square: 
>R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression.

What is RMSE?
> Root Mean Square Error (RMSE) is the standard deviation of the residuals (prediction errors). Residuals are a measure of how far from the regression line data points are; RMSE is a measure of how spread out these residuals are. In other words, it tells you how concentrated the data is around the line of best fit.

Try to play with the parameters and watch how they change.

RMSE alone is meaningless until we compare with the actual `label` value, such as mean, min and max. 
After such comparison, our RMSE looks pretty good.

Compare `RMSE` and `mean` output.
After such comparison, our RMSE looks pretty good.


In [8]:
df_for_lr.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|               1669|
|   mean|0.12103055721989216|
| stddev| 0.5645140879911827|
|    min|                  0|
|    max|                 19|
+-------+-------------------+



You built 2 machine learning models!

However, didn't get the best results.

It's OK!

And absolutly normal.

In chapter 4 you learn how to improve it.

### To load the models later, Save them to file:

In [9]:
lrModel.save("linearRegression_model")

In [10]:
fpGrowth_model.save("fpGrowth_model")

## Well Done! 👏👏👏
## You just finished: Apache Spark ML and create machine learning models¶
## Next Chapter: Evaluating ML models and using pipelines 