### Spark Training - Spark DF vs Pandas DF, GLM implementation using SparkML, Pros and Cons of SparkML

**Objective 1:** Understand, document and provide examples of data input-output in Spark. How do Spark DataFrames differ from pandas DataFrames?

**Objective 2:** Build GLM models using SparkML library (replicate analysis on AutoClaims data)

**Objective 3:** Understand and demonstrate the pros and cons of using SparkML

**What is Spark?**

- a big data processing framework written in Scala
- provides APIs for other languages such as Java, Python and R
- PySpark is the Python API for Spark
- works efficiently with huge amounts of data that do not fit into the memory of a single computer
- distributed processing: the work is split across the cores and machines of a cluster
- initially Spark only provided the RDD API, but it later adopted the good ideas of data frames from pandas and R; the DataFrame API is now the preferred one

***Additional Info:***
- RDDs (resilient distributed datasets) are distributed collections of elements without a column schema; they are used to perform transformations on unstructured data at a lower level than DataFrames, e.g. media streams

**Create a spark session** 

https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/ <br>
https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession

- entry point to Spark, introduced with Spark 2.0
- SparkSession is a combined class for all the different contexts we used to have prior to the 2.0 release: SparkContext, SQLContext, HiveContext
- prior to Spark 2.0, SparkContext was the entry point of any Spark application, and for any other Spark interaction we had to create a separate context (SQL, Hive, etc.)
- only one SparkContext can be active per application; `getOrCreate()` returns the existing SparkSession if there is one, and `newSession()` creates additional sessions that share the same SparkContext

***Additional Info:***
- SparkConf holds the configuration parameters used to create the SparkContext, e.g. appName (to identify your Spark driver), the master URL, and the number of cores and memory size of the executors running on the worker nodes

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")       # "spark://master:7077" to run on a Spark cluster, or "local[4]" to run locally with 4 cores
    .appName("Training")   # the name of your app
    # .config("spark.some.config.option", "some-value")  # e.g. how much memory or how many cores to use
    # .enableHiveSupport()                                # enables reading and writing Hive tables
    .getOrCreate()         # gets an existing SparkSession or creates a new one
)

spark

### Objective 1
Understand, document and provide examples of data input-output in Spark (RDDs and DataFrames). How does this fit with the current Python / pandas workflow?

### **Loading data**

Spark DataFrames can read data in formats such as CSV, JSON, Parquet, Hive tables or existing RDDs, whether the data lives on a local file system, a distributed file system (HDFS), cloud storage or an external database.

##### RDD example <br> 
https://sparkbyexamples.com/apache-spark-rdd/different-ways-to-create-spark-rdd/#from-parallelize

In [20]:
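from pyspark.sql import Row
# a list of (name, age) tuples
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
sc = spark.sparkContext
# split the local list into partitions and distribute it as an RDD
rdd = sc.parallelize(l)
# lazy transformation: defines an RDD of Row objects, nothing is computed yet
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
# action: brings the original tuples back to the driver
rdd.collect()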

[('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]

##### I. Loading csv as a spark df using spark

In [2]:
# Spark has an integrated function to read CSV files
# without inferSchema=True every column is read as a string
spark_df = spark.read.csv("car_insurance_claim.csv", header=True, inferSchema=True)
# print the first row
spark_df.show(1)

+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+
|      ID|KIDSDRIV|  BIRTH|AGE|HOMEKIDS|YOJ| INCOME|PARENT1|HOME_VAL|MSTATUS|GENDER|EDUCATION|  OCCUPATION|TRAVTIME|CAR_USE|BLUEBOOK|TIF|CAR_TYPE|RED_CAR|OLDCLAIM|CLM_FREQ|REVOKED|MVR_PTS|CLM_AMT|CAR_AGE|CLAIM_FLAG|         URBANICITY|
+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+
|63581743|       0|16MAR39| 60|       0| 11|$67,349|     No|      $0|   z_No|     M|      PhD|Professional|      14|Private| $14,230| 11| Minivan|    yes|  $4,461|       2|     No|      3|     $0|     18|         0|Highly Urban/ Urban|
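+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+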

In [10]:
# as in columns and dtypes in pandas
spark_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- KIDSDRIV: integer (nullable = true)
 |-- BIRTH: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- HOMEKIDS: integer (nullable = true)
 |-- YOJ: integer (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- PARENT1: string (nullable = true)
 |-- HOME_VAL: string (nullable = true)
 |-- MSTATUS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- OCCUPATION: string (nullable = true)
 |-- TRAVTIME: integer (nullable = true)
 |-- CAR_USE: string (nullable = true)
 |-- BLUEBOOK: string (nullable = true)
 |-- TIF: integer (nullable = true)
 |-- CAR_TYPE: string (nullable = true)
 |-- RED_CAR: string (nullable = true)
 |-- OLDCLAIM: string (nullable = true)
 |-- CLM_FREQ: integer (nullable = true)
 |-- REVOKED: string (nullable = true)
 |-- MVR_PTS: integer (nullable = true)
 |-- CLM_AMT: string (nullable = true)
 |-- CAR_AGE: integer (nullable = true)
 |-- CLAIM_FLAG: integer (nullable = true)
 |-- URBANICITY: string (nullable = true)

##### II. Loading csv as a spark df using pandas

In [None]:
#spark_df2 = spark.createDataFrame(pd.read_csv("car_insurance_claim.csv"))

**Exporting Data**

https://sparkbyexamples.com/spark/spark-read-write-dataframe-parquet-example/ <br>
https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4 <br>
https://towardsdatascience.com/a-brief-introduction-to-pyspark-ff4284701873 <br>
https://mungingdata.com/apache-spark/output-one-file-csv-parquet/

##### I. If we are going to process results with Spark then parquet is a good format to use for saving data frames.
- column names and data types are preserved
- When saving a dataframe in parquet format, it is often partitioned into multiple files

In [None]:
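# mode: 'append', 'overwrite', 'ignore' or 'error' (the default)
# partitionBy: creates one folder per distinct value of each column, nested in the given order,
# e.g. a GENDER=M folder containing one CAR_USE=... folder per car use
# (the claims data has no salary column, so CAR_USE is used as the second level)

spark_df.write.mode('overwrite').partitionBy("GENDER","CAR_USE").parquet("/tmp/output/claims_data.parquet")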

<img src="images/partitions.PNG" width="600" />

- the number of partitions, and therefore of output files, can also be controlled manually
- there are 2 ways to do it: repartition and coalesce

In [26]:
# check number of partitions
spark_df.rdd.getNumPartitions()

1

In [None]:
# Spark splits data into partitions and executes computations on the partitions in parallel
# (the output paths below are examples)

# coalesce: can only reduce the number of partitions, and avoids a full shuffle
# imagine 4 partitions A, B, C, D: coalesce(2) moves B into A and D into C
# asking for more partitions than currently exist leaves the number unchanged
spark_df.coalesce(2).write.mode('overwrite').parquet("/tmp/output/claims_coalesce.parquet")

# repartition: can increase or decrease the number of partitions
# shuffles all the data, without trying to minimize data movement
spark_df.repartition(5).write.mode('overwrite').parquet("/tmp/output/claims_repartition.parquet")

##### II. Writing out a csv file
- with coalesce(1), all of the data is pulled to a single node before being written out as one CSV file
- recommended when you need to save a small dataframe and process it in a system outside of Spark
- we can control the output directory but not the file name: Spark writes a part-*.csv file inside that directory

In [None]:
# repartition(1) also works, but it triggers a full shuffle
spark_df.coalesce(1).write.mode('overwrite').option('header', True).csv("home/Documents/tmp/one-file-coalesce")

<img src="images/one_file_csv.PNG" width="600" />
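Since Spark picks the file name itself, a common workaround is to copy the single `part-*.csv` file to the desired name afterwards. The helper `move_single_csv` below is a hypothetical sketch that assumes a local filesystem (not HDFS or cloud storage) and output written with `coalesce(1)`; the demo fakes a Spark output folder so that it runs without Spark.

```python
import glob
import os
import shutil
import tempfile


def move_single_csv(spark_output_dir, target_file):
    """Copy the single part-*.csv file found in spark_output_dir to target_file.

    Hypothetical helper: assumes a local path and exactly one part file,
    i.e. the DataFrame was written with coalesce(1).
    """
    part_files = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")
    shutil.copyfile(part_files[0], target_file)
    return target_file


# Demo with a fake Spark output folder (hypothetical file names and content)
out = tempfile.mkdtemp()
with open(os.path.join(out, "part-00000-abc-c000.csv"), "w") as fh:
    fh.write("AGE,GENDER\n60,M\n")
open(os.path.join(out, "_SUCCESS"), "w").close()  # marker file Spark also writes

target = os.path.join(tempfile.mkdtemp(), "claims.csv")
move_single_csv(out, target)
with open(target) as fh:
    content = fh.read()
print(content)
```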

### **When is Spark useful**

https://towardsdatascience.com/spark-vs-pandas-part-2-spark-c57f8ea3a781

#### **How a spark df is different from a pandas df**

**1. Schema**
- a Spark DataFrame is built on the concept of columns and rows, where the set of columns implies a schema shared by all rows
- in contrast to pandas, the schema also dictates the data type of each column, and every row must conform to it; a pandas column can hold mixed types, as column `a` below does (an int `1` and a string `'1'`)

In [9]:
import pandas as pd
import numpy as np

x = pd.DataFrame(columns=['a','b'],data={'a':[1,'1'],'b':['3',np.nan]})
x

|   | a | b   |
|---|---|-----|
| 0 | 1 | 3.0 |
| 1 | 1 |     |


In [3]:
spark_df.show(1)

+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+
|      ID|KIDSDRIV|  BIRTH|AGE|HOMEKIDS|YOJ| INCOME|PARENT1|HOME_VAL|MSTATUS|GENDER|EDUCATION|  OCCUPATION|TRAVTIME|CAR_USE|BLUEBOOK|TIF|CAR_TYPE|RED_CAR|OLDCLAIM|CLM_FREQ|REVOKED|MVR_PTS|CLM_AMT|CAR_AGE|CLAIM_FLAG|         URBANICITY|
+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+
|63581743|       0|16MAR39| 60|       0| 11|$67,349|     No|      $0|   z_No|     M|      PhD|Professional|      14|Private| $14,230| 11| Minivan|    yes|  $4,461|       2|     No|      3|     $0|     18|         0|Highly Urban/ Urban|
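+--------+--------+-------+---+--------+---+-------+-------+--------+-------+------+---------+------------+--------+-------+--------+---+--------+-------+--------+--------+-------+-------+-------+-------+----------+-------------------+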

**2. Index**
- unlike pandas, Spark doesn't support any index for efficient access to individual rows of a DataFrame
    - since transformations are always performed on all records, Spark reorganizes the data as needed on the fly

**3. Data Scaling**

- Spark is designed to scale with the number of rows, not the number of columns
    - it can easily work with billions of rows, but the number of columns should be limited (hundreds to a couple of thousand)
    - as a consequence, there is no simple transpose operation in Apache Spark

**4. Nested data structures**
- unlike pandas, Spark supports neither indices nor nested (multi-level) indices, but it offers very good support for deeply nested data structures such as those in JSON documents
    - such structures are often used in the internal communication between applications

<img src="images/json_nested_data.PNG" width="300" />

- The tabular representation below doesn’t really show the complex nature of the tweets

<img src="images/twitter_nested.PNG" width="700" />

- however, the schema does

<img src="images/tweets_schema.PNG" width="400" />

**5. Execution**
- Spark uses a lazy execution model
- transformations on a data set are not processed immediately
- Spark only starts working when the result of the transformations is required, e.g. when
    - displaying some records on the console
    - writing the results into a file
- this lets Spark optimize the whole plan before execution instead of blindly following the steps as specified by the developer

**6. Processing scalability**
- Spark breaks the work down into individual tasks that are processed in parallel, making use of all available cores and machines

**7. Data Scalability**
- scales very well with huge amounts of data

**8. Transformations**
##### A broad set of transformations, whose methods use SQL terminology (select, where, join, ...) but not SQL syntax

- projection: creating a new data set with fewer columns, similar to SELECT in SQL <br>
##### Ex:

In [4]:
len(spark_df.columns)

27

In [5]:
# choosing the variables for the model
important_vars = ['AGE','GENDER','CAR_USE','BLUEBOOK','CAR_TYPE','URBANICITY','MSTATUS']
spark_df = spark_df.select(important_vars+ ['CLAIM_FLAG','CLM_AMT'])

- filtering: selecting a subset of rows, similar to WHERE in SQL <br>
##### Ex:

In [18]:
import pyspark.sql.functions as F 

private = spark_df.where(F.col('CAR_USE') == 'Private')
private.count()

6513

In [19]:
private.first()

Row(AGE=60, GENDER='M', CAR_USE='Private', BLUEBOOK='$14,230', CAR_TYPE='Minivan', URBANICITY='Highly Urban/ Urban', MSTATUS='z_No', CLAIM_FLAG=0, CLM_AMT='$0')

- concatenation is only vertical (`union`)
    - adding rows from a second DataFrame with the same columns
- horizontal concatenation requires a join: records with matching join keys are sent to the same machine, and the join is then executed in parallel
    - this is tricky if no natural join key is available

##### Ex:

##### Pandas 

In [31]:
import pandas as pd
x = pd.DataFrame(columns=('a','b','c'),data={'a':[1,2],'b':[3,4],'c':[6,7]})
y = pd.DataFrame(columns=('c','d'),data={'c':[5,6],'d':[8,9]})

# vertical concatenation
pd.concat([x,y],axis=0)

|   | a   | b   | c | d   |
|---|-----|-----|---|-----|
| 0 | 1.0 | 3.0 | 6 |     |
| 1 | 2.0 | 4.0 | 7 |     |
| 0 |     |     | 5 | 8.0 |
| 1 |     |     | 6 | 9.0 |


In [32]:
# horizontal concatenation - adding columns from two data frames
# this is not possible in spark unless we join the tables
pd.concat([x,y],axis=1)

|   | a | b | c | c | d |
|---|---|---|---|---|---|
| 0 | 1 | 3 | 6 | 5 | 8 |
| 1 | 2 | 4 | 7 | 6 | 9 |


##### Spark

In [34]:
non_private = spark_df.where(F.col('CAR_USE') != 'Private')

spark_df_concat = private.union(non_private)
spark_df_concat.count(), private.count(), non_private.count()

(10302, 6513, 3789)

- aggregations
    - similar wording as in SQL: min, max, sum, etc.
    - column-wise aggregation works in Spark as in pandas, as shown below
        - as said above, Spark is designed to scale with the number of rows, not columns
    - row-wise aggregations over columns have to be written out manually, unlike in pandas (e.g. `df.mean(axis=1)`)

In [36]:
# column wise aggregation is possible in spark as in pandas
age_agg = private.select(F.min('AGE'), F.max('AGE'), F.avg('AGE'))
age_agg.show()

+--------+--------+-----------------+
|min(AGE)|max(AGE)|         avg(AGE)|
+--------+--------+-----------------+
|      16|      81|45.01629014907023|
+--------+--------+-----------------+



In [39]:
private = private.withColumn('AGE_2nd_driver', F.col('AGE')-F.lit(5))
private.select(['AGE_2nd_driver','AGE']).first()

Row(AGE_2nd_driver=55, AGE=60)

In [43]:
# row-wise average of the two ages, written out manually
private = private.withColumn('AVG_AGE',(F.col('AGE')+F.col('AGE_2nd_driver'))/2)
private.select(['AGE_2nd_driver','AGE','AVG_AGE']).first()

Row(AGE_2nd_driver=55, AGE=60, AVG_AGE=57.5)

### Objective 2:
Build GLM models using SparkML library (replicate analysis on AutoClaims data) <br>

https://people.stat.sc.edu/haigang/GLM_in_spark.html <br>
https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a <br>
https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa <br>
https://spark.apache.org/docs/latest/ml-classification-regression.html <br>
https://goodboychan.github.io/chans_jupyter/python/datacamp/pyspark/machine_learning/2020/08/10/01-Model-tuning-and-selection-in-PySpark.html

### **Data Prep**

In [20]:
spark_df = spark_df.dropna()

In [21]:
# strip '$' and ',' from the currency columns and cast them to float
# (columns dropped by the selection above are skipped)
for col in ['INCOME','HOME_VAL','BLUEBOOK','OLDCLAIM', 'CLM_AMT']:
    if col not in spark_df.columns:
        continue
    spark_df = spark_df.withColumn(col, F.regexp_replace(spark_df[col], r"([^.0-9])", ""))
    spark_df = spark_df.withColumn(col, spark_df[col].cast("float"))
    #spark_df = spark_df.fillna({col:0})
spark_df.first()

Row(ID=63581743, KIDSDRIV=0, BIRTH='16MAR39', AGE=60, HOMEKIDS=0, YOJ=11, INCOME=67349.0, PARENT1='No', HOME_VAL=0.0, MSTATUS='z_No', GENDER='M', EDUCATION='PhD', OCCUPATION='Professional', TRAVTIME=14, CAR_USE='Private', BLUEBOOK=14230.0, TIF=11, CAR_TYPE='Minivan', RED_CAR='yes', OLDCLAIM=4461.0, CLM_FREQ=2, REVOKED='No', MVR_PTS=3, CLM_AMT=0.0, CAR_AGE=18, CLAIM_FLAG=0, URBANICITY='Highly Urban/ Urban')

In [22]:
# normalize the MSTATUS labels (e.g. 'z_No' -> 'NO'): upper-case, drop the 'Z_' prefix, keep letters only
spark_df = spark_df.withColumn('MSTATUS', F.upper(F.col('MSTATUS')))
spark_df = spark_df.withColumn('MSTATUS', F.regexp_replace(spark_df['MSTATUS'], r"([Z_])", ""))
spark_df = spark_df.withColumn('MSTATUS', F.regexp_replace(spark_df['MSTATUS'], r"([^A-Z])", ""))
spark_df.first()

Row(ID=63581743, KIDSDRIV=0, BIRTH='16MAR39', AGE=60, HOMEKIDS=0, YOJ=11, INCOME=67349.0, PARENT1='No', HOME_VAL=0.0, MSTATUS='NO', GENDER='M', EDUCATION='PhD', OCCUPATION='Professional', TRAVTIME=14, CAR_USE='Private', BLUEBOOK=14230.0, TIF=11, CAR_TYPE='Minivan', RED_CAR='yes', OLDCLAIM=4461.0, CLM_FREQ=2, REVOKED='No', MVR_PTS=3, CLM_AMT=0.0, CAR_AGE=18, CLAIM_FLAG=0, URBANICITY='Highly Urban/ Urban')

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categorical_cols = ['GENDER','CAR_USE','CAR_TYPE','URBANICITY','MSTATUS']

# StringIndexer: encodes a string column of labels to a column of label indices (most frequent label -> 0.0)
stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded") for c in categorical_cols]
# OneHotEncoder: turns each index into a sparse 0/1 vector; by default the last category is dropped (reference level)
stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+ "_one_hot") for c in categorical_cols]

ppl = Pipeline(stages = stage_string + stage_one_hot)
spark_df = ppl.fit(spark_df).transform(spark_df)

In [25]:
spark_df.first()

Row(AGE=60, GENDER='M', CAR_USE='Private', BLUEBOOK=14230.0, CAR_TYPE='Minivan', URBANICITY='Highly Urban/ Urban', MSTATUS='NO', CLAIM_FLAG=0, CLM_AMT=0.0, GENDER_string_encoded=1.0, CAR_USE_string_encoded=0.0, CAR_TYPE_string_encoded=1.0, URBANICITY_string_encoded=0.0, MSTATUS_string_encoded=1.0, GENDER_one_hot=SparseVector(1, {}), CAR_USE_one_hot=SparseVector(1, {0: 1.0}), CAR_TYPE_one_hot=SparseVector(5, {1: 1.0}), URBANICITY_one_hot=SparseVector(1, {0: 1.0}), MSTATUS_one_hot=SparseVector(1, {}))

In [27]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# merges the numeric columns into a single vector column
vector_assembler = VectorAssembler(inputCols = ['AGE','BLUEBOOK'], outputCol = "numericals")
spark_df = vector_assembler.transform(spark_df)

# divides each feature by its standard deviation (defaults: withStd=True, withMean=False)
scaler = StandardScaler(inputCol = "numericals", outputCol = "numericals_after_scale")

ppl2 = Pipeline(stages = [scaler])
spark_df = ppl2.fit(spark_df).transform(spark_df)

In [28]:
spark_df.first()

Row(AGE=60, GENDER='M', CAR_USE='Private', BLUEBOOK=14230.0, CAR_TYPE='Minivan', URBANICITY='Highly Urban/ Urban', MSTATUS='NO', CLAIM_FLAG=0, CLM_AMT=0.0, GENDER_string_encoded=1.0, CAR_USE_string_encoded=0.0, CAR_TYPE_string_encoded=1.0, URBANICITY_string_encoded=0.0, MSTATUS_string_encoded=1.0, GENDER_one_hot=SparseVector(1, {}), CAR_USE_one_hot=SparseVector(1, {0: 1.0}), CAR_TYPE_one_hot=SparseVector(5, {1: 1.0}), URBANICITY_one_hot=SparseVector(1, {0: 1.0}), MSTATUS_one_hot=SparseVector(1, {}), numericalss=DenseVector([60.0, 14230.0]), numericals=DenseVector([60.0, 14230.0]), numericals_after_scale=DenseVector([6.9159, 1.7628]))

In [30]:
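# Training and Testing
# assemble the one-hot vectors and the (unscaled) numeric vector into one "features" column;
# use "numericals_after_scale" instead to train on the standardized numeric features
categoricals = [var for var in spark_df.columns if var.endswith("_one_hot")]
num = ["numericals"]

vector_assembler = VectorAssembler(inputCols = categoricals + num, outputCol= "features")
spark_df = vector_assembler.transform(spark_df)

# 70/30 split; the seed makes it reproducible
train_set, test_set = spark_df.randomSplit([0.7, 0.3], seed = 42)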

### **GLM**

In [None]:
# weightCol is also available as an argument
# regParam: regularization parameter (L2 penalty) that shrinks the coefficients;
# the bigger regParam, the closer the coefficients are to 0
# this discourages overly complex models and thus overfitting
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

# FREQUENCY
frequency =  GeneralizedLinearRegression(family="binomial",
                                           link="logit",
                                           #maxIter=10,
                                           fitIntercept = True,
                                           labelCol = "CLAIM_FLAG",
                                           #regParam=0.3
                                        )
# customized name to the prediction col
frequency.setPredictionCol("frq_pred")

# set the parameters 
para_grid = ParamGridBuilder()\
           .addGrid(frequency.regParam, [0.1, 0.3, 0.5, 0.7, 0.9])\
           .build()

frq_evaluator = BinaryClassificationEvaluator(labelCol="CLAIM_FLAG", rawPredictionCol='frq_pred', metricName='areaUnderROC')

# the Spark ML counterpart of GridSearchCV in scikit-learn
frq_cross_val = CrossValidator(estimator = frequency,
                           estimatorParamMaps= para_grid,
                           evaluator = frq_evaluator,
                              numFolds=3)

fitted_frequency = frq_cross_val.fit(train_set)

In [47]:
# best hyperparameters
frq_model = fitted_frequency.bestModel
# _java_obj is a private attribute; in Spark 3.0+ frq_model.getRegParam() should work as well
frq_model._java_obj.getRegParam()

0.1

In [62]:
# Make predictions.
frq_predictions = frq_model.transform(test_set)

# Select example rows to display.
frq_predictions.select("frq_pred", "CLAIM_FLAG", "features").show(5)

+-------------------+----------+--------------------+
|           frq_pred|CLAIM_FLAG|            features|
+-------------------+----------+--------------------+
| 0.4084206431036104|         0|(11,[0,1,3,7,9,10...|
| 0.4546058774666069|         0|(11,[1,6,7,9,10],...|
|  0.519013380964209|         1|[1.0,1.0,0.0,0.0,...|
|0.49195439797650353|         1|[1.0,1.0,1.0,0.0,...|
| 0.6366999586372941|         0|(11,[0,2,7,9,10],...|
+-------------------+----------+--------------------+
only showing top 5 rows



In [58]:
# SEVERITY
train_set_severity = train_set.where((train_set["CLAIM_FLAG"] > 0))

severity =  GeneralizedLinearRegression(family="gamma",
                                           link="log",
                                           #maxIter=10,
                                           fitIntercept = True,
                                           labelCol = "CLM_AMT",
                                           #regParam=0.3
                                       )
# customized name to the prediction col
severity.setPredictionCol("sev_pred")

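# cross validation
# the grid must use severity.regParam: a Param of another estimator (frequency.regParam)
# is ignored, so every candidate would be fitted with the default regParam=0.0
para_grid = ParamGridBuilder()\
           .addGrid(severity.regParam, [0.1, 0.3, 0.5, 0.7, 0.9])\
           .build()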

sev_evaluator = RegressionEvaluator(predictionCol="sev_pred", labelCol="CLM_AMT", metricName="mae")

sev_cross_val = CrossValidator(estimator = severity,
                            estimatorParamMaps= para_grid,
                            evaluator = sev_evaluator,
                               numFolds=3)

fitted_severity = sev_cross_val.fit(train_set_severity)

In [59]:
# best hyperparameters
sev_model = fitted_severity.bestModel
sev_model._java_obj.getRegParam()

0.0

In [64]:
test_set_severity = test_set.where((test_set["CLAIM_FLAG"] > 0))

# Make predictions
sev_predictions = sev_model.transform(test_set_severity)

# Select example rows to display.
sev_predictions.select("sev_pred", "CLM_AMT", "features").show(5)

mae = sev_evaluator.evaluate(sev_predictions)
print("MAE on test data = %g" % mae)

+------------------+-------+--------------------+
|          sev_pred|CLM_AMT|            features|
+------------------+-------+--------------------+
|3860.4483393879086| 4386.0|[1.0,1.0,0.0,0.0,...|
| 4388.005225467674| 3854.0|[1.0,1.0,1.0,0.0,...|
| 5815.854536216155| 4400.0|(11,[0,1,2,7,9,10...|
| 4877.560594427159| 2851.0|(11,[0,3,9,10],[1...|
| 6680.578176346916| 2644.0|(11,[0,2,8,9,10],...|
+------------------+-------+--------------------+
only showing top 5 rows

MAE on test data = 3289.69


In [75]:
# constant baseline: the average claim amount, added as a column
# (computed here on the test set itself)
mean_clm_amt = sev_predictions.agg({"CLM_AMT":"avg"}).take(1)[0][0]
sev_predictions = sev_predictions.withColumn("CLM_AMT_MEAN", F.lit(mean_clm_amt))

In [76]:
sev_predictions.select("sev_pred", "CLM_AMT","CLM_AMT_MEAN", "features").show(5)

+------------------+-------+-----------------+--------------------+
|          sev_pred|CLM_AMT|     CLM_AMT_MEAN|            features|
+------------------+-------+-----------------+--------------------+
|3860.4483393879086| 4386.0|5287.752650176679|[1.0,1.0,0.0,0.0,...|
| 4388.005225467674| 3854.0|5287.752650176679|[1.0,1.0,1.0,0.0,...|
| 5815.854536216155| 4400.0|5287.752650176679|(11,[0,1,2,7,9,10...|
| 4877.560594427159| 2851.0|5287.752650176679|(11,[0,3,9,10],[1...|
| 6680.578176346916| 2644.0|5287.752650176679|(11,[0,2,8,9,10],...|
+------------------+-------+-----------------+--------------------+
only showing top 5 rows



In [77]:
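sev_evaluator_mean = RegressionEvaluator(predictionCol="CLM_AMT_MEAN", labelCol="CLM_AMT", metricName="mae")
mae_mean = sev_evaluator_mean.evaluate(sev_predictions)
print("MAE on test data if the model predicted the mean claim amount = %g" % mae_mean)

MAE on test data if the model predicted the mean claim amount = 3081.09

The severity GLM's test MAE (3289.69) is higher than this mean-only baseline (3081.09), so with the current features the model does not beat simply predicting the average claim amount.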
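Two caveats on this baseline, shown in a sketch with hypothetical claim amounts: the constant that minimizes MAE is the median, not the mean, so the median is the stronger constant baseline under this metric; and, strictly, the baseline constant should be computed on the training set rather than on the test set it is evaluated against.

```python
import numpy as np

# Hypothetical right-skewed claim amounts (typical shape for severity data)
claims = np.array([800.0, 1200.0, 1500.0, 2600.0, 2900.0,
                   3800.0, 4400.0, 6200.0, 15000.0, 30000.0])


def mae(y, pred):
    """Mean absolute error of a prediction (scalar or array) against y."""
    return float(np.mean(np.abs(y - pred)))


mean_baseline = mae(claims, claims.mean())
median_baseline = mae(claims, np.median(claims))
print(f"MAE predicting the mean:   {mean_baseline:.1f}")
print(f"MAE predicting the median: {median_baseline:.1f}")
```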


### Objective 3:
Why Spark ML? Advantages and disadvantages <br>
Integrated framework to perform advanced analytics

https://towardsdatascience.com/apache-spark-mllib-tutorial-ec6f1cb336a9 <br> https://zaleslaw.medium.com/weakness-of-the-apache-spark-ml-library-41e674103591 <br> https://www.qubole.com/blog/apache-spark-use-cases/

- in the last few years data has been growing exponentially, and running a powerful ML model on a single machine requires a very powerful machine
- such high-end machines are expensive, whereas Spark spreads the work over a cluster of commodity machines

#### Advantages
- simplicity and compatibility with tools coming from Python and R
- scalability: the same code runs on a local machine and on a cluster, which allows a business to keep the same workflow as the data grows
- distributed computing speeds up the learning phase and makes it possible to train on the full data set instead of a sample
- MLlib covers areas such as clustering, classification and dimensionality reduction, used e.g. for
    - customer segmentation
    - sentiment analysis
    - processing streaming data
- fast enough to perform exploratory queries without sampling
    - complex data sets can be processed and visualized by combining Spark with data visualization tools

#### Disadvantages
- fewer algorithms are available in the Spark ML library
- Spark ML has very limited support for pandas-style DataFrame functionality: plotting, advanced statistical functions, etc.
    - the correlation between 2 columns can be calculated, but the range of descriptive statistics and hypothesis tests (e.g. ANOVA) is much narrower than in pandas/SciPy; only a few tests, such as ChiSquareTest, are built in
- integration with TensorFlow and PyTorch is not straightforward
- neural network support is very basic: Spark ML itself only offers a multilayer perceptron classifier, so CNNs, RNNs, etc. require third-party libraries
- Spark is not designed as a multi-user environment
    - Spark users need to know whether the memory they have access to will be sufficient for a dataset
    - memory usage has to be coordinated within the team so that projects can run concurrently