#### Consultancy Project
A marketing agency has many customers that use their service to produce ads for the client/customer websites. They've noticed that they have quite a bit of churn in clients. They basically randomly assign account managers right now, but want you to create a machine learning model that will help predict which customers will churn (stop buying their service) so that they can correctly assign the customers most at risk to churn an account manager. Luckily they have some historical data, can you help them out? Create a classification algorithm that will help classify whether or not a customer churned. Then the company can test this against incoming data for future customers to predict which customers will churn and assign them an account manager.

The data is saved as customer_churn.csv. Here are the fields and their definitions:

Name : Name of the latest contact at Company
Age: Customer Age
Total_Purchase: Total Ads Purchased
Account_Manager: Binary 0=No manager, 1= Account manager assigned
Years: Totaly Years as a customer
Num_sites: Number of websites that use the service.
Onboard_date: Date that the name of the latest contact was onboarded
Location: Client HQ Address
Company: Name of Client Company
Once you've created the model and evaluated it, test out the model on some new data (you can think of this almost like a hold-out set) that your client has provided, saved under new_customers.csv. The client wants to know which customers are most likely to churn given this data (they don't have the label yet).

In [1]:
import pandas as pd

In [2]:
#import the necessary libraries
from pyspark.sql import SparkSession

In [3]:
#import the isnan function from the column package
from pyspark.sql.functions import (count, when, isnull, month, year)

In [4]:
#starting the spark session
spark = SparkSession.builder.appName('customerchurn').getOrCreate()

In [5]:
#load the dataset
data = spark.read.csv('12.customer_churn.csv', inferSchema=True, header=True)

#### Exploratory analysis

In [6]:
#analyzing each column
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
#print a couple of rows for further analysis
for row in data.head(5):
    print(row,'\n')

Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1) 

Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1) 

Row(Names='Eric Lozano', Age=38.0, Total_Purchase=12884.75, Account_Manager=0, Years=6.67, Num_Sites=12.0, Onboard_date=datetime.datetime(2016, 6, 29, 6, 20, 7), Location='1331 Keith Court Alyssahaven, DE 90114', Company='Miller, Johnson and Wallace', Churn=1) 

Row(Names='Phillip White', Age=42.0, Total_Purchase=8010.76, Account_Manager=0, Years=6.71, Num_Sites=10.0, Onboard_date=datetime.datetime(2014, 4, 22, 12, 43, 12), Location='13120 Daniel Mount Angelabury, WY 30645-4695',

In [8]:
def null_counter(dataframe): 
    '''Accepts a spark dataframe, and returns the number of null values in each column:
    ex:
    input:  null_counter(dataframe_with_missingvalues)
     output: 
            The number of missing values in  Id  column :  0
            The number of missing values in  Name  column :  2 
    '''
    for column in dataframe.columns:
        print('The number of missing values in ',column, ' column : ',
              dataframe.select(count(when(isnull(column),column)).alias(column)).collect()[0][0]
              )

In [9]:
null_counter(data)

The number of missing values in  Names  column :  0
The number of missing values in  Age  column :  0
The number of missing values in  Total_Purchase  column :  0
The number of missing values in  Account_Manager  column :  0
The number of missing values in  Years  column :  0
The number of missing values in  Num_Sites  column :  0
The number of missing values in  Onboard_date  column :  0
The number of missing values in  Location  column :  0
The number of missing values in  Company  column :  0
The number of missing values in  Churn  column :  0


#### Data seems complete
#### There are no missing values in the data.
#### Now let's check the categorical values

In [10]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



##### Names column will not affect the analysis
##### Company Location and Company address reveal the same data. We will drop the address
##### Onboard_date needs to be modifed only to year and month
##### Company needs to be one-hot-encoded

In [11]:
#create a list of all the stages of workflow for the pipeline
stages = []

###  Convert the onboard date to month duration

In [12]:
#import the date time module
import datetime

In [13]:
#converting the current date (3/15/2020) to months
currentmonth = datetime.datetime.now().year*12 + datetime.datetime.now().month

In [14]:
#import the lit function from pyspark to be able to add a constant as a column
from pyspark.sql.functions import lit

#adding the current month to the the data frame
data = data.withColumn('Current_Month', lit(currentmonth))

In [15]:
#the duration which the latest contact stayed in contact could be important. So:

#calculating the date of the onboard_date in terms of month
data = data.withColumn('Onboard_Month', ( year('Onboard_date')*12  +  month('Onboard_date') ).alias('Onboard_month'))

#finding the difference between the current month and the onboard_date
data = data.withColumn('Onboard_Date_Difference', data['Current_Month'] - data['Onboard_Month'])

In [16]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'Current_Month',
 'Onboard_Month',
 'Onboard_Date_Difference']

