Your goal is to create a model that can predict whether a customer will churn (0 or 1) based on the features in [this dataset](https://www.kaggle.com/hassanamin/customer-churn). See the slides for more information.

First, we need to create the Spark session:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.sonic.net/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Afterwards, we can read the file and inspect it:

In [None]:
#Please drop the file in the environment's 'Files' panel
df = spark.read.options(header="true", inferSchema="true").csv("/content/customer_churn.csv")
df.describe().toPandas()

Unnamed: 0,summary,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
0,count,900,900.0,900.0,900.0,900.0,900.0,900,900,900,900.0
1,mean,,41.81666666666667,10062.82403333334,0.4811111111111111,5.27315555555555,8.587777777777777,,,,0.1666666666666666
2,stddev,,6.127560416916251,2408.644531858096,0.4999208935073339,1.274449013194616,1.7648355920350969,,,,0.3728852122772358
3,min,Aaron King,22.0,100.0,0.0,1.0,3.0,2006-01-02 04:16:13,"00103 Jeffrey Crest Apt. 205 Padillaville, IA ...",Abbott-Thompson,0.0
4,max,Zachary Walsh,65.0,18026.01,1.0,9.15,14.0,2016-12-28 04:07:38,Unit 9800 Box 2878 DPO AA 75157,"Zuniga, Clark and Shaffer",1.0


In [None]:
#To see which columns we wish to select, we do:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



Here, we have to think about whether it is worth indexing some columns. Both company name and location are pretty much unique, so I see no use in indexing them, although it could be interesting (but far too complicated) to index by country, for instance. I could also index the date by year, or add it to the model as a datetime variable, but since I don't recall how to tell Spark to treat a column as a datetime (I can't get `to_date()` to work), I will leave it out for now.

In [None]:
#And we proceed to select them:
mycols =  df.select(['Churn', 'Age', 'Total_Purchase', 'Years', 'Num_sites'])
#Hint: since 'Account_Manager' is currently assigned at random, we don't have to select it :p
final_train = mycols.na.drop()

Since this dataset does not come with separate 'train' and 'test' subsets, we need to make the division ourselves. Thus:

In [None]:
(train, test) = final_train.randomSplit([0.7,0.3])
train.describe().show(), test.describe().show()

+-------+-------------------+------------------+------------------+------------------+------------------+
|summary|              Churn|               Age|    Total_Purchase|             Years|         Num_sites|
+-------+-------------------+------------------+------------------+------------------+------------------+
|  count|                604|               604|               604|               604|               604|
|   mean| 0.1456953642384106|41.764900662251655|10058.752913907281|  5.24854304635762| 8.509933774834437|
| stddev|0.35309296232693316| 6.293619382523293|2443.5585608960027|1.2693878324304124|1.6725966135800687|
|    min|                  0|              22.0|             100.0|               1.0|               4.0|
|    max|                  1|              65.0|          16955.76|              9.15|              13.0|
+-------+-------------------+------------------+------------------+------------------+------------------+

+-------+-------------------+----------------

(None, None)

Since the division seems to have been made correctly, and we have enough rows in both the train and test dataframes, we can proceed to build the model on the train data:

In [None]:
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, VectorIndexer, StringIndexer)
assembler = VectorAssembler(inputCols=['Age', 'Total_Purchase', 'Years', 'Num_sites'],
                            outputCol='features')

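It is useful to know that categorical string columns can be indexed and one-hot encoded, for example (using columns from a different dataset):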


```
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, VectorIndexer, StringIndexer)

gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embarked_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')
```

The encoded columns are then added to the assembler and the pipeline:

```
assembler = VectorAssembler(inputCols=['Pclass','SexVec','Age','SibSp','Parch','Fare','EmbarkedVec'],
                            outputCol='features')

pipeline = Pipeline(stages=[gender_indexer, embarked_indexer, gender_encoder, embarked_encoder, assembler, lr])
```

This is how categorical parameters such as sex can be handled; however, as I have discussed previously, I will not use this here.

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='Churn')

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, lr])

In [None]:
model = pipeline.fit(train)

Once we have fitted the model, we clean the test data and use the model to generate predictions:

In [None]:
curated_test = test.na.drop()
predictions = model.transform(curated_test)

Let's see the results!

In [49]:
predictions.toPandas()

Unnamed: 0,Churn,Age,Total_Purchase,Years,Num_sites,features,rawPrediction,probability,prediction
0,0,27.0,8628.80,5.30,7.0,"[27.0, 8628.8, 5.3, 7.0]","[6.675936689418812, -6.675936689418812]","[0.9987406968746404, 0.001259303125359601]",0.0
1,0,28.0,9090.43,5.74,10.0,"[28.0, 9090.43, 5.74, 10.0]","[2.1718950670750665, -2.1718950670750665]","[0.8976971350218025, 0.10230286497819752]",0.0
2,0,29.0,9617.59,5.49,8.0,"[29.0, 9617.59, 5.49, 8.0]","[4.976543878227016, -4.976543878227016]","[0.9931493932780102, 0.006850606721989783]",0.0
3,0,29.0,10203.18,5.82,8.0,"[29.0, 10203.18, 5.82, 8.0]","[4.801549127250805, -4.801549127250805]","[0.9918499609427239, 0.00815003905727607]",0.0
4,0,30.0,13473.35,3.84,10.0,"[30.0, 13473.35, 3.84, 10.0]","[2.633769753642472, -2.633769753642472]","[0.9330035738020525, 0.06699642619794755]",0.0
...,...,...,...,...,...,...,...,...,...
291,1,50.0,14398.89,5.54,12.0,"[50.0, 14398.89, 5.54, 12.0]","[-2.6224366916584323, 2.6224366916584323]","[0.06770831797659854, 0.9322916820234015]",1.0
292,1,51.0,8100.43,4.92,13.0,"[51.0, 8100.43, 4.92, 13.0]","[-3.522746635680445, 3.522746635680445]","[0.02867190349642679, 0.9713280965035732]",1.0
293,1,55.0,5024.52,8.11,9.0,"[55.0, 5024.52, 8.11, 9.0]","[0.4878396300738217, -0.4878396300738217]","[0.619597372506964, 0.380402627493036]",0.0
294,1,56.0,12217.95,5.79,11.0,"[56.0, 12217.95, 5.79, 11.0]","[-1.7215722913002764, 1.7215722913002764]","[0.15166875344376754, 0.8483312465562325]",1.0
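To read the output columns above: for binary logistic regression, `probability` is the logistic (sigmoid) function applied to `rawPrediction`, and `prediction` is 1.0 when the churn probability exceeds the threshold. The sketch below checks this by hand for rows 0 and 293, assuming the threshold is 0.5 (the model above does not set one explicitly). The helper `sigmoid` is defined here for illustration and is not part of PySpark.

```python
import math

def sigmoid(z: float) -> float:
    """Logistic function: maps a raw margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# (Churn label, first entry of rawPrediction) copied from rows 0 and 293 above
rows = [(0, 6.675936689418812), (1, 0.4878396300738217)]

results = []
for label, raw_class0 in rows:
    p_no_churn = sigmoid(raw_class0)   # should match probability[0]
    p_churn = 1.0 - p_no_churn         # should match probability[1]
    # Assumed decision rule: predict churn when its probability exceeds 0.5
    predicted = 1.0 if p_churn > 0.5 else 0.0
    results.append((label, p_no_churn, p_churn, predicted))
    print(label, round(p_no_churn, 6), round(p_churn, 6), predicted)
```

Row 293 shows why a churning customer can be missed: the churn probability is about 0.38, which falls below the assumed 0.5 cut-off, so the prediction is 0.0 even though the label is 1.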


Now that we have made the predictions, let's evaluate our prediction model. Note that `BinaryClassificationEvaluator` reports the area under the ROC curve by default, not accuracy:

In [53]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# The default metric is areaUnderROC; here it is computed from the hard 0/1 'prediction' column
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')
acc = evaluator.evaluate(predictions)
acc

0.738833746898263

As we can see, the prediction model is not that great, but it works to an extent. As previously discussed, we may be able to improve the model by indexing other parameters.
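To make clearer what the score above measures, here is a small pure-Python sketch. It uses only the nine rows visible in the predictions output (not the full test set, so the numbers will not match 0.7388). It assumes that an ROC curve built from hard 0/1 predictions has a single interior point joined by straight lines to (0, 0) and (1, 1), in which case the area under it reduces to the average of the true positive rate and the true negative rate.

```python
# (Churn, prediction) pairs for the nine rows visible in the predictions output
pairs = [(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (1, 1), (1, 1), (1, 0), (1, 1)]

tp = sum(1 for y, p in pairs if y == 1 and p == 1)  # churners correctly flagged
tn = sum(1 for y, p in pairs if y == 0 and p == 0)  # non-churners correctly passed
fp = sum(1 for y, p in pairs if y == 0 and p == 1)  # false alarms
fn = sum(1 for y, p in pairs if y == 1 and p == 0)  # missed churners

accuracy = (tp + tn) / len(pairs)

tpr = tp / (tp + fn)  # true positive rate (recall on churners)
tnr = tn / (tn + fp)  # true negative rate

# Area under a one-point ROC curve (assumption stated in the text above)
auc_hard = (tpr + tnr) / 2

print("accuracy:", accuracy)
print("AUC from hard predictions:", auc_hard)
```

The two numbers differ because accuracy is dominated by the majority class (non-churners), while this area weights both classes equally. Passing the `rawPrediction` column to the evaluator instead of `prediction` would score the ranking of customers rather than the thresholded labels, and would likely give a different value.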