### Telecom customer churn project
Predictive analytics uses churn prediction models that estimate each customer's propensity to churn. Because these models produce a short, prioritized list of likely defectors, they help focus customer retention marketing programs on the subset of the customer base that is most vulnerable to churn.

In [32]:
from pyspark.sql import SparkSession

In [33]:
spark = SparkSession.builder.appName('telecom_churn').getOrCreate()

In [34]:
df = spark.read.csv('../datasets/telecom_data.csv',inferSchema=True,header=True)

In [35]:
df.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: integer (nullable = true)
 |-- area code: integer (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: integer (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: integer (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: integer (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: integer (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: integer (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: integer (nullable = true)
 |-- churn: boolean (nullable 

In [36]:
df.select('state','international plan','churn').show()

+-----+------------------+-----+
|state|international plan|churn|
+-----+------------------+-----+
|   KS|                no|false|
|   OH|                no|false|
|   NJ|                no|false|
|   OH|               yes|false|
|   OK|               yes|false|
|   AL|               yes|false|
|   MA|                no|false|
|   MO|               yes|false|
|   LA|                no|false|
|   WV|               yes|false|
|   IN|                no| true|
|   RI|                no|false|
|   IA|                no|false|
|   MT|                no|false|
|   IA|                no|false|
|   NY|                no| true|
|   ID|                no|false|
|   VT|                no|false|
|   VA|                no|false|
|   TX|                no|false|
+-----+------------------+-----+
only showing top 20 rows



In [37]:
df.groupBy('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
| true|  483|
|false| 2850|
+-----+-----+
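
The counts above show that the two classes are imbalanced. As a quick sanity check (plain Python, not part of the original notebook), the churn rate and the accuracy of a trivial model that always predicts "no churn" can be worked out from the two counts:

```python
# Class counts taken from the groupBy('churn') output above
churned = 483
stayed = 2850
total = churned + stayed

# Share of customers who churned
churn_rate = churned / total

# Accuracy of a trivial baseline that always predicts "no churn"
majority_baseline = stayed / total

print(f"total customers:   {total}")
print(f"churn rate:        {churn_rate:.3f}")
print(f"majority baseline: {majority_baseline:.3f}")
```

Any accuracy reported later should be read against this baseline of roughly 85.5%.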



## Data Preprocessing

#### Checking for null data points

In [38]:
# Return a list of (column, null_count) pairs for every column that contains nulls
def null_value_count(df):
    null_columns_counts = []
    for k in df.columns:
        # One Spark job per column: count the rows where this column is null
        null_rows = df.where(df[k].isNull()).count()
        if null_rows > 0:
            null_columns_counts.append((k, null_rows))
    return null_columns_counts

In [39]:
null_values = null_value_count(df)
# len(null_values) is the number of columns that contain at least one null
print("There are %s null data points in dataset" % len(null_values))

There are 0 null data points in dataset


*Next: cast the Boolean 'churn' column to String, so that StringIndexer can then convert it to a numerical column*

In [40]:
from pyspark.sql.types import StringType

df = df.withColumn("churn", df["churn"].cast(StringType()))
df.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: integer (nullable = true)
 |-- area code: integer (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: integer (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: integer (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: integer (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: integer (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: integer (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: integer (nullable = true)
 |-- churn: string (nullable =

*Now churn is a String column*

### Feature Engineering:
    converting categorical columns to numerical columns

In [41]:
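from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# One StringIndexer per categorical column; Pipeline.fit() fits every stage,
# so the indexers do not need a separate .fit(df) call
categorical_cols = ["state", "voice mail plan", "churn", "international plan"]
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_cols]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)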
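
For intuition about the index values, here is a plain-Python sketch of the ordering rule that `StringIndexer` applies by default (`stringOrderType="frequencyDesc"`): the most frequent label gets index 0.0. The helper name `frequency_desc_index` is hypothetical and this is not Spark's implementation; the alphabetical tie-break is an assumption based on recent Spark versions.

```python
from collections import Counter

def frequency_desc_index(values):
    """Map each distinct label to a float index, most frequent label first.

    Hypothetical helper mimicking StringIndexer's default ordering.
    Ties are broken alphabetically (an assumption, see the text above).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Churn labels with the class counts reported earlier in this notebook
churn_labels = ["false"] * 2850 + ["true"] * 483
mapping = frequency_desc_index(churn_labels)
print(mapping)  # {'false': 0.0, 'true': 1.0}
```

Under this rule the majority class 'false' maps to 0.0 and 'true' to 1.0, which is consistent with the churn_index values in the notebook's output.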

In [42]:
# Let's see our new columns
df.select('state_index','voice mail plan_index','churn_index','international plan_index').show()

+-----------+---------------------+-----------+------------------------+
|state_index|voice mail plan_index|churn_index|international plan_index|
+-----------+---------------------+-----------+------------------------+
|       17.0|                  1.0|        0.0|                     0.0|
|        5.0|                  1.0|        0.0|                     0.0|
|       18.0|                  0.0|        0.0|                     0.0|
|        5.0|                  0.0|        0.0|                     1.0|
|       34.0|                  0.0|        0.0|                     1.0|
|        3.0|                  0.0|        0.0|                     1.0|
|       24.0|                  1.0|        0.0|                     0.0|
|       28.0|                  0.0|        0.0|                     1.0|
|       47.0|                  0.0|        0.0|                     0.0|
|        0.0|                  1.0|        0.0|                     1.0|
|       15.0|                  0.0|        1.0|    

### Feature Selection and Formatting Features:
    Formatting the features so they can be passed to Spark's MLlib library

In [43]:
from pyspark.ml.feature import VectorAssembler

In [44]:
assembler = VectorAssembler(inputCols=[
 'account length',
 'number vmail messages',
 'total day minutes',
 'total day calls',
 'total day charge',
 'total eve minutes',
 'total eve calls',
 'total eve charge',
 'total night minutes',
 'total night calls',
 'total night charge',
 'total intl minutes',
 'total intl calls',
 'total intl charge',
 'customer service calls',
 'state_index',
 'voice mail plan_index',
 'international plan_index'],outputCol='features')

In [45]:
df = assembler.transform(df)
df.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: integer (nullable = true)
 |-- area code: integer (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: integer (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: integer (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: integer (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: integer (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: integer (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: integer (nullable = true)
 |-- churn: string (nullable =

*Now we have a new "features" column with all features assembled as a vector*

**Selecting our label and features column**

In [46]:
final_data = df.select('churn_index','features')

In [47]:
final_data.show()

+-----------+--------------------+
|churn_index|            features|
+-----------+--------------------+
|        0.0|[128.0,25.0,265.1...|
|        0.0|[107.0,26.0,161.6...|
|        0.0|[137.0,0.0,243.4,...|
|        0.0|[84.0,0.0,299.4,7...|
|        0.0|[75.0,0.0,166.7,1...|
|        0.0|[118.0,0.0,223.4,...|
|        0.0|[121.0,24.0,218.2...|
|        0.0|[147.0,0.0,157.0,...|
|        0.0|[117.0,0.0,184.5,...|
|        0.0|[141.0,37.0,258.6...|
|        1.0|[65.0,0.0,129.1,1...|
|        0.0|[74.0,0.0,187.7,1...|
|        0.0|[168.0,0.0,128.8,...|
|        0.0|[95.0,0.0,156.6,8...|
|        0.0|[62.0,0.0,120.7,7...|
|        1.0|[161.0,0.0,332.9,...|
|        0.0|[85.0,27.0,196.4,...|
|        0.0|[93.0,0.0,190.7,1...|
|        0.0|[76.0,33.0,189.7,...|
|        0.0|[73.0,0.0,224.4,9...|
+-----------+--------------------+
only showing top 20 rows



### Normalize the data:
    Scaling each feature to unit standard deviation with StandardScaler before feeding it to the ML algorithm

In [48]:
from pyspark.ml.feature import StandardScaler

In [49]:
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')

In [50]:
final_data = scaler.fit(final_data).transform(final_data)
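
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. A minimal plain-Python sketch of that rule for a single feature follows. `scale_to_unit_std` is a hypothetical helper, and the inputs are only the first five 'account length' values shown earlier, so the numbers will not match the full-dataset output.

```python
from statistics import stdev

def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation (n - 1).

    Hypothetical helper: no mean-centering, mirroring withMean=False.
    """
    sd = stdev(column)
    return [x / sd for x in column]

# First five 'account length' values from the features column above
account_length = [128.0, 107.0, 137.0, 84.0, 75.0]
scaled = scale_to_unit_std(account_length)

print([round(x, 3) for x in scaled])
print(round(stdev(scaled), 6))  # 1.0
```

Because the mean is not subtracted, scaled values of non-negative features stay non-negative, which matches the scaled_features output. Tree-based models split on thresholds, so this rescaling is not expected to change their results much.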

In [51]:
final_data.show()

+-----------+--------------------+--------------------+
|churn_index|            features|     scaled_features|
+-----------+--------------------+--------------------+
|        0.0|[128.0,25.0,265.1...|[3.21429510105554...|
|        0.0|[107.0,26.0,161.6...|[2.68694981103861...|
|        0.0|[137.0,0.0,243.4,...|[3.44030022534851...|
|        0.0|[84.0,0.0,299.4,7...|[2.10938116006770...|
|        0.0|[75.0,0.0,166.7,1...|[1.88337603577473...|
|        0.0|[118.0,0.0,223.4,...|[2.96317829628557...|
|        0.0|[121.0,24.0,218.2...|[3.03851333771656...|
|        0.0|[147.0,0.0,157.0,...|[3.69141703011847...|
|        0.0|[117.0,0.0,184.5,...|[2.93806661580858...|
|        0.0|[141.0,37.0,258.6...|[3.54074694725649...|
|        1.0|[65.0,0.0,129.1,1...|[1.63225923100476...|
|        0.0|[74.0,0.0,187.7,1...|[1.85826435529773...|
|        0.0|[168.0,0.0,128.8,...|[4.21876232013540...|
|        0.0|[95.0,0.0,156.6,8...|[2.38560964531466...|
|        0.0|[62.0,0.0,120.7,7...|[1.55692418957

In [52]:
final_data = final_data.select('churn_index','scaled_features')
final_data.printSchema()

root
 |-- churn_index: double (nullable = false)
 |-- scaled_features: vector (nullable = true)



### Train Test Split of dataset:
    To Train and Test our ML Model

In [53]:
# Split roughly 70% / 30% into train and test data (randomSplit sizes are approximate; pass a seed for a repeatable split)

train_data, test_data = final_data.randomSplit([0.7,0.3])

In [54]:
test_data.describe().show()

+-------+-------------------+
|summary|        churn_index|
+-------+-------------------+
|  count|                993|
|   mean|0.14400805639476336|
| stddev|0.35127482109457975|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+
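
`randomSplit` assigns rows to splits at random, so the weights are targets rather than exact proportions: here the test set holds 993 of 3,333 rows (about 29.8%). The sketch below imitates that behavior in plain Python with a fixed seed. `random_split` is a hypothetical helper, not Spark's implementation, and the seed value is arbitrary.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(3333))  # stand-in row ids, same size as the dataset
train, test = random_split(rows, [0.7, 0.3], seed=101)
print(len(train), len(test))  # close to, but usually not exactly, 70/30
```

In the notebook, `randomSplit` also accepts a `seed` argument, for example `final_data.randomSplit([0.7, 0.3], seed=101)`, which should make the split repeatable across runs on the same data.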



### Model building:
    Trying two algorithms: Gradient-Boosted Trees and Random Forest

In [55]:
from pyspark.ml.classification import GBTClassifier,RandomForestClassifier

In [56]:
gbc = GBTClassifier(featuresCol='scaled_features',labelCol='churn_index',seed=101) 
rfc = RandomForestClassifier(featuresCol='scaled_features',labelCol='churn_index',numTrees=50,seed=101)

In [57]:
model_gbc = gbc.fit(train_data)
model_rfc = rfc.fit(train_data)

### Making predictions using testing data

In [58]:
result_gbc = model_gbc.transform(test_data)
result_rfc = model_rfc.transform(test_data)

In [59]:
result_rfc.show()

+-----------+--------------------+--------------------+--------------------+----------+
|churn_index|     scaled_features|       rawPrediction|         probability|prediction|
+-----------+--------------------+--------------------+--------------------+----------+
|        0.0|[0.02511168047699...|[47.2313086366029...|[0.94462617273205...|       0.0|
|        0.0|[0.02511168047699...|[46.198533156258,...|[0.92397066312516...|       0.0|
|        0.0|[0.07533504143098...|[46.5567606335217...|[0.93113521267043...|       0.0|
|        0.0|[0.07533504143098...|[47.3363718376515...|[0.94672743675303...|       0.0|
|        0.0|[0.10044672190798...|[41.6463342815863...|[0.83292668563172...|       0.0|
|        0.0|[0.15067008286197...|[46.6382207657665...|[0.93276441531533...|       0.0|
|        0.0|[0.15067008286197...|[46.6560355550944...|[0.93312071110188...|       0.0|
|        0.0|[0.17578176333897...|[45.6082638566766...|[0.91216527713353...|       0.0|
|        0.0|[0.17578176333897..

## Evaluation of Model

In [60]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [61]:
acc_eval = MulticlassClassificationEvaluator(labelCol='churn_index',metricName='accuracy')

In [62]:
acc_gbc = acc_eval.evaluate(result_gbc)
acc_rfc = acc_eval.evaluate(result_rfc)

print("Accuracy of Gradient Boosting Tree: "+str(acc_gbc))
print("Accuracy of Random Forest: "+str(acc_rfc))

Accuracy of Gradient Boosting Tree: 0.9556898288016112
Accuracy of Random Forest: 0.9154078549848943
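
Accuracy is easier to interpret next to a baseline. Working backwards from the outputs above (993 test rows, mean churn_index of about 0.144), the test set contains 143 churners, so always predicting "no churn" would already be right about 85.6% of the time. The arithmetic, in plain Python:

```python
# Figures taken from the outputs above
test_rows = 993
mean_churn_index = 0.14400805639476336  # from test_data.describe()
acc_gbc = 0.9556898288016112
acc_rfc = 0.9154078549848943

# Number of churners in the test set (mean of a 0/1 label times the row count)
test_churners = round(mean_churn_index * test_rows)

# Accuracy of always predicting the majority class (no churn)
baseline = (test_rows - test_churners) / test_rows

# Correct predictions implied by each reported accuracy
correct_gbc = round(acc_gbc * test_rows)
correct_rfc = round(acc_rfc * test_rows)

print(f"churners in test set: {test_churners}")
print(f"majority baseline:    {baseline:.3f}")
print(f"GBT correct:          {correct_gbc} of {test_rows}")
print(f"RF correct:           {correct_rfc} of {test_rows}")
```

Read this way, the gradient-boosted model makes 44 errors on 993 rows and the random forest 84, against 143 for the trivial baseline.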


### Gradient Boosting gives us about 95.6% accuracy on the test set, versus about 91.5% for Random Forest
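
Because only about 14% of customers churn, accuracy alone says little about how many churners the model actually catches. Precision and recall for the churn class are a common complement. The sketch below computes them in plain Python from lists of labels and predictions; the helper name `churn_class_metrics` and the toy labels are hypothetical and are not results from this notebook.

```python
def churn_class_metrics(y_true, y_pred, positive=1.0):
    """Return (precision, recall, f1) for the positive (churn) class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return precision, recall, f1

# Toy labels for illustration only (not taken from the notebook's results)
y_true = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0]
y_pred = [0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0]

precision, recall, f1 = churn_class_metrics(y_true, y_pred)
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

In the notebook itself, `MulticlassClassificationEvaluator` accepts other `metricName` values such as 'f1', 'weightedPrecision', and 'weightedRecall', which could be evaluated on result_gbc and result_rfc in the same way as accuracy.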