# Libraries

In [1]:
import pandas as pd

# PySpark: the Python API for Apache Spark
import pyspark


# Initiate Spark Session

In [2]:
# Build (or reuse) the SparkSession. 'local[*]' is the master used here;
# pass a cluster URL to .master() instead to connect to a cluster.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local[*]')
    .appName('churn')
    .getOrCreate()
)


In [3]:
# Echo the SparkSession object as the cell's output.
spark

In [4]:
# RDD  

# Resilient - immutable
# Distributed - partitioned
# DataSet - it holds data


In [5]:
# Take the SparkContext from the session; it is used below to create an RDD.
sc =  spark.sparkContext
sc

In [6]:
# Read the CSV as an RDD of raw text lines (no parsing, no schema).
rdd = sc.textFile('agency_churn.csv')
rdd

agency_churn.csv MapPartitionsRDD[1] at textFile at <unknown>:0

In [7]:
# Confirm the object returned by textFile is a pyspark RDD.
type(rdd)

pyspark.rdd.RDD

In [8]:
# Peek at the first two lines: the header row and the first record, both as plain strings.
rdd.take(2)

['Names,Age,Total_Purchase,Account_Manager,Years,Onboard_date,Location,Company,Churn',
 'Cameron Williams,42,11066.8,0,7.22,2013-08-30 7:00:40,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1']

In [9]:
# A DataFrame has the same properties as an RDD but adds a column structure (schema).
# Use the first line as the header and let Spark infer each column's type.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('agency_churn.csv')
)

In [10]:
# Echo the DataFrame; its repr lists column names and types only, not the rows.
df

DataFrame[Names: string, Age: int, Total_Purchase: double, Account_Manager: int, Years: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int]

In [11]:
# List the column names.
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [12]:
# Print the first 3 rows as a text table (long strings are truncated with '...').
df.show(3)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano| 38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [13]:
# Print the schema as a tree: column name, type and nullability.
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [14]:
# The same type information as a list of (column, type) tuples.
df.dtypes

[('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Onboard_date', 'timestamp'),
 ('Location', 'string'),
 ('Company', 'string'),
 ('Churn', 'int')]

# Find unique values

In [15]:
# Project two columns and print the first 10 rows (note the null Age in the output).
df.select('Age' , 'Total_Purchase').show(10)

+----+--------------+
| Age|Total_Purchase|
+----+--------------+
|  42|       11066.8|
|  41|      11916.22|
|  38|      12884.75|
|  42|       8010.76|
|  37|       9191.58|
|  48|      10356.02|
|null|      11331.58|
|  32|       9885.12|
|  43|       14062.6|
|  40|       8066.94|
+----+--------------+
only showing top 10 rows



In [16]:
# Distinct Age values, first 10 shown; the result is not sorted.
df.select('Age'  ).distinct().show(10)

+---+
|Age|
+---+
| 31|
| 65|
| 53|
| 34|
| 28|
| 26|
| 27|
| 44|
| 22|
| 47|
+---+
only showing top 10 rows



In [17]:
# Number of unique Age values.
# NOTE(review): distinct() keeps null as its own value, and describe() reports only
# 898 non-null ages out of 900 rows, so this count presumably includes null.

df.select('Age'  ).distinct().count()


37

# Describe - Summary stats

In [18]:
# Summary statistics per column. The timestamp column Onboard_date does not
# appear in the output; only the numeric and string columns are summarised.
df.describe().show()

+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|    Total_Purchase|    Account_Manager|            Years|            Location|             Company|              Churn|
+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|  count|          900|              898|               899|                898|              900|                 900|                 899|                900|
|   mean|         null|41.81069042316258|10064.888242491663| 0.4799554565701559| 5.27315555555555|                null|                null|0.16666666666666666|
| stddev|         null| 6.13303075073418| 2409.188637458988|0.49987645989755647|1.274449013194616|                null|                null| 0.3728852122772358|
|    min|   Aaron King|           

In [19]:
# Summary statistics restricted to three columns.
df.describe('Names', 'Age', 'Total_Purchase').show()

+-------+-------------+-----------------+------------------+
|summary|        Names|              Age|    Total_Purchase|
+-------+-------------+-----------------+------------------+
|  count|          900|              898|               899|
|   mean|         null|41.81069042316258|10064.888242491663|
| stddev|         null| 6.13303075073418| 2409.188637458988|
|    min|   Aaron King|               22|             100.0|
|    max|Zachary Walsh|               65|          18026.01|
+-------+-------------+-----------------+------------------+



## Check the shape

In [20]:
# Number of rows
df.count()



900

In [21]:
# Number of columns: the length of the column-name list
len(df.columns)

9

## Subset by row (filtering)

In [22]:
# How many rows have Age above 30 (rows with a null Age never match).
df.where(df.Age > 30).count()

868

In [23]:
# First 10 rows with Age above 30.
df.where(df.Age > 30).show(10)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano| 38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White| 42|       8010.76|              0| 6.71|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton| 37|       9191.58|              0| 5.56|2016-01-19 15:31:15|765 Tricia Row Ka...|    

