In [1]:
# Point findspark at the local Spark installation before any pyspark import is attempted.
import findspark
# NOTE(review): the Spark home is hard-coded to this machine's layout; adjust it on other hosts.
findspark.init('/usr/local/spark')

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

import os
from pyspark.sql.functions import isnull, count, log10, col, when, lit
import pyspark.sql.functions as F
import numpy as np

# Defining Spark Session and Context objects

In [3]:
# Obtain the SparkContext first, then the SparkSession named 'ckpt2_spark'.
# Both calls reuse an already-running instance if one exists (getOrCreate).
sc = SparkContext.getOrCreate()
spark = (
    SparkSession.builder
    .appName('ckpt2_spark')
    .getOrCreate()
)

# Loading Data into Spark Session object

In [39]:
# Resolve the directory that holds the CSV extracts still containing NaN markers.
main_data_dir_path = os.path.abspath('Output_telecomData')
subdirectory_name = 'WithNaNs'

data_dir_path = os.path.join(main_data_dir_path, subdirectory_name)
assert os.path.isdir(data_dir_path), 'Data directory not found: ' + data_dir_path

# os.listdir() returns entries in arbitrary order, yet the loading cell selects the
# files by position. Keep only the CSV files and sort them case-insensitively so the
# order is always: account info, churn, demographics, services.
datafile_names = sorted(
    (name for name in os.listdir(data_dir_path) if name.lower().endswith('.csv')),
    key=str.lower,
)
datafile_paths = [os.path.join(data_dir_path, datafile) for datafile in datafile_names]

datafile_paths

['/home/hduser/Deloitte_capstone_project/Output_telecomData/WithNaNs/Customer_account_info.csv',
 '/home/hduser/Deloitte_capstone_project/Output_telecomData/WithNaNs/Customer_Churn.csv',
 '/home/hduser/Deloitte_capstone_project/Output_telecomData/WithNaNs/Customer_demographics.csv',
 '/home/hduser/Deloitte_capstone_project/Output_telecomData/WithNaNs/Customer_services.csv']

In [40]:
# Shared reader settings: comma-separated CSV with a header row and inferred column types.
csv_read_options = dict(format='csv', sep=',', inferSchema=True, header=True)

# Positions 0-3 of datafile_paths are loaded as the account, churn, demographics and
# services tables respectively.
cust_acc_df, cust_churn_df, cust_demo_df, cust_serv_df = [
    spark.read.load(datafile_paths[position], **csv_read_options)
    for position in range(4)
]

In [41]:
# Show the column types Spark inferred for the account table.
cust_acc_df.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- Tenure: double (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)



In [42]:
# Show the column types Spark inferred for the churn table.
cust_churn_df.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- Churn: string (nullable = true)



In [43]:
# Show the column types Spark inferred for the demographics table.
cust_demo_df.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- SeniorCitizen: double (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)



In [44]:
# Show the column types Spark inferred for the services table.
cust_serv_df.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)



In [45]:
# Number of rows in the churn table.
cust_churn_df.count()

34413

In [46]:
# Expose the account, demographics and services tables to spark.sql() under short view names.
# Each view is bound to the DataFrame object as it exists when this cell runs.
for view_name, source_df in (
    ('acc_df', cust_acc_df),
    ('demo_df', cust_demo_df),
    ('serv_df', cust_serv_df),
):
    source_df.createOrReplaceTempView(view_name)

# NaN Value Treatment

In [47]:
def get_nan_count(df, col_name):
    """Return how many rows of ``df`` hold NaN in column ``col_name``.

    The test is expressed as an equality against ``np.nan`` and evaluated by Spark,
    which (unlike plain Python) treats NaN as equal to NaN in comparisons.
    """
    nan_condition = df[col_name] == np.nan
    nan_rows = df.filter(nan_condition)
    return nan_rows.count()

def replace_nan_in_col(df, col_name, by_value=0):
    """Return ``df`` with the missing-value marker of ``col_name`` replaced by ``by_value``.

    String columns use the literal text 'NaN' as their marker; every other dtype
    uses the floating-point NaN. Only ``col_name`` is touched.
    """
    is_string_column = dict(df.dtypes)[col_name] == 'string'
    missing_marker = 'NaN' if is_string_column else np.nan
    return df.replace(missing_marker, by_value, col_name)

def get_col_mean(df, col_name):
    """Return the mean of numeric column ``col_name`` over its non-NaN values.

    NaN rows are dropped before averaging rather than being counted as zeros, so
    the value used for mean imputation is not pulled towards 0 by the very rows it
    is meant to fill. Nulls are ignored by Spark's ``avg`` aggregate.

    Returns ``None`` when the column has no non-missing values.
    """
    observed = df.filter(~F.isnan(df[col_name]))
    return observed.agg({col_name: 'avg'}).collect()[0][0]

