# Logistic Regression Consulting Project Using a Pipeline

## Binary Customer Churn

A marketing agency has many customers that use their service to produce ads for the client/customer websites. They've noticed that they have quite a bit of churn in clients. They basically randomly assign account managers right now, but want you to create a machine learning model that will help predict which customers will churn (stop buying their service) so that they can correctly assign the customers most at risk to churn an account manager. Luckily they have some historical data, can you help them out? Create a classification algorithm that will help classify whether or not a customer churned. Then the company can test this against incoming data for future customers to predict which customers will churn and assign them an account manager.

The data is saved as customer_churn.csv. Here are the fields and their definitions:

    Name : Name of the latest contact at Company
    Age: Customer Age
    Total_Purchase: Total Ads Purchased
    Account_Manager: Binary 0=No manager, 1= Account manager assigned
    Years: Totaly Years as a customer
    Num_sites: Number of websites that use the service.
    Onboard_date: Date that the name of the latest contact was onboarded
    Location: Client HQ Address
    Company: Name of Client Company
    
Once you've created the model and evaluated it, test out the model on some new data (you can think of this almost like a hold-out set) that your client has provided, saved under new_customers.csv. The client wants to know which customers are most likely to churn given this data (they don't have the label yet).

### Imports & Build Season

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Pipeline

In [0]:
spark = SparkSession.builder.appName('logreg5').getOrCreate()

### Analysing database

In [0]:
churn_df = spark.read.csv('/FileStore/tables/customer_churn-2.csv', inferSchema=True, header=True)

In [0]:
churn_df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

In [0]:
churn_df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



I'm going to consider or discard the different categories for the analysis:

 - Name : Is unique to the client so it will be out of the analysis. Besides, it's difficult to think that the name will have any influence in the clients decission to churn.
 - Age: Definitely could influence the clients decission to churn.
 - Total_Purchase: Also a relevant factor.
 - Account_Manager: The managers are assigned randomly, but the fact of having one may influence the clients' decission making, so I will include it.
 - Years: Clearly a factor of permanence or churning, it will be considered.
 - Num_sites: Again, it seems a relevant factor for the client.
 - Onboard_date: At first sight, it seems that the onboard date does not provide more info than 'Years', but I'm going to use the month when the client started the suscription to consider effects of seasonality. To do that, I need to extract the month from the date as a numerical value.
 - Location: The location of the client may be relevant since a bad reputation in a particular geographical region or regional competition could influence the clients' decission.
 - Company: Almost as unique as client name, so of no value for the analyisis

## Feature engineering

#### Extracting state
I'm going to build a function that extracts the state out of the company's address. For that, I'm going to use regular expressions to extract the state code.

In [0]:
import re
from pyspark.sql.functions import regexp_extract

In [0]:
def add_state_column(df):
    return df.withColumn('State', regexp_extract('Location', '\s[A-Z][A-Z]\s', 0))

Dataframe containing a state code column

In [0]:
churn_state_df = add_state_column(churn_df)

In [0]:
churn_state_df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|State|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|  AK |
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|  RI |
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|  DE |
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith In

In [0]:
print('Number of unique states: ', churn_clean_df.select('State').distinct().count())

Number of unique states:  62


#### Extract the onboard month

In [0]:
from pyspark.sql.functions import month

Dataframe containing a column with the month

In [0]:
churn_month_df = churn_state_df.withColumn('Onboard_month', month(churn_state_df['Onboard_date']))

In [0]:
churn_month_df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+-----+-------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|State|Onboard_month|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+-----+-------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|  AK |            8|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|  RI |            8|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|  DE |            6|
|   Phillip White|42.0|       8010.76|        

#### Analysing company name
The number of companies is similar to the number of entries, so they are not useful for the analysis because they are almost every one unique.

In [0]:
print('Number of unique companies: ', churn_month_df.select('Company').distinct().count())

Number of unique companies:  873


#### Select columns
As said, I will remove customer and company names because they are unique or close to unique.

In [0]:
churn_month_df.columns

Out[17]: ['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'State',
 'Onboard_month']