In [24]:
# Rows where BOTH conditions hold: & is the column-wise AND, and each comparison
# needs its own parentheses. The column is referenced by its exact schema name
# 'Churn' so the filter does not rely on Spark's case-insensitive name resolution.
df.filter((df['Age'] > 30) & (df['Churn'] == 1)).count()

143

In [25]:
# Rows where EITHER condition holds: | is the column-wise OR, and each comparison
# needs its own parentheses. The column is referenced by its exact schema name
# 'Churn' so the filter does not rely on Spark's case-insensitive name resolution.
df.filter((df['Age'] > 30) | (df['Churn'] == 1)).count()

875

In [26]:
# Customers whose name starts with 'Eric' and who did not churn.
# 'Churn' is spelled exactly as in the schema so the filter does not rely on
# Spark's case-insensitive name resolution.
df.filter(df['Names'].startswith('Eric') & (df['Churn'] == 0)).show()


+-----------+---+--------------+---------------+-----+-------------------+--------------------+---------------+-----+
|      Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|        Company|Churn|
+-----------+---+--------------+---------------+-----+-------------------+--------------------+---------------+-----+
| Eric Smith| 37|      11416.56|              0| 6.06|2006-04-03 04:29:08|166 Julie Vista A...|  Thomas-Curtis|    0|
|Eric Castro| 38|      10770.82|              0| 4.99|2016-06-19 01:23:45|80338 Scott Trail...|Campbell-Willis|    0|
|Eric Martin| 44|      14155.97|              1| 6.99|2006-05-31 09:24:41|162 Paul Freeway ...|   Warner-Welch|    0|
| Eric Terry| 42|      16371.42|              1| 3.84|2014-12-18 16:51:16|2384 Lucero Traff...|   White-Thomas|    0|
+-----------+---+--------------+---------------+-----+-------------------+--------------------+---------------+-----+



In [27]:
# Disabled example: all names containing 'Terry'.
# df.filter(df['Names'].contains('Terry')).show()


In [28]:
# Churned customers whose Age is missing.
# 'Churn' is spelled exactly as in the schema so the filter does not rely on
# Spark's case-insensitive name resolution.
df.filter(df['Age'].isNull() & (df['Churn'] == 1)).count()


2

In [29]:
# Row count once every row containing a null is dropped; df itself is not changed here.
df.dropna().count()

896

In [30]:
# Drop rows containing nulls and rebind df to the result: dropna() returns a new
# DataFrame and has no inplace option.
df = df.dropna()

In [31]:
# Confirm the row count after the null rows were removed.
df.count()

896

In [35]:
# Customers onboarded between the two dates (between() includes both bounds).
# NOTE(review): Onboard_date is a timestamp, so the string bounds are presumably
# compared as midnight of each day; rows later on 2006-10-03 would then be
# excluded. Confirm this is the intended range.
df.filter(df['Onboard_date'].between('2006-04-03','2006-10-03')).show()

+-------------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|              Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+-------------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|        Troy Oliver| 40|      10041.13|              0| 7.61|2006-07-30 15:27:23|USCGC Blake FPO A...|           Gates Ltd|    1|
|   Brittany Jackson| 50|       12682.9|              0| 4.58|2006-09-11 07:07:17|082 Roth Island D...|     Mcdonald-Cooper|    1|
|       Jeffrey Hill| 40|        9922.3|              1| 5.07|2006-09-22 20:58:51|97907 Jensen Forg...|Williams, Bautist...|    1|
|    Daniel Bartlett| 45|       6749.49|              0| 5.88|2006-08-19 17:13:19|353 Gregory Spur ...|         Owens Group|    1|
|     Sarah Holloway| 45|       8429.65|              1|  7.1|2006-08-26 16:40:13|0

In [37]:
# Filter using SQL: register df as a temporary view so it can be queried by name.
df.createOrReplaceTempView('churn_table')

In [39]:
# Query the temporary view with plain SQL and print the result.
spark.sql("select * from churn_table LIMIT 3").show()

+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano| 38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+



# Group by

In [41]:
# Average of every numeric column for each Churn class.
df.groupBy('Churn').avg().show()

+-----+------------------+-------------------+--------------------+------------------+----------+
|Churn|          avg(Age)|avg(Total_Purchase)|avg(Account_Manager)|        avg(Years)|avg(Churn)|
+-----+------------------+-------------------+--------------------+------------------+----------+
|    1|42.986301369863014| 10202.051575342472|  0.5547945205479452| 5.893698630136986|       1.0|
|    0| 41.58133333333333| 10036.952853333332|  0.4653333333333333|5.1510666666666625|       0.0|
+-----+------------------+-------------------+--------------------+------------------+----------+



In [42]:
# Same aggregation, converted to a pandas DataFrame for display.
df.groupBy('Churn').mean().toPandas()