def get_col_mode(df, col_name):
    """Return the most frequent non-missing value of ``col_name``.

    Missing values are excluded before counting so the mode can never be the
    marker that is about to be imputed: nulls for every dtype, the text 'NaN' for
    string columns, and floating-point NaN for float/double columns.

    Ties are broken by the smallest value so repeated runs give the same answer.
    Returns ``None`` when the column has no non-missing values.
    """
    column = df[col_name]
    col_dtype = dict(df.dtypes)[col_name]

    is_present = column.isNotNull()
    if col_dtype == 'string':
        is_present = is_present & (column != 'NaN')
    elif col_dtype in ('float', 'double'):
        is_present = is_present & ~F.isnan(column)

    top_row = (
        df.filter(is_present)
        .groupBy(col_name)
        .count()
        .orderBy(F.desc('count'), F.asc(col_name))
        .first()
    )
    return top_row[0] if top_row is not None else None

### Imputing Tenure and Monthly Charges features:
As shown in **task1.1**, Tenure and MonthlyCharges both show a normal-like distribution, with values piled up at their respective highs and lows. 

<h4> Basic Treatment Strategy </h4> Mean imputation will work fine as the data is mostly normal like.<br>
<h4> Complex Treatment Strategy </h4> We can split the data into three parts: the lows, the highs, and the rest. A model such as a random forest can classify the NaNs of both features into these three categories; after classification, we can impute the highs and lows with their respective values and the rest with the mean. (<b>But I feel it's a bit overkill, as the NaN count is not that high</b>)

In [48]:
# Count and report the NaN entries of each numeric account feature in turn.
nan_count_tenure = get_nan_count(cust_acc_df, 'Tenure')
print("No. of NaN values in Tenure:", nan_count_tenure)

nan_count_monthlyc = get_nan_count(cust_acc_df, 'MonthlyCharges')
print("No. of NaN values in MonthlyCharges:", nan_count_monthlyc)

No. of NaN values in Tenure: 16
No. of NaN values in MonthlyCharges: 13


In [49]:
# Mean-impute Tenure: compute the column mean, then substitute it for every NaN.
mean_tenure = get_col_mean(cust_acc_df, 'Tenure')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'Tenure', mean_tenure)

# Same treatment for MonthlyCharges, applied to the frame whose Tenure is already filled.
mean_monthlyc = get_col_mean(cust_acc_df, 'MonthlyCharges')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'MonthlyCharges', mean_monthlyc)

## Imputing Total Charges feature:
As TotalCharges has an exponentially decreasing distribution, we can apply a log transform to bring its distribution closer to normal.

<h4> Imputation Strategy: </h4> Replace with mean of transformed feature.

In [50]:
# Count the NaN entries of TotalCharges before it is transformed.
nan_count_totalc = get_nan_count(cust_acc_df, 'TotalCharges')

print("No. of NaN values in TotalCharges:", nan_count_totalc)

No. of NaN values in TotalCharges: 22


In [51]:
# Add the base-10 log of TotalCharges as a new column, then mean-impute the NaNs of that
# new column. TotalCharges itself is left untouched and keeps its NaN entries.
# NOTE(review): only NaN is replaced here; if TotalCharges can be 0 or negative, confirm how
# log10 represents those rows and whether they also need imputing.
cust_acc_df = cust_acc_df.withColumn('logTotalCharges', log10(col('TotalCharges')))
mean_logtotalc = get_col_mean(cust_acc_df, 'logTotalCharges')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'logTotalCharges', mean_logtotalc)

In [52]:
# Sanity check: after imputation the transformed column should report no NaN entries.
nan_count_logtotalc = get_nan_count(cust_acc_df, 'logTotalCharges')
print("No. of NaN values in logTotalCharges:", nan_count_logtotalc)

No. of NaN values in logTotalCharges: 0


## Imputing Account categorical feature:

In [53]:
# Frequency of each Contract category, read through the acc_df temp view; the 'NaN' marker
# appears as its own category.
spark.sql('SELECT CONTRACT, COUNT(*) CONTRACT_COUNT FROM ACC_DF GROUP BY CONTRACT').show()

+--------------+--------------+
|      CONTRACT|CONTRACT_COUNT|
+--------------+--------------+
|Month-to-month|         19693|
|      One year|          4890|
|      Two year|          9823|
|           NaN|             7|
+--------------+--------------+



