In [None]:
!pip install psycopg2-binary | tail -n 1
!pip install ibm-ai-openscale==2.1.14 --no-cache | tail -n 1
!pip install --upgrade numpy --no-cache | tail -n 1
!pip install --upgrade lime --no-cache | tail -n 1
!pip install --upgrade SciPy --no-cache | tail -n 1
!pip install --user -U pyspark==2.1.2 --no-cache | tail -n 1

In [None]:
!pip install --upgrade scikit-learn==0.20.3

# Restart the Kernel Now
Always restart the kernel after installing new packages

# Load and Clean Data

In [None]:
# Place cursor below and insert the Pandas DataFrame for your WA_Fn-UseC_-Telco-Customer-Churn.csv data
# Make sure the variable is named `df_data_1` for the line `df_data_1 = pd.read_csv(body)`

import dsx_core_utils, requests, jaydebeapi, os, io, sys
from pyspark.sql import SparkSession
import pandas as pd
df1 = None
dataSet = dsx_core_utils.get_remote_data_set_info('USER1066.billingProductCustomers')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
if (sys.version_info >= (3, 0)):
  conn = jaydebeapi.connect(dataSource['driver_class'], dataSource['URL'], [dataSource['user'], dataSource['password']])
else:
  conn = jaydebeapi.connect(dataSource['driver_class'], [dataSource['URL'], dataSource['user'], dataSource['password']])
query = 'select * from "' + (dataSet['schema'] + '"."' if (len(dataSet['schema'].strip()) != 0) else '') +  dataSet['table'] + '"'

if (dataSet['query']):
    query = dataSet['query']
df1 = pd.read_sql(query, con=conn)
df1.head()




In [None]:
df_data_1 = df1
# Drop customerID column
df_data_1 = df_data_1.drop('customerID', axis=1)
df_data_1.head(5)

In [None]:
df_data_1.info()

In [None]:
# Check if we have any NaN values
df_data_1.isnull().values.any()

In [None]:
# Handle missing values for column 8, TotalCharges
from sklearn.preprocessing import Imputer

imp = Imputer(missing_values="NaN", strategy="mean")

df_data_1.iloc[:, 8] = imp.fit_transform(df_data_1.iloc[:, 8].values.reshape(-1, 1))
df_data_1.iloc[:, 8] = pd.Series(df_data_1.iloc[:, 8])

In [None]:
# Check if we have any NaN values
df_data_1.isnull().values.any()

# Create a model

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import json

spark = SparkSession.builder.getOrCreate()
df_data = spark.createDataFrame(df_data_1)
df_data.head()

### Modify the MODEL_NAME and DEPLOY_NAME if you are re-running this notebook, and want multiple versions

In [None]:
MODEL_NAME = "TelcoChurn-v2"
DEPLOYMENT_NAME = "TelcoChurn-v2"

In [None]:
spark_df = df_data
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)


print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))

spark_df.printSchema()

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model


si_gender = StringIndexer(inputCol = 'gender', outputCol = 'gender_IX')
si_Partner = StringIndexer(inputCol = 'Partner', outputCol = 'Partner_IX')
si_Dependents = StringIndexer(inputCol = 'Dependents', outputCol = 'Dependents_IX')
si_PhoneService = StringIndexer(inputCol = 'PhoneService', outputCol = 'PhoneService_IX')
si_MultipleLines = StringIndexer(inputCol = 'MultipleLines', outputCol = 'MultipleLines_IX')
si_InternetService = StringIndexer(inputCol = 'InternetService', outputCol = 'InternetService_IX')
si_OnlineSecurity = StringIndexer(inputCol = 'OnlineSecurity', outputCol = 'OnlineSecurity_IX')
si_OnlineBackup = StringIndexer(inputCol = 'OnlineBackup', outputCol = 'OnlineBackup_IX')
si_DeviceProtection = StringIndexer(inputCol = 'DeviceProtection', outputCol = 'DeviceProtection_IX')
si_TechSupport = StringIndexer(inputCol = 'TechSupport', outputCol = 'TechSupport_IX')
si_StreamingTV = StringIndexer(inputCol = 'StreamingTV', outputCol = 'StreamingTV_IX')
si_StreamingMovies = StringIndexer(inputCol = 'StreamingMovies', outputCol = 'StreamingMovies_IX')
si_Contract = StringIndexer(inputCol = 'Contract', outputCol = 'Contract_IX')
si_PaperlessBilling = StringIndexer(inputCol = 'PaperlessBilling', outputCol = 'PaperlessBilling_IX')
si_PaymentMethod = StringIndexer(inputCol = 'PaymentMethod', outputCol = 'PaymentMethod_IX')


In [None]:
si_Label = StringIndexer(inputCol="Churn", outputCol="label").fit(spark_df)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels)

In [None]:
va_features = VectorAssembler(inputCols=['gender_IX',  'SeniorCitizen', 'Partner_IX', 'Dependents_IX', 'PhoneService_IX', 'MultipleLines_IX', 'InternetService_IX', \
                                         'OnlineSecurity_IX', 'OnlineBackup_IX', 'DeviceProtection_IX', 'TechSupport_IX', 'StreamingTV_IX', 'StreamingMovies_IX', \
                                         'Contract_IX', 'PaperlessBilling_IX', 'PaymentMethod_IX', 'TotalCharges', 'MonthlyCharges'], outputCol="features")

In [None]:
from pyspark.ml.classification import RandomForestClassifier

classifier = RandomForestClassifier(featuresCol="features")

pipeline = Pipeline(stages=[si_gender, si_Partner, si_Dependents, si_PhoneService, si_MultipleLines, si_InternetService, si_OnlineSecurity, si_OnlineBackup, si_DeviceProtection, \
                            si_TechSupport, si_StreamingTV, si_StreamingMovies, si_Contract, si_PaperlessBilling, si_PaymentMethod, si_Label, va_features, \
                            classifier, label_converter])

model = pipeline.fit(train_data)

In [None]:
predictions = model.transform(test_data)
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction")
area_under_curve = evaluatorDT.evaluate(predictions)

#default evaluation is areaUnderROC
print("areaUnderROC = %g" % area_under_curve)

# Save and deploy the model

In [None]:
from dsx_ml.ml import save

In [None]:
save(name=MODEL_NAME,
    model=model,
    test_data = test_data,
    algorithm_type='Classification',
    description='This is a SparkML Model to Classify Telco Customer Churn Risk')

In [None]:
# Write the test data without label to a .csv so that we can later use it for batch scoring
write_score_CSV=test_data.toPandas().drop(['Churn'], axis=1)
write_score_CSV.to_csv('../datasets/TelcoCustomerSparkMLBatchScore.csv', sep=',', index=False)

In [None]:
# Write the test data to a .csv so that we can later use it for evaluation
write_eval_CSV=test_data.toPandas()
write_eval_CSV.to_csv('../datasets/TelcoCustomerSparkMLEval.csv', sep=',', index=False)