Unnamed: 0,Churn,avg(Age),avg(Total_Purchase),avg(Account_Manager),avg(Years),avg(Churn)
0,1,42.986301,10202.051575,0.554795,5.893699,1.0
1,0,41.581333,10036.952853,0.465333,5.151067,0.0


# Machine Learning
Predict churn

In [44]:
# VectorAssembler combines several columns into a single feature-vector column
from pyspark.ml.feature import VectorAssembler

In [45]:
# Column types again, to see which columns are numeric.
df.dtypes

[('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Onboard_date', 'timestamp'),
 ('Location', 'string'),
 ('Company', 'string'),
 ('Churn', 'int')]

In [46]:
# Predict churn from the numeric columns only; the remaining columns would need
# encoding (e.g. one-hot), which is left out here.
input_columns = [
    'Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
]

In [50]:
# Assembler that packs the input columns into one vector column named 'features'.
assembler = VectorAssembler(
    inputCols=input_columns,
    outputCol='features',
)
assembler

VectorAssembler_9f09b2c3510f

In [53]:
# Apply the assembler: returns df with an extra 'features' vector column.
output = assembler.transform(df)
output

DataFrame[Names: string, Age: int, Total_Purchase: double, Account_Manager: int, Years: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int, features: vector]

In [58]:
# Inspect the first 3 assembled feature vectors.
output.select('features').show(3)

+--------------------+
|            features|
+--------------------+
|[42.0,11066.8,0.0...|
|[41.0,11916.22,0....|
|[38.0,12884.75,0....|
+--------------------+
only showing top 3 rows



In [59]:
# Keep only the feature vector and the label column for modelling.
final_data = output.select('features' ,'Churn')

In [60]:
# Preview the modelling table.
final_data.show(3)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
+--------------------+-----+
only showing top 3 rows



In [61]:
# Train/test split: weights 0.7 / 0.3 with a fixed seed.
train, test = final_data.randomSplit(weights=[0.7, 0.3], seed=123)

In [62]:
# Rows in the training split (the split is random, so the share is only approximately 70%).
train.count()

629

In [63]:
# Rows in the test split.
test.count()

267

# Modeling - Decision Trees

In [82]:
# import ML algorithms

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

In [67]:
# Decision tree classifier reading the 'features' vector and the 'Churn' label.
tree_model = DecisionTreeClassifier(
    labelCol='Churn',
    featuresCol='features',
)

In [68]:
# Fit the decision tree on the training split only.
tree_model_fitted = tree_model.fit(train)

In [69]:
# Echo the fitted model; its repr reports the tree depth and node count.
tree_model_fitted

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_aa81569e5c38) of depth 5 with 27 nodes

In [70]:
# Feature importances, indexed in the order of input_columns:
# 0 = Age, 1 = Total_Purchase, 2 = Account_Manager, 3 = Years.
tree_model_fitted.featureImportances

SparseVector(4, {0: 0.1471, 1: 0.295, 2: 0.0262, 3: 0.5317})

# Evaluation

In [71]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [72]:
# Make predictions on the held-out test split; transform() adds the
# rawPrediction, probability and prediction columns.
predictions_tree = tree_model_fitted.transform(test)

In [76]:
# Preview the first 3 predictions next to the true Churn label.
predictions_tree.show(3)

+--------------------+-----+-------------+--------------------+----------+
|            features|Churn|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|[25.0,9672.03,0.0...|    0| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8787.39,1.0...|    1| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8939.61,0.0...|    0|    [3.0,0.0]|           [1.0,0.0]|       0.0|
+--------------------+-----+-------------+--------------------+----------+
only showing top 3 rows



In [77]:
# Evaluator that scores the 'prediction' column against the 'Churn' label using accuracy.
churn_eval = MulticlassClassificationEvaluator(
    labelCol='Churn',
    predictionCol='prediction',
    metricName='accuracy',
)

In [80]:
# Accuracy of the decision tree on the test split.
# NOTE(review): describe() shows mean Churn of about 0.167, so always predicting 0
# would be right roughly 83% of the time; accuracy alone says little here.
# Consider also reporting f1 or area under ROC.
churn_eval.evaluate(predictions_tree)

0.8164794007490637

In [81]:
# Same evaluator and same evaluation of the decision tree as in the two previous
# cells, combined into one cell; the result is therefore identical.
churn_eval = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Churn', metricName='accuracy')
churn_eval.evaluate(predictions_tree)

0.8164794007490637

In [87]:
# Random forest on the same feature vector and label as the decision tree.
# Forest training is randomised, so the seed is pinned (same value as the
# train/test split) to make the fitted model and its feature importances
# reproducible across runs.
rf_model = RandomForestClassifier(featuresCol='features', labelCol='Churn', seed=123)

# Fit on the training split only.
rf_model_fitted = rf_model.fit(train)




In [88]:

# Feature importances of the random forest, indexed in the order of input_columns:
# 0 = Age, 1 = Total_Purchase, 2 = Account_Manager, 3 = Years.
rf_model_fitted.featureImportances

SparseVector(4, {0: 0.2655, 1: 0.2604, 2: 0.0365, 3: 0.4376})