In [54]:
# Impute missing Contract values with the most frequent category. The mode is computed
# from the data (as for the other categorical account features) instead of being copied
# by hand from the table above, so it stays correct if the input data changes.
mode_contract = get_col_mode(cust_acc_df, 'Contract')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'Contract', mode_contract)

In [55]:
# Mode-impute PaperlessBilling: replace its 'NaN' marker with the most frequent category.
mode_paperlessbilling = get_col_mode(cust_acc_df, 'PaperlessBilling')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'PaperlessBilling', mode_paperlessbilling)

In [56]:
# Mode-impute PaymentMethod: replace its 'NaN' marker with the most frequent category.
mode_paymentmethod = get_col_mode(cust_acc_df, 'PaymentMethod')
cust_acc_df = replace_nan_in_col(cust_acc_df, 'PaymentMethod', mode_paymentmethod)

## Imputing Demographic features

In [57]:
# Mode-impute every demographic feature. customerID is the record identifier, not a
# feature: filling it with the "most frequent" ID would be meaningless and costs a
# group-by over every distinct ID, so it is skipped.
for demo_col in cust_demo_df.columns:
    if demo_col == 'customerID':
        continue
    mode_val = get_col_mode(cust_demo_df, demo_col)
    cust_demo_df = replace_nan_in_col(cust_demo_df, demo_col, mode_val)

## Imputing Services features

In [58]:
# Mode-impute every services feature. customerID is the record identifier, not a
# feature: filling it with the "most frequent" ID would be meaningless and costs a
# group-by over every distinct ID, so it is skipped.
for serv_col in cust_serv_df.columns:
    if serv_col == 'customerID':
        continue
    mode_val = get_col_mode(cust_serv_df, serv_col)
    cust_serv_df = replace_nan_in_col(cust_serv_df, serv_col, mode_val)

# Feature Engineering

## OrdinalEncoding for InternetService and Contract
These two have an inherent ordering among their categories.

**Contract**: Month-to-month -> One year -> Two year<br>
**InternetService**: No -> DSL -> Fiber optic

In [59]:
# Frequency of each InternetService category, read through the serv_df temp view.
spark.sql('SELECT InternetService, COUNT(*) INTERNET_SERV_COUNT FROM SERV_DF GROUP BY InternetService').show()

+---------------+-------------------+
|InternetService|INTERNET_SERV_COUNT|
+---------------+-------------------+
|    Fiber optic|              14181|
|             No|               5226|
|            DSL|              15006|
+---------------+-------------------+



In [60]:
def _ordinal_encode(column_name, categories, orders, unknown=-1):
    """Build a Column that maps each category of ``column_name`` to its ordinal rank.

    ``categories`` and ``orders`` are parallel sequences. Any value that matches no
    listed category (including null) is encoded as ``unknown``.
    """
    encoded = None
    for category, order in zip(categories, orders):
        matches = col(column_name) == category
        encoded = when(matches, order) if encoded is None else encoded.when(matches, order)
    return lit(unknown) if encoded is None else encoded.otherwise(unknown)


# InternetService: No < DSL < Fiber optic, written to a new column.
internet_service_categories = ['No', 'DSL', 'Fiber optic']
internet_service_order = [1, 2, 3]
cust_serv_df = cust_serv_df.withColumn(
    'InternetServiceOrdinal',
    _ordinal_encode('InternetService', internet_service_categories, internet_service_order),
)

# Contract: Month-to-month < One year < Two year.
# NOTE: this overwrites the Contract column in place, so the cell is not idempotent;
# re-running it on the already-encoded column would no longer match the string categories.
contract_categories = ['Month-to-month', 'One year', 'Two year']
contract_order = [1, 2, 3]
cust_acc_df = cust_acc_df.withColumn(
    'Contract',
    _ordinal_encode('Contract', contract_categories, contract_order),
)

## Binning Tenure and MonthlyCharges
As shown in task1.1 the distribution of Tenure differs a lot for churned and non-churned customers for their respective high and low values, so it can be beneficial to create bins for them.<br>
<br>
For **Tenure**: We can use equal width binning with a bin size of 20 months.<br>
For **MonthlyCharges**: We can again use equal width binning with a bin size of 40 units.

In [61]:
from pyspark.ml.feature import Bucketizer

In [62]:
# Equal-width Tenure bins of 20 months: (-inf, 20), [20, 40), [40, 60), [60, +inf).
# The builtin float is used for the open-ended edges because the np.float alias was
# deprecated in NumPy 1.20 and removed in 1.24.
# handleInvalid='keep' stops invalid (e.g. NaN) values from raising an error.
tenure_bucketizer = Bucketizer(splits=[-float("inf"), 20, 40, 60, float("inf")],
                              inputCol='Tenure', outputCol='TenureBuckets', handleInvalid='keep')