In [0]:
churn_clean_df = churn_month_df.select(['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn',
 'State',
 'Onboard_month'])

In [0]:
churn_clean_df.show(3)

+----+--------------+---------------+-----+---------+-----+-----+-------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|State|Onboard_month|
+----+--------------+---------------+-----+---------+-----+-----+-------------+
|42.0|       11066.8|              0| 7.22|      8.0|    1|  AK |            8|
|41.0|      11916.22|              0|  6.5|     11.0|    1|  RI |            8|
|38.0|      12884.75|              0| 6.67|     12.0|    1|  DE |            6|
+----+--------------+---------------+-----+---------+-----+-----+-------------+
only showing top 3 rows



## Indexing, One hot encoding, Assembling and Splitting

In this section I'm going to do the indexing (change categorical values to numerical), one-hot encoding (turn numerical values into vectors), the assembling (make a unique vector of al categories) and split the entries in training and test sets to analyse the model's performance. All this transformations will be stored to be executed in a Pipeline in the next section.

In [0]:
from pyspark.ml.feature import (StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer)
from pyspark.ml import Pipeline

In [0]:
state_indexer = StringIndexer(inputCol='State', outputCol='State_index')
state_trained_indexer = state_indexer.fit(churn_clean_df)
state_encoder = OneHotEncoder(inputCol='State_index', outputCol='State_vector')
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Years',
 'Account_Manager',
 'Num_Sites',
 'Onboard_month',
 'State_vector'], outputCol='features')

In [0]:
train_data, test_data = churn_clean_df.randomSplit([0.7,0.3])

## Building a Logistic regression model

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
log_reg = LogisticRegression(featuresCol='features',labelCol='Churn')

In [0]:
pipeline = Pipeline(stages=[state_indexer, state_encoder, assembler, log_reg])

In [0]:
fit_model = pipeline.fit(train_data)
results = fit_model.transform(test_data)

In [0]:
my_eval = BinaryClassificationEvaluator(labelCol='Churn')
AUC = my_eval.evaluate(results)
print('Area under the curve: ', AUC)

Area under the curve:  0.7178883328427047


In [0]:
multi_eval = MulticlassClassificationEvaluator(labelCol='Churn', metricName='accuracy')

In [0]:
acc = multi_eval.evaluate(results)
print('Model accuracy: ', acc)

Model accuracy:  0.8821428571428571


The model performs reasonably well for a dataset this short. The AUC is good and the accuracy is quite high.

## Testing against new data
In this section I'm going to create a prediction with new unlabeled data.

#### Loading and visualizing new data

In [0]:
new_df = spark.read.csv('/FileStore/tables/new_customers-2.csv', header=True, inferSchema=True)

In [0]:
new_df.show()

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
|Megan Ferguson|32.0|        6487.5|              0|  9.4|     14.0|2016-10-28 05:32:13|922 Wright Branch...|   Sexton-Golden|
|  Taylor Young|32.0|      13147.71|              1| 10.0|      8.0|2012-03-20 00:36:46|Unit 0789 Box 073...|  

I do the same data engineering than for the model-building data

In [0]:
new_clean_df = add_state_column(new_df).withColumn('Onboard_month', month(new_df['Onboard_date']))

I create a new model using all the data available, not only the train data

In [0]:
fit_model_all = pipeline.fit(churn_clean_df)

Finally, I create a dataframe containing the predictions and select the relevant columns to display.

In [0]:
new_results = fit_model_all.transform(new_clean_df)

In [0]:
new_results.select('Names','Company','prediction').show()

+--------------+----------------+----------+
|         Names|         Company|prediction|
+--------------+----------------+----------+
| Andrew Mccall|        King Ltd|       0.0|
|Michele Wright|   Cannon-Benson|       1.0|
|  Jeremy Chang|Barron-Robertson|       1.0|
|Megan Ferguson|   Sexton-Golden|       1.0|
|  Taylor Young|        Wood LLC|       0.0|
| Jessica Drake|   Parks-Robbins|       0.0|
+--------------+----------------+----------+

