In [1]:
# !pip install pyspark
# !pip install findspark

# **pySpark Introduction**

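This project uses `PySpark`, the Python API for Spark. It can be a bit slower than Spark's native Scala/Java API (especially for RDD operations and Python UDFs, where data has to be passed between the JVM and Python), but it is still widely adopted by the Python developer community.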

Spark distributes our data across a cluster of machines and processes it in parallel, which speeds up the processing of large datasets. A more detailed description of the cluster management system in Spark is in [databricks and pySpark notes](databricks&pySpark-notes.md).

## Setting up the Spark environment

First we need to import `pyspark` and use `SparkContext`, which will create a local Spark environment. We want to use our local machine for it; more instructions on how to set it up are in the [pyspark docs](https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-python).

> Only one SparkContext should be active per `JVM`. You must `stop()` the active SparkContext before creating a new one.
> Make sure you have installed OpenJDK and Spark, linked their files through your environment variables and set up the Spark configuration.

You need to download and install the following to make it work:
- `JDK` environment
- `Spark` analytics engine
- `winutils` for the `Hadoop` framework on Windows, *this may not be necessary but my local machine required it for it to work properly.*
- `Python` and `jupyter notebook`: the primary language and the workspace

Make sure to add the elements above as `NAME_HOME` entries in your system environment variables (e.g. `JAVA_HOME`, `SPARK_HOME`, `HADOOP_HOME`) and then add their `bin` folders to `Path` accordingly.
On Windows, this looks as follows in the *System Environment Variables* dialog:

<img src ="images/pyspark-env.png">

- OneDrive is not necessary here :)

We will also need `findspark`, because it adds `pyspark` to `sys.path` at runtime.
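
Roughly speaking, `findspark.init()` locates the Spark installation (e.g. via the `SPARK_HOME` environment variable) and prepends Spark's bundled Python package and its `py4j` zip to `sys.path`. The sketch below only computes those two paths; the helper name `spark_python_paths` is hypothetical, and the directory layout (`python/` and `python/lib/py4j-*.zip`) is an assumption based on the standard Spark binary distribution.

```python
import glob
import os
import tempfile


def spark_python_paths(spark_home):
    """Hypothetical helper: the two paths a findspark-style tool puts on sys.path."""
    spark_python = os.path.join(spark_home, "python")
    # The py4j version in the zip name differs between Spark releases, so glob for it
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}")
    return [spark_python, py4j_zips[0]]


# Demo on a throwaway directory that mimics the assumed Spark layout
with tempfile.TemporaryDirectory() as fake_spark_home:
    lib_dir = os.path.join(fake_spark_home, "python", "lib")
    os.makedirs(lib_dir)
    open(os.path.join(lib_dir, "py4j-0.10.9-src.zip"), "w").close()
    paths = spark_python_paths(fake_spark_home)
    # findspark would then do something like: sys.path[:0] = paths
    print([os.path.relpath(p, fake_spark_home) for p in paths])
```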

In [2]:
import findspark 

findspark.init()
findspark.find()

'C:\\Program Files\\spark-3.5.1-bin-hadoop3'

In [3]:
from pyspark import SparkContext
from pyspark.conf import SparkConf

# Creating a SparkConf object to configure the Spark application
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")  # You can change master URL as needed

# Creating a SparkContext object
sc = SparkContext(conf=conf)

We now have access to Spark through the variable `sc`, and we have to stop it with `sc.stop()` once we are done.

In [4]:
sc

In [6]:
# sc.stop() #commented out because we still need it

## Parallelising with Spark

Now we will set up some data to be parallelised. The list of numbers lives on the `driver`, which, with the help of a `cluster manager`, tells the `worker nodes` what to do to process the data. More information in [databricks and pySpark notes](databricks&pySpark-notes.md).

We are going to take a Python list and distribute it into an `rdd` (`resilient distributed dataset`).
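
`sc.parallelize()` splits the local list into partitions that can be processed in parallel. The plain-Python sketch below shows one simple way to cut a list into `num_slices` contiguous, roughly equal chunks; it mirrors the idea behind the `numSlices` argument of `parallelize`, but it is an illustration, not Spark's code. On a real RDD you can check the partition count with `nums_rdd.getNumPartitions()`, and for a small RDD `rdd.glom().collect()` shows the contents of each partition.

```python
def split_into_partitions(data, num_slices):
    """Cut data into num_slices contiguous chunks whose sizes differ by at most one."""
    n = len(data)
    return [data[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


partitions = split_into_partitions(list(range(1, 11)), 3)
print(partitions)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```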

In [6]:
nums = list(range(1, 1_000_001))

nums_rdd = sc.parallelize(nums)
nums_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

We can ask for what is in the `rdd` with the `collect()` method. However, this is a risky operation: there can be A LOT OF DATA distributed across different machines, and we don't want to bring all of it back to the `driver` for no reason.


Instead, we can use `take(n)` to look at only the first `n` elements.
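
To see why `take(n)` is cheaper than `collect()`, here is a simplified plain-Python model in which an RDD is just a list of partitions (a hypothetical representation). `collect()` has to bring back every element, while `take(n)` can stop once it has enough. Spark's real `take()` is more elaborate (it may run several jobs, scanning more partitions each time), so treat this only as an illustration of the idea.

```python
def collect(partitions):
    """Every element of every partition is shipped back to the driver."""
    return [x for part in partitions for x in part]


def take(partitions, n):
    """Scan partitions in order and stop once n elements have been found."""
    result, scanned = [], 0
    for part in partitions:
        if len(result) >= n:
            break
        scanned += 1
        result.extend(part[:n - len(result)])
    return result, scanned


partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
first_five, partitions_scanned = take(partitions, 5)
print(first_five, partitions_scanned)  # [1, 2, 3, 4, 5] 2
print(len(collect(partitions)))        # 10
```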

In [7]:
nums_rdd.collect()

[1, 2, 3, 4, 5, ..., 183, 184, 185, ...]

In [8]:
nums_rdd.take(5)

[1, 2, 3, 4, 5]

# **Data manipulation**

To apply any manipulation to the data, we can `map()` a function over it, such as the following one, which squares each number. We can chain further maps to build composite results, such as a tuple of the squared number and its number of digits.
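
`map()` is a *transformation*: it is evaluated lazily, and nothing runs until an *action* such as `take()` or `collect()` asks for results. Python generators behave similarly, so the plain-Python analogue below (an illustration, not Spark code) only computes the five elements that are actually requested.

```python
from itertools import islice

nums = range(1, 1_000_001)

# Like nums_rdd.map(...): these generators only describe the work, nothing is computed yet
squared_nums = (x ** 2 for x in nums)
pairs = ((x, len(str(x))) for x in squared_nums)

# Like pairs.take(5): only the first five elements are actually computed
first_five = list(islice(pairs, 5))
print(first_five)  # [(1, 1), (4, 1), (9, 1), (16, 2), (25, 2)]
```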

In [9]:
squared_nums_rdd = nums_rdd.map(lambda x: x**2)
squared_nums_rdd.take(5)

[1, 4, 9, 16, 25]

In [10]:
pairs = squared_nums_rdd.map(lambda x: (x, len(str(x))))
pairs.take(5)

[(1, 1), (4, 1), (9, 1), (16, 2), (25, 2)]

We can use the same syntax to `filter()` the data with a specific condition. The following code keeps only the pairs whose squared number has an even number of digits.
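
The same filter in plain Python, as a quick way to sanity-check the output below; the predicate is exactly the lambda passed to `filter()`, and the generators keep the evaluation lazy.

```python
from itertools import islice

pairs = ((x ** 2, len(str(x ** 2))) for x in range(1, 1_000_001))

# Same predicate as pairs.filter(lambda x: (x[1] % 2) == 0)
pairs_filtered = (p for p in pairs if p[1] % 2 == 0)
first_ten = list(islice(pairs_filtered, 10))
print(first_ten)
```

The squares of 10 to 31 have three digits, which is why the output jumps from `(81, 2)` straight to `(1024, 4)`.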

In [11]:
pairs_filtered = pairs.filter(lambda x: (x[1] % 2) == 0)
pairs_filtered.take(10)

[(16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (1024, 4),
 (1089, 4),
 (1156, 4),
 (1225, 4)]

We can also group and then summarise the data to get aggregated information or summary statistics. `groupByKey()` groups pairs by their first element, so we first flip the pairs to make the number of digits the key. As the grouped values are iterables, we can convert them to lists and then compute a summary statistic for them, the mean in our case.
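
`groupByKey()` ships every value for a key across the network before we can average them. A common alternative is to carry a `(sum, count)` pair per key and combine the partial results, e.g. `flipped_pairs.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda t: t[0] / t[1])`, which lets Spark pre-aggregate on each partition before the shuffle. The sketch below shows the same sum-and-count idea in plain Python on the numbers 1 to 999 only (a smaller input than above), so only the keys 2, 4 and 6 appear.

```python
from collections import defaultdict


def mean_by_key(pairs):
    """Average values per key by accumulating (running sum, running count)."""
    totals = defaultdict(lambda: [0, 0])
    for key, value in pairs:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: s / c for key, (s, c) in totals.items()}


# (number of digits, squared number) pairs, keeping an even number of digits
flipped_pairs = [(len(str(x * x)), x * x) for x in range(1, 1000)
                 if len(str(x * x)) % 2 == 0]
averages = mean_by_key(flipped_pairs)
print(sorted(averages.items()))
```

The averages for keys 2, 4 and 6 match the Spark output further down, because those digit counts only come from numbers below 1000.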

In [12]:
# flip the pairs
flipped_pairs = pairs_filtered.map(lambda x: (x[1], x[0]))
grouped = flipped_pairs.groupByKey()  # wide transformation: the action below triggers a shuffle, so it takes longer
grouped.take(25)

[(8, <pyspark.resultiterable.ResultIterable at 0x2038dcf41d0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x2038dcf5950>),
 (10, <pyspark.resultiterable.ResultIterable at 0x2038dcf48d0>),
 (4, <pyspark.resultiterable.ResultIterable at 0x20388aeda10>),
 (12, <pyspark.resultiterable.ResultIterable at 0x2038dcf5a50>),
 (6, <pyspark.resultiterable.ResultIterable at 0x2038732d050>)]

In [13]:
grouped_list = grouped.map(lambda x: (x[0], list(x[1])))

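I am not going to show the result, as these lists can get pretty long: each grouped list can contain a large number of elements. I am doing the `list` conversion and the summary statistic calculation in two separate `map()` steps for readability; Spark pipelines consecutive `map()` transformations within the same stage, so splitting them does not cost an extra pass over the data.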

In [14]:
averaged = grouped_list.map(lambda x: (x[0], sum(x[1])/len(x[1])))
averaged.collect()

[(8, 47204941.666666664),
 (2, 45.166666666666664),
 (10, 4720705565.0),
 (4, 4675.5),
 (12, 472075391214.1667),
 (6, 471838.0)]

As we can see, because the numbers are squared, the mean values are quite high. To make the result easier to read, we will also sort it by key. Sorting is more expensive than mapping because it requires a shuffle, so it can take a while, especially on a large `rdd`.

> In our case, the aggregated result was small enough to be collected to the local machine, so we could have collected it first and sorted it there. Either way, try to minimise sorting in our data manipulation.
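
A minimal sketch of the local alternative: once the aggregated result is small, collect it and sort it on the driver with Python's built-in `sorted()`, e.g. `sorted(averaged.collect())`. Here the collected list is copied from the unsorted output above.

```python
# Unsorted result of averaged.collect() from the cell above (six rows)
collected = [(8, 47204941.666666664), (2, 45.166666666666664), (10, 4720705565.0),
             (4, 4675.5), (12, 472075391214.1667), (6, 471838.0)]

# Sorting six tuples locally needs no shuffle across the cluster
by_digits = sorted(collected, key=lambda kv: kv[0])
print(by_digits)
```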

In [15]:
averaged = averaged.sortByKey()
averaged.collect()

[(2, 45.166666666666664),
 (4, 4675.5),
 (6, 471838.0),
 (8, 47204941.666666664),
 (10, 4720705565.0),
 (12, 472075391214.1667)]

In [16]:
sc.stop()

# **Dataframes in pySpark**

We can also work with DataFrames in Spark, which is the main use case in data science projects. For this, we need to use ***SparkSession*** instead of ***SparkContext***.

The main difference between the two:

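 | SparkSession | SparkContext |
 | ------------ | ------------ |
 | unified entry point (since Spark 2.0) for DataFrames, Datasets and Spark SQL; it wraps a SparkContext, available as `spark.sparkContext` | entry point to Spark Core: connects to the cluster and is used to create RDDs, broadcast variables and accumulators |
 | better for working with DataFrames and Datasets | better for working with RDDs and low-level Spark features |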

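We first create a SparkSession with the same app name as above and then create our data and schema (here just the column names; Spark infers the types) to build the DataFrame. Note that `master("local")` runs Spark with a single worker thread, unlike `local[*]` above, which uses all available cores. Similar to `sc.stop()`, we can stop the session with `spark.stop()`.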

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("MyApp").getOrCreate()

data = [('james', 23, 4000, 'rude person'),
        ('nupur', 19, 10_000, 'the nicest person'),
        ('paul', 40, 3000, 'goldilock'),
        ('karen', 40, 2000, 'karen')]

cols = ['name', 'age', 'score', 'comments']

df = spark.createDataFrame(data=data, schema=cols)
df.printSchema()  # printSchema() prints directly and returns None, so no print() needed

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)
 |-- comments: string (nullable = true)


We can display the DataFrame using the `show()` method.

In [9]:
df.show()

+-----+---+-----+-----------------+
| name|age|score|         comments|
+-----+---+-----+-----------------+
|james| 23| 4000|      rude person|
|nupur| 19|10000|the nicest person|
| paul| 40| 3000|        goldilock|
|karen| 40| 2000|            karen|
+-----+---+-----+-----------------+



We can do the same kind of manipulations as before, but now we refer to columns by name. The following code does a simple `groupBy` on `age` and then aggregates `score` with the mean (`avg`). The grouping column `age` becomes the key column of the result (Spark DataFrames have no index like pandas), and the row order of the result is not guaranteed unless we sort it explicitly.
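
For comparison, the same group-and-average in plain Python on the four rows defined above; this only illustrates what the aggregation computes. In PySpark, an equivalent that also gives the result column a friendlier name is `df.groupBy("age").agg(F.avg("score").alias("avg_score"))` with `from pyspark.sql import functions as F`.

```python
from collections import defaultdict

data = [('james', 23, 4000, 'rude person'),
        ('nupur', 19, 10_000, 'the nicest person'),
        ('paul', 40, 3000, 'goldilock'),
        ('karen', 40, 2000, 'karen')]

# Collect the scores for each age, then average them (like groupBy("age") + avg)
scores_by_age = defaultdict(list)
for name, age, score, comments in data:
    scores_by_age[age].append(score)

avg_score = {age: sum(scores) / len(scores) for age, scores in scores_by_age.items()}
print(sorted(avg_score.items()))  # [(19, 10000.0), (23, 4000.0), (40, 2500.0)]
```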

In [10]:
avg_df = df.groupBy("age").agg({'score':'mean'})
avg_df.show()

+---+----------+
|age|avg(score)|
+---+----------+
| 19|   10000.0|
| 23|    4000.0|
| 40|    2500.0|
+---+----------+



This is nice, but we may want the other attributes back, so we can `join()` the two tables on `age` as follows. We `select()` the columns we want from `df`, and specify the attribute to join `on` and the join type (`how`).
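
A left join keeps every row of the left table (`avg_df`) and attaches the matching rows of the right table on `age`; ages with no match would get nulls. The plain-Python sketch below (an illustration, not how Spark executes joins) reproduces the four rows shown after the join.

```python
data = [('james', 23, 4000, 'rude person'),
        ('nupur', 19, 10_000, 'the nicest person'),
        ('paul', 40, 3000, 'goldilock'),
        ('karen', 40, 2000, 'karen')]
avg_score = {19: 10000.0, 23: 4000.0, 40: 2500.0}  # left table: age -> avg(score)

joined = []
for age, avg in avg_score.items():
    matches = [row for row in data if row[1] == age]
    if not matches:
        # Left join: keep the left row even without a match, filling in nulls
        joined.append((age, avg, None, None, None))
    for name, _, score, comments in matches:
        joined.append((age, avg, name, score, comments))

for row in joined:
    print(row)
```

Age 40 has two matching rows, so its average appears twice in the result.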

In [11]:
avg_full = avg_df.join(df.select('name','age','score','comments'), on = 'age', how = 'left')
avg_full.show()

+---+----------+-----+-----+-----------------+
|age|avg(score)| name|score|         comments|
+---+----------+-----+-----+-----------------+
| 19|   10000.0|nupur|10000|the nicest person|
| 23|    4000.0|james| 4000|      rude person|
| 40|    2500.0|karen| 2000|            karen|
| 40|    2500.0| paul| 3000|        goldilock|
+---+----------+-----+-----+-----------------+



We can also order the rows by any column. We can simply call `orderBy(column)`, or sort in descending order by passing a column expression with `.desc()`, as below with `age`.
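
Because `paul` and `karen` share the same age, their relative order after sorting by `age` alone is not guaranteed. Adding a second sort key makes it deterministic; in PySpark that would be `avg_full.orderBy(avg_full['age'].desc(), avg_full['name'].asc())`. The same ordering in plain Python:

```python
# Rows of avg_full as (age, avg(score), name, score, comments)
rows = [(19, 10000.0, 'nupur', 10000, 'the nicest person'),
        (23, 4000.0, 'james', 4000, 'rude person'),
        (40, 2500.0, 'karen', 2000, 'karen'),
        (40, 2500.0, 'paul', 3000, 'goldilock')]

# age descending, then name ascending, so rows with equal age have a fixed order
sorted_rows = sorted(rows, key=lambda r: (-r[0], r[2]))
print([r[2] for r in sorted_rows])  # ['karen', 'paul', 'james', 'nupur']
```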

In [12]:
avg_full_sorted = avg_full.orderBy(avg_full['age'].desc())
avg_full_sorted.show()

+---+----------+-----+-----+-----------------+
|age|avg(score)| name|score|         comments|
+---+----------+-----+-----+-----------------+
| 40|    2500.0| paul| 3000|        goldilock|
| 40|    2500.0|karen| 2000|            karen|
| 23|    4000.0|james| 4000|      rude person|
| 19|   10000.0|nupur|10000|the nicest person|
+---+----------+-----+-----+-----------------+



## Further notes

We can also specify the column types in the schema using a DDL string:

```python
schema = "name STRING, age INT, score INT, comments STRING"
```

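or define it programmatically with `StructType`:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# StructField(name, dataType, nullable)
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('score', IntegerType(), True),
    StructField('comments', StringType(), False)
])

# Either schema can then be passed instead of the list of column names:
# df = spark.createDataFrame(data=data, schema=schema)
```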

### Dataset vs DataFrame

Spark also has a *Dataset* API, and the main differences from a DataFrame are as follows (in Scala, a DataFrame is simply an alias for `Dataset[Row]`):

| DataFrame | Dataset |
| --------- | ------- |
| distributed collection of data organised into named columns | distributed collection of typed objects that combines the benefits of RDDs (strong typing, lambda functions) with Spark SQL's optimised execution engine |
| similar to data frames in Python (pandas) and R | similar to a typed collection of JVM objects (e.g. Scala case classes) |
| API available in Python, Scala, Java, R | API available in Java and Scala only |
| high-level abstraction; errors such as a misspelled column surface only at runtime | compile-time type safety |
| optimised and suggested for structured data | flexible for working with structured and unstructured data |

# **Machine Learning Libraries in Spark**

We can use Apache Spark's `MLlib` to create machine learning models, covering training, tuning and managing a model. I will skip some sections of this lecture, as those notes would be similar to the concepts of creating an ML pipeline in `scikit-learn` or `TensorFlow`, which I already have experience with.

We first import the ML-specific modules.

In [27]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

We will create an ML model to predict the price of an Airbnb listing, using a simple `LinearRegression` model.

First, we read the data as a Spark DataFrame.

In [15]:
filePath = "ML_data.parquet"

airbnbDF = spark.read.parquet(filePath)

airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").show(5)

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



Next, we `randomSplit` the data into training and test sets, `vectorise` the predictor with a `VectorAssembler`, fit a linear regression, and then build a pipeline to see how it performs on the test data. In a real model-building project, I would not tune the model against `testDF`; I would use a separate validation set or cross-validation instead.
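
`randomSplit([.8, .2], seed=42)` does not cut the data at exactly 80%: conceptually, each row gets a random number and lands in the split whose share of [0, 1) contains it, which is why the row counts printed below (5780 vs 1366) are only approximately 80/20. The plain-Python sketch below illustrates that idea with a hypothetical `random_split` helper; Spark's actual implementation samples each partition separately, so its results will differ from this sketch even with the same seed.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to a split by comparing one uniform draw with cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # upper bound of this split's share of [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds u; fall back to the last one
        # in case floating-point rounding leaves the final bound just below 1
        idx = next((i for i, ub in enumerate(bounds) if u < ub), len(bounds) - 1)
        splits[idx].append(row)
    return splits


# 7146 = 5780 + 1366 rows, the size of the Airbnb data used here
train_rows, test_rows = random_split(list(range(7146)), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))  # close to, but not exactly, 80/20
```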

In [16]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)

print(f"""There are {trainDF.count()} rows in the training set,
and {testDF.count()} in the test set""")

# take bedrooms and vectorise it to make features
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

# the model parameters
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)

print(f"""
The formula for the linear regression line is
price = {m}*bedrooms + {b}""")

There are 5780 rows in the training set,
and 1366 in the test set

The formula for the linear regression line is
price = 123.68*bedrooms + 47.51
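
With a single feature and no regularisation (`LinearRegression`'s default `regParam` is 0.0), the fitted line should coincide with the ordinary least-squares solution, $m = \frac{\sum_i (x_i - \bar{x})(y_i - \bar{y})}{\sum_i (x_i - \bar{x})^2}$ and $b = \bar{y} - m\bar{x}$. The sketch below computes that in plain Python on made-up toy numbers, not on the Airbnb data.

```python
def fit_simple_ols(xs, ys):
    """Least-squares slope and intercept for y = m * x + b (one feature)."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    cov = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    var = sum((x - x_mean) ** 2 for x in xs)
    m = cov / var
    b = y_mean - m * x_mean
    return m, b


# Made-up toy data lying exactly on price = 2 * bedrooms + 1
bedrooms = [1.0, 2.0, 3.0, 4.0]
price = [3.0, 5.0, 7.0, 9.0]
slope, intercept = fit_simple_ols(bedrooms, price)
print(f"price = {slope}*bedrooms + {intercept}")  # price = 2.0*bedrooms + 1.0
```

Plugging the rounded coefficients above into the formula, one bedroom gives 123.68 * 1 + 47.51 = 171.19, which matches the pipeline's predictions for one-bedroom listings up to rounding.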


We will now create a simple pipeline from the two steps above: the `VectorAssembler` and the `LinearRegression` model.
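
Conceptually, `Pipeline.fit()` walks through the stages in order: transformers (such as `VectorAssembler`) only transform the data, while estimators (such as `LinearRegression`) are fitted on the data produced by the earlier stages and replaced by the fitted model. The toy classes below are hypothetical and only illustrate that flow in plain Python; `ToyMeanRegressor` simply predicts the mean price as a stand-in for `LinearRegression`.

```python
class ToyAssembler:
    """Transformer: copy the chosen input columns into a 'features' list."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [{**r, "features": [r[c] for c in self.input_cols]} for r in rows]


class ToyMeanModel:
    """Fitted transformer: always predicts the label mean seen during fit."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "prediction": self.mean} for r in rows]


class ToyMeanRegressor:
    """Estimator standing in for LinearRegression: fit() returns a model."""
    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        return ToyMeanModel(sum(r[self.label_col] for r in rows) / len(rows))


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):     # estimator: learn from the data seen so far
                stage = stage.fit(rows)
            rows = stage.transform(rows)  # feed the transformed rows to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"bedrooms": 1.0, "price": 100.0}, {"bedrooms": 2.0, "price": 200.0}]
toy_model = ToyPipeline([ToyAssembler(["bedrooms"]), ToyMeanRegressor("price")]).fit(train)
preds = toy_model.transform([{"bedrooms": 3.0, "price": 0.0}])
print(preds)
```

Fitting the pipeline returns a new model object whose stages are all transformers, which is why the real `pipelineModel.transform(testDF)` can be applied directly to unseen data.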

In [17]:
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

# predicted data
predDF_simple = pipelineModel.transform(testDF)
predDF_simple.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows



Now we will use all the features and apply different transformations to them depending on their type:

1. Find the columns of string type and store their names separately.
2. Index the string columns with `StringIndexer` and then one-hot encode (`OHE`) them with `OneHotEncoder` so they can be added to the model.
3. Collect the numeric (`double`) columns, except for the label `price`.
4. Vectorise all of them into a single `features` column with `VectorAssembler`.
5. Alternatively, use `RFormula`, which builds the features from an R-style formula (`price ~ .`) and performs steps 2 to 4 for us.
6. Create the same pipeline as before.
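
A plain-Python sketch of steps 1 and 2 on a single made-up column. By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent label gets index 0, and `OneHotEncoder` drops the last category by default (`dropLast=True`), which encodes it as all zeros. The alphabetical tie-break and the dense lists (Spark produces sparse vectors) are simplifications of this sketch.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent first (ties: alphabetical, an assumption)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list; with drop_last the last category is encoded as all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


# Made-up room_type values for illustration
room_types = ["Entire home/apt", "Private room", "Entire home/apt",
              "Shared room", "Private room", "Entire home/apt"]
index_of = fit_string_indexer(room_types)
encoded = {label: one_hot(idx, len(index_of)) for label, idx in index_of.items()}
print(index_of)  # {'Entire home/apt': 0.0, 'Private room': 1.0, 'Shared room': 2.0}
print(encoded)
```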

In [22]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]

# string input
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)

numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "price"]


# Assemble features
assemblerInputs = oheOutputCols + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

rFormula = RFormula(formula="price ~ .", featuresCol="features", labelCol="price", handleInvalid="skip")

In [23]:
# Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="price")

# pipeline = Pipeline(stages = [stringIndexer, oheEncoder, assembler, lr])
# Or use RFormula
pipeline = Pipeline(stages = [rFormula, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,7,23,4...| 85.0| 55.30094763354373|
|(98,[0,3,6,7,23,4...| 45.0| 22.70940291742818|
|(98,[0,3,6,7,23,4...| 70.0|27.182906571761578|
|(98,[0,3,6,7,13,4...|128.0|-91.90969569190747|
|(98,[0,3,6,7,13,4...|159.0| 94.54162775821169|
+--------------------+-----+------------------+
only showing top 5 rows



In [88]:
# Show the fitted LinearRegression stage (the last stage) of the pipeline model
print("Model coefficients: " + str(pipelineModel.stages[-1].coefficients))
print("Model intercept: " + str(pipelineModel.stages[-1].intercept))

Model coefficients: [-10.680348908293674,-1390.952489733411,-1402.527978674161,-1386.907903065293,-1454.4484899169297,-1402.116377384968,-6.362146769272623,-0.056621067972006604,55.89424123365027,38.80825515478183,8.383043179873187,6.019001031007852,50.84662599297539,29.67477717334662,18.586989107252542,59.96268328414251,39.97943885288819,19.123807416768702,6.715022797603014,-22.838679671928915,11.96369190276733,39.26948325037505,14.28798691554543,-9.579600274544747,49.53153102087754,72.8069233455944,-41.2218298793475,-4.5342329044995635,105.76724472030375,-3.021675321475595,131.87149838117,65.28472932889453,-42.4603941265768,95.4065720527332,39.15524906601873,105.83164117979906,44.611544569888224,84.95910377737069,85.88653390835726,81.955903142468,222.7141012973797,48.10397820489529,64.26407019205433,2486.8385272461405,789.8432072531353,-37.71597343844105,1.356268599951937,28.980391021534427,-20.437568214523672,4.234126737275918,-16.177839312194575,-6.751182988707704,-35.1878536240866

We are going to evaluate this model using $R^2$ and $RMSE$.
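
For reference, with labels $y_i$, predictions $\hat{y}_i$ and mean label $\bar{y}$: $\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_i (y_i - \hat{y}_i)^2}$ and $R^2 = 1 - \frac{\sum_i (y_i - \hat{y}_i)^2}{\sum_i (y_i - \bar{y})^2}$. A plain-Python sketch of both metrics on made-up toy numbers:

```python
import math


def rmse(labels, preds):
    """Root mean squared error."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))


def r2(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - y_mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Made-up labels and predictions, each prediction off by exactly 1
labels = [3.0, 5.0, 7.0, 9.0]
preds = [2.0, 6.0, 6.0, 10.0]
print(rmse(labels, preds))  # 1.0
print(r2(labels, preds))
```

A model that always predicts the mean label has $R^2 = 0$, which is a useful baseline when reading the $R^2$ value reported below.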

In [24]:
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
    metricName="rmse")

# Note: RMSE is computed on the bedrooms-only model (predDF_simple),
# while R2 is computed on the full RFormula model (predDF)
rmse = regressionEvaluator.evaluate(predDF_simple)
print(f"RMSE is {rmse:.1f}")

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")

RMSE is 221.6
R2 is 0.15985154393435386