cust_acc_df = tenure_bucketizer.transform(cust_acc_df)

In [63]:
# Equal-width MonthlyCharges bins of 40 units: (-inf, 40), [40, 80), [80, +inf).
# The builtin float is used for the open-ended edges because the np.float alias was
# deprecated in NumPy 1.20 and removed in 1.24.
# handleInvalid='keep' stops invalid (e.g. NaN) values from raising an error.
monthlyc_bucketizer = Bucketizer(splits=[-float("inf"), 40, 80, float("inf")],
                                inputCol='MonthlyCharges', outputCol='MonthlyChargesBuckets',
                                handleInvalid='keep')
cust_acc_df = monthlyc_bucketizer.transform(cust_acc_df)

## Standardizing Tenure and MonthlyCharges

In [64]:
# Standardise Tenure: subtract the column mean, then divide by the column's
# standard deviation (Spark's 'std' aggregate).
mean_tenure = get_col_mean(cust_acc_df, 'Tenure')
tenure_std_row = cust_acc_df.agg({'Tenure': 'std'}).collect()[0]
std_tenure = tenure_std_row[0]

tenure_centered = cust_acc_df['Tenure'] - lit(mean_tenure)
cust_acc_df = cust_acc_df.withColumn('TenureScaled', tenure_centered / lit(std_tenure))

In [65]:
# Standardise MonthlyCharges: subtract the column mean, then divide by the column's
# standard deviation (Spark's 'std' aggregate).
mean_monthlyc = get_col_mean(cust_acc_df, 'MonthlyCharges')
monthlyc_std_row = cust_acc_df.agg({'MonthlyCharges': 'std'}).collect()[0]
std_monthlyc = monthlyc_std_row[0]

monthlyc_centered = cust_acc_df['MonthlyCharges'] - lit(mean_monthlyc)
cust_acc_df = cust_acc_df.withColumn("MonthlyChargesScaled", monthlyc_centered / lit(std_monthlyc))

## Replacing No internet services with No in services dataframe

In [66]:
# Frequency of each OnlineBackup value (via the serv_df temp view); note the separate
# 'No internet service' category that the next cell folds into 'No'.
spark.sql("SELECT ONLINEBACKUP, COUNT(*) FROM SERV_DF GROUP BY ONLINEBACKUP").show()

+-------------------+--------+
|       ONLINEBACKUP|count(1)|
+-------------------+--------+
|                 No|   14064|
|                Yes|   15080|
|No internet service|    5269|
+-------------------+--------+



In [67]:
# Fold 'No internet service' into plain 'No'. No subset is passed, so the replacement is
# not restricted to a single column of the services frame.
cust_serv_df = cust_serv_df.replace('No internet service', 'No')

## Aggregate for Tenure and MonthlyCharges
Tenure*MonthlyCharges<br>
Tenure/MonthlyCharges

In [68]:
# Interaction features between Tenure and MonthlyCharges: their product and their ratio.
tenure_col = col('Tenure')
monthly_charges_col = col('MonthlyCharges')

cust_acc_df = (
    cust_acc_df
    .withColumn('Prod_Tenure_MonthlyCharges', tenure_col * monthly_charges_col)
    .withColumn('Div_Tenure_MonthlyCharges', tenure_col / monthly_charges_col)
)

## Aggregate of Streaming Services
streamingTV_StreamingMovies

In [69]:
# Combine both streaming flags into one categorical label of the form '<StreamingTV>_<StreamingMovies>'.
tv_movies_label = F.concat(col('StreamingTV'), lit('_'), col('StreamingMovies'))
cust_serv_df = cust_serv_df.withColumn('TV_Movies', tv_movies_label)

## Aggregate of Security related features
OnlineSecurity_DeviceProtection_TechSupport_OnlineBackup<br>
From task1.1 we observed that the more services a customer is enrolled in, the greater the chance of retaining them, so we can capture this by ordinally encoding the aggregate based on the number of 'Yes' values.

In [70]:
# Combine the four security-related flags into one categorical label of the form
# '<OnlineSecurity>_<DeviceProtection>_<TechSupport>_<OnlineBackup>'.
# The separator is placed only between parts, so the label has no dangling trailing '_'
# and follows the same convention as TV_Movies.
cust_serv_df = cust_serv_df.withColumn(
    'Security_Agg',
    F.concat(
        col('OnlineSecurity'), lit('_'),
        col('DeviceProtection'), lit('_'),
        col('TechSupport'), lit('_'),
        col('OnlineBackup'),
    ),
)



