### Initiate a Spark Session & Import Data

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.4.5-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [2]:
df = spark.read.csv('customer_churn.csv', inferSchema=True, header=True)

In [3]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [4]:
df.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [5]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [6]:
df.describe('Age', 'Total_Purchase', 'Years', 'Num_Sites').show()

+-------+-----------------+-----------------+-----------------+------------------+
|summary|              Age|   Total_Purchase|            Years|         Num_Sites|
+-------+-----------------+-----------------+-----------------+------------------+
|  count|              900|              900|              900|               900|
|   mean|41.81666666666667|10062.82403333334| 5.27315555555555| 8.587777777777777|
| stddev|6.127560416916251|2408.644531858096|1.274449013194616|1.7648355920350969|
|    min|             22.0|            100.0|              1.0|               3.0|
|    max|             65.0|         18026.01|             9.15|              14.0|
+-------+-----------------+-----------------+-----------------+------------------+



### Feature Engineering

In [7]:
from pyspark.sql.functions import month, year

In [8]:
df = df.withColumn(colName='Onboard_year', col=year('Onboard_date'))

In [9]:
df.head(2)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1, Onboard_year=2013),
 Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1, Onboard_year=2013)]

In [10]:
df.select('Onboard_year').distinct().count()

11

In [11]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

In [12]:
stage1 = StringIndexer(inputCol='Onboard_year', outputCol='onboard_index')

In [13]:
stage2 = OneHotEncoderEstimator(inputCols=[stage1.getOutputCol()], outputCols=['onboard_encode'])

In [14]:
stage3 = VectorAssembler(inputCols=['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites','onboard_encode'], 
                         outputCol='features')

In [15]:
### Building ML Pipeline

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [17]:
regression = LogisticRegression(featuresCol='features', labelCol='Churn')

In [18]:
pipeline = Pipeline(stages=[stage1, stage2, stage3, regression])

In [19]:
test_data, train_data = df.randomSplit([0.7, 0.3])

In [20]:
model = pipeline.fit(train_data)

In [21]:
result = model.transform(test_data)

In [22]:
result.select('prediction', 'Churn').show()

+----------+-----+
|prediction|Churn|
+----------+-----+
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       1.0|    1|
|       0.0|    0|
|       0.0|    0|
|       0.0|    1|
|       1.0|    1|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    1|
|       1.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
+----------+-----+
only showing top 20 rows



In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')

In [25]:
AUC = evaluator.evaluate(result)

In [26]:
AUC

0.7799909204403586