In [17]:
#finalize the dataset
data_in_progress = data.select(
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Company',
 'Onboard_Date_Difference', 
 'Churn')

### Handle the categorical variables

In [18]:
#importing the indexer and the onehotencoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [19]:
#first create the company indexer
company_indexer = StringIndexer(inputCol='Company', outputCol='Company_indexed')
stages += [company_indexer] #adding to the stages for the pipeline

#secondly, create a company encoder
company_encoder = OneHotEncoder(inputCol='Company_indexed', outputCol='Company_encoded')
stages += [company_encoder]  #adding to the stages for the pipeline

### Build the assembler

In [20]:
#importing the VectorAssembler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

In [21]:
#building the assembler
assembler = VectorAssembler(inputCols=['Age','Total_Purchase',
                                        'Account_Manager','Years',
                                        'Num_Sites','Company_encoded',
                                        'Onboard_Date_Difference'],
                            outputCol='features')
stages += [assembler]

### Building the workflow pipeline

In [22]:
#import the pipeline
from pyspark.ml import Pipeline

In [23]:
#create the pipeline
pipeline = Pipeline(stages = stages)

In [24]:
final_data = pipeline.fit(data_in_progress).transform(data_in_progress)

In [25]:
final_data.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Onboard_Date_Difference: integer (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Company_indexed: double (nullable = false)
 |-- Company_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



### Create the train and test datasets

In [26]:
#splitting the data
train_data, test_data = final_data.randomSplit([0.7, 0.3])

### Build the regression model

In [27]:
#importing the logistic regression
from pyspark.ml.classification import LogisticRegression

In [28]:
#start the log_regression object
log_reg = LogisticRegression(labelCol='Churn')

### Build the model

In [29]:
#train the model
log_reg_model = log_reg.fit(final_data)

### Evaluate the model

In [30]:
#introduce the test data to the model
results = log_reg_model.transform(test_data)

In [31]:
#checking the output
results.columns

['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Company',
 'Onboard_Date_Difference',
 'Churn',
 'Company_indexed',
 'Company_encoded',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [32]:
#review the results dataframe using pandas pandas for better formatting
pd.DataFrame(data=results.take(3),columns=results.columns).T

Unnamed: 0,0,1,2
Age,29,29,30
Total_Purchase,9617.59,13255,8677.28
Account_Manager,0,1,1
Years,5.49,4.89,7.31
Num_Sites,8,8,7
Company,Mendoza Inc,"Gonzalez, Abbott and Lane","Wiley, Macdonald and Lewis"
Onboard_Date_Difference,85,49,161
Churn,0,0,0
Company_indexed,478,196,400
Company_encoded,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


### Use the evaluation metrics for further analysis

In [33]:
#importing the evaluation metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [34]:
#build an evaluator
evaluate = BinaryClassificationEvaluator(labelCol='Churn')

In [35]:
#calculate the AUC
AUC = evaluate.evaluate(results)

In [36]:
AUC

1.0

### Our model classified all data correctly

### Now, we will do further analysis before deploying our model