In [83]:
# Add a 0/1 '<name>Numeric' companion for each optional service: 1 when the value is
# 'Yes', 0 for anything else.
numeric_serv_columns = [
    'OnlineSecurity', 'OnlineBackup', 'TechSupport',
    'DeviceProtection', 'StreamingTV', 'StreamingMovies',
]
for serv_col in numeric_serv_columns:
    new_serv_col = serv_col + 'Numeric'
    is_enrolled = col(serv_col) == 'Yes'
    cust_serv_df = cust_serv_df.withColumn(new_serv_col, when(is_enrolled, 1).otherwise(0))

In [90]:
# ServicesCount = how many of the optional services the customer has, obtained by adding
# up the 0/1 '<name>Numeric' flag columns with Python's builtin sum().
service_flag_columns = [cust_serv_df[serv_col + 'Numeric'] for serv_col in numeric_serv_columns]
cust_serv_df = cust_serv_df.withColumn('ServicesCount', sum(service_flag_columns))

## Label Indexing

In [92]:
from pyspark.ml.feature import StringIndexer

In [98]:
# Binary-encode PaperlessBilling: 1 when the value is 'Yes', 0 for anything else.
cust_acc_df = cust_acc_df.withColumn('PaperlessBillingNumeric', 
                                     when(col('PaperlessBilling')=='Yes', 1).otherwise(0))

In [102]:
# Fit a StringIndexer on PaymentMethod and append the resulting index as PaymentMethodNumeric.
# The fitted model is kept in si_model so its label order can be inspected afterwards.
si = StringIndexer(inputCol='PaymentMethod', outputCol='PaymentMethodNumeric')
si_model = si.fit(cust_acc_df)
cust_acc_df = si_model.transform(cust_acc_df)

In [110]:
# Labels learned by the PaymentMethod indexer; a label's list position is its index value.
si_model.labels

['Mailed check',
 'Electronic check',
 'Bank transfer (automatic)',
 'Credit card (automatic)']

In [123]:
# Fit a StringIndexer on Gender and append the resulting index as GenderNumeric.
demo_si = StringIndexer(inputCol='Gender', outputCol='Gender'+'Numeric')
demo_si_model = demo_si.fit(cust_demo_df)
cust_demo_df = demo_si_model.transform(cust_demo_df)

In [124]:
# Labels learned by the Gender indexer; a label's list position is its index value.
demo_si_model.labels

['Male', 'Female']

In [115]:
# Binary-encode Partner and Dependents: 1 when the value is 'Yes', 0 for anything else.
cust_demo_df = (
    cust_demo_df
    .withColumn('PartnerNumeric', when(col('Partner') == 'Yes', 1).otherwise(0))
    .withColumn('DependentsNumeric', when(col('Dependents') == 'Yes', 1).otherwise(0))
)

In [119]:
# Binary-encode PhoneService: 1 when the value is 'Yes', 0 for anything else.
cust_serv_df = cust_serv_df.withColumn('PhoneServiceNumeric', when(col('PhoneService')=='Yes', 1).otherwise(0))

# Fold 'No phone service' into 'No' and keep every other value as it is. Without the
# otherwise() branch, all non-matching rows (including 'Yes') would become NULL and
# MultipleLinesNumeric below would be 0 for every customer.
cust_serv_df = cust_serv_df.withColumn(
    'MultipleLines',
    when(col('MultipleLines') == 'No phone service', 'No').otherwise(col('MultipleLines')),
)

# Binary-encode MultipleLines: 1 when the value is 'Yes', 0 for anything else.
cust_serv_df = cust_serv_df.withColumn('MultipleLinesNumeric', when(col('MultipleLines')=='Yes', 1).otherwise(0))


## One-Hot-Encoding

In [125]:
from pyspark.ml.feature import OneHotEncoder

In [134]:
# One-hot encode the indexed payment method; dropLast=False keeps one slot per category.
ohe = OneHotEncoder(dropLast=False, inputCol='PaymentMethodNumeric', outputCol='PaymentMethodVector')

# In Spark 3.x OneHotEncoder is an estimator that must be fitted before it can transform,
# whereas in Spark 2.x it is a plain transformer with no fit(). Fit only when fit() exists
# so the cell runs on both.
ohe_model = ohe.fit(cust_acc_df) if hasattr(ohe, 'fit') else ohe
cust_acc_df = ohe_model.transform(cust_acc_df)