# Configure Spark Environment

In [24]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [25]:
for item in sorted(sc._conf.getAll()): print(item)

(u'spark.app.id', u'app-20210708173228-0034')
(u'spark.app.name', u'my-notebook')
(u'spark.cores.max', u'2')
(u'spark.driver.extraClassPath', u'/usr/local/spark/jars/commons-lang-2.6.jar:/dbdrivers/*')
(u'spark.driver.extraJavaOptions', u'-Djavax.net.ssl.trustStore=/user-home/_global_/security/customer-truststores/cacerts')
(u'spark.driver.host', u'10.1.110.63')
(u'spark.driver.port', u'38674')
(u'spark.dynamicAllocation.enabled', u'true')
(u'spark.dynamicAllocation.executorIdleTimeout', u'300')
(u'spark.dynamicAllocation.initialExecutors', u'1')
(u'spark.eventLog.dir', u'/tmp/spark-events')
(u'spark.eventLog.enabled', u'true')
(u'spark.executor.extraJavaOptions', u'-Djavax.net.ssl.trustStore=/user-home/_global_/security/customer-truststores/cacerts')
(u'spark.executor.id', u'driver')
(u'spark.executor.memory', u'4g')
(u'spark.master', u'spark://spark-master-svc:7077')
(u'spark.port.maxRetries', u'100')
(u'spark.rdd.compress', u'True')
(u'spark.serializer.objectStreamReset', u'100')
(u

In [2]:
sc.stop()

In [3]:
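# Stop the current context (calling stop() again is harmless if cell [2] already ran),
# then start a new context with explicit resource settings
sc.stop()
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
    .set("spark.cores.max", "6")                           # total cores for this application
    .set("spark.dynamicAllocation.initialExecutors", "6")  # executors requested at start-up
    .set("spark.executor.cores", "1")                      # cores per executor
    .set("spark.executor.memory", "3g"))                   # JVM heap per executor
sc = SparkContext(conf=conf)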

In [4]:
for item in sorted(sc._conf.getAll()): print(item)

(u'spark.app.id', u'app-20210708182016-0038')
(u'spark.app.name', u'pyspark-shell')
(u'spark.cores.max', u'6')
(u'spark.driver.extraClassPath', u'/dbdrivers/*')
(u'spark.driver.extraJavaOptions', u'-Djavax.net.ssl.trustStore=/user-home/_global_/security/customer-truststores/cacerts')
(u'spark.driver.host', u'10.1.110.6')
(u'spark.driver.port', u'40264')
(u'spark.dynamicAllocation.enabled', u'true')
(u'spark.dynamicAllocation.executorIdleTimeout', u'300')
(u'spark.dynamicAllocation.initialExecutors', u'6')
(u'spark.eventLog.dir', u'/tmp/spark-events')
(u'spark.eventLog.enabled', u'true')
(u'spark.executor.cores', u'1')
(u'spark.executor.extraJavaOptions', u'-Djavax.net.ssl.trustStore=/user-home/_global_/security/customer-truststores/cacerts')
(u'spark.executor.id', u'driver')
(u'spark.executor.memory', u'3g')
(u'spark.master', u'spark://spark-master-svc:7077')
(u'spark.port.maxRetries', u'100')
(u'spark.rdd.compress', u'True')
(u'spark.serializer.objectStreamReset', u'100')
(u'spark.shu
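The new configuration caps the application at 6 cores in total (`spark.cores.max`) and gives each executor 1 core (`spark.executor.cores`) and 3g of memory, so the application can run at most 6 executors with 18 GiB of executor memory between them. The arithmetic is sketched below in plain Python, assuming Spark standalone mode; `parse_memory_gb` and `executor_capacity` are hypothetical helpers, and the estimate ignores memory overhead and whether the workers actually have those resources free.

```python
# Hypothetical helpers (not part of PySpark): a rough capacity estimate for
# Spark standalone mode. Ignores memory overhead and actual worker resources.
UNITS_GB = {"k": 1.0 / (1024 * 1024), "m": 1.0 / 1024, "g": 1.0, "t": 1024.0}


def parse_memory_gb(value):
    """Convert a memory string with a unit suffix, such as '3g' or '512m', to GiB."""
    value = value.strip().lower()
    if not value or value[-1] not in UNITS_GB:
        raise ValueError("expected a size with a k/m/g/t suffix: %r" % value)
    return float(value[:-1]) * UNITS_GB[value[-1]]


def executor_capacity(conf):
    """Return (max executors, total executor memory in GiB) for a settings dict."""
    max_cores = int(conf["spark.cores.max"])
    cores_per_executor = int(conf["spark.executor.cores"])
    max_executors = max_cores // cores_per_executor
    return max_executors, max_executors * parse_memory_gb(conf["spark.executor.memory"])


# Values set in the SparkConf above
new_conf = {
    "spark.cores.max": "6",
    "spark.executor.cores": "1",
    "spark.executor.memory": "3g",
}
print(executor_capacity(new_conf))  # (6, 18.0)
```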

## <font color='maroon'>This notebook builds and trains a Spark model to predict customer churn from a data set of about 1 million records.</font>
### <font color='navyblue'> The training data is available both as a local file in WSL and in a remote Db2 database</font>
### <font color='navyblue'> The notebook loads both sources and compares their loading times</font>

This notebook contains steps and code to develop a predictive model and start scoring new data. It introduces commands for getting data, basic data cleaning and exploration, pipeline creation, model training, model persistence to the Watson Machine Learning repository, model deployment, and scoring.

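Some familiarity with Python is helpful. This notebook was written for Python 3.6 and Apache® Spark 2.4; note that the recorded outputs (tuple-style `print` results and `u'...'` strings) come from a Python 2.7 kernel.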

You will use a customer churn data set, <B>CUST_CHURN_1M</B>, which contains about 1 million anonymized customer records. Use this data set to predict customer churn, which is critical to the business because retaining existing customers is easier than acquiring new ones.

## Learning goals

The learning goals of this notebook are:

-  Load a CSV file into an Apache® Spark DataFrame.
-  Explore data.
-  Prepare data for training and evaluation.
-  Create an Apache® Spark machine learning pipeline.
-  Train and evaluate a model.
-  Persist a pipeline and model in the Watson Machine Learning repository.
-  Explore and visualize prediction results using the plotly package.
-  Deploy a model for batch scoring using the Watson Machine Learning API.


## Contents

This notebook contains the following parts:

1. [Set up the environment](#setup)
2. [Load and explore data](#load)
3. [Create an Apache® Spark machine learning model](#model)
4. [Persist model](#persistence)
5. [Predict locally and visualize](#visualization)
6. [Deploy and score](#scoring)
7. [Clean up](#cleanup)
8. [Summary and next steps](#summary)



In [16]:
# Step 1: Load local file

import os
from datetime import datetime
from pyspark.sql import SQLContext

ts1 = datetime.now()
print("Start time:", ts1.strftime("%d/%m/%Y %H:%M:%S"))

#
# This sample code may not be suitable for large data sets
#
# Add asset from file system: read the CSV with a header row and infer column types
csv_path = os.environ['DSX_PROJECT_DIR'] + '/datasets/cust_churn_1m.csv'
churn1mlocalcsv = SQLContext(sc).read.csv(csv_path, header=True, inferSchema=True)
churn1mlocalcsv.show(5)

te1 = datetime.now()
print("End time:", te1.strftime("%d/%m/%Y %H:%M:%S"))
print("Processing time", te1 - ts1)

('Start time:', '09/07/2021 01:41:03')
+-----------+-----------+------+---+----------+-------+--------+--------+------------+-------+----------+---------+-----+----------------+-----+
|CONTRACT_NR|CUSTOMER_NR|GENDER|AGE|INVESTMENT| INCOME|ACTIVITY|YRLY_AMT|AVG_DAILY_TX|YRLY_TX|AVG_TX_AMT|NEGTWEETS|STATE|       EDUCATION|label|
+-----------+-----------+------+---+----------+-------+--------+--------+------------+-------+----------+---------+-----+----------------+-----+
|   93011551|    2141912|     F| 84|    114368|3852862|       5|700259.0|    0.917808|    335|   2090.32|        3|   TX|Bachelors degree|    0|
|   99352651|    4970498|     F| 44|     90298|3849843|       1|726977.0|    0.950685|    347|   2095.04|        2|   CA|Bachelors degree|    0|
|   97002068|     755732|     F| 23|     94881|3217364|       1|579084.0|    0.920548|    336|   1723.46|        5|   CA|Bachelors degree|    1|
|   96734455|    2887915|     F| 24|    112099|2438218|       4|470964.0|    0.994521|    3
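Step 1 above (and Step 2 below) records start and end times by hand around each load. A small context manager can remove that repetition; the sketch below is plain Python, and `timed` and `timings` are hypothetical names, not part of Spark or DSX. Keep in mind that Spark evaluates lazily: `show(5)` only computes the rows it displays, the CSV read with `inferSchema` scans the file once to infer column types, and the JDBC read fetches only its schema from Db2 up front. The two recorded timings therefore measure different amounts of work; timing an action such as `count()` on each DataFrame gives a fairer comparison of full loads.

```python
import time
from contextlib import contextmanager
from datetime import datetime

FMT = "%d/%m/%Y %H:%M:%S"


@contextmanager
def timed(label, results):
    """Print start/end times for a block and record its elapsed seconds in results[label]."""
    print("%s start time: %s" % (label, datetime.now().strftime(FMT)))
    start = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - start
        results[label] = elapsed
        print("%s end time: %s" % (label, datetime.now().strftime(FMT)))
        print("%s processing time: %.3f s" % (label, elapsed))


timings = {}
with timed("local CSV", timings):
    # Stand-in workload; in the notebook this would be the read plus an action,
    # e.g. churn1mlocalcsv.count()
    total = sum(range(1000000))
print(sorted(timings))
```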

In [17]:
# Step 2: Load Db2 data

import dsx_core_utils
from datetime import datetime
from pyspark.sql import SparkSession

ts2 = datetime.now()
print("Start time:", ts2.strftime("%d/%m/%Y %H:%M:%S"))

#
# This sample code may not be suitable for large data sets
#
# Add asset from remote connection: look up the data set and its data source
dataSet = dsx_core_utils.get_remote_data_set_info('CUST_CHURN_1M')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
# Reuse the active SparkSession (or create one on the current SparkContext)
sparkSession = SparkSession.builder.getOrCreate()

# Quote "SCHEMA"."TABLE" for the dbtable option, or wrap a custom query as an aliased subquery
dbTableOrQuery = '"' + (dataSet['schema'] + '"."' if len(dataSet['schema'].strip()) != 0 else '') + dataSet['table'] + '"'
if dataSet['query']:
    dbTableOrQuery = "(" + dataSet['query'] + ") TBL"

# Load JDBC data into a Spark DataFrame
churnspark1mdb2 = (sparkSession.read.format("jdbc")
                   .option("url", dataSource['URL'])
                   .option("dbtable", dbTableOrQuery)
                   .option("user", dataSource['user'])
                   .option("password", dataSource['password'])
                   .load())
churnspark1mdb2.show(5)

te2 = datetime.now()
print("End time:", te2.strftime("%d/%m/%Y %H:%M:%S"))
print("Processing time", te2 - ts2)


('Start time:', '09/07/2021 01:41:10')
+-----------+-----------+------+---+----------+-------+--------+-----------+------------+-------+-----------+---------+-----+----------------+-----+
|CONTRACT_NR|CUSTOMER_NR|GENDER|AGE|INVESTMENT| INCOME|ACTIVITY|   YRLY_AMT|AVG_DAILY_TX|YRLY_TX| AVG_TX_AMT|NEGTWEETS|STATE|       EDUCATION|label|
+-----------+-----------+------+---+----------+-------+--------+-----------+------------+-------+-----------+---------+-----+----------------+-----+
|   93011551|    2141912|     F| 84|    114368|3852862|       5|700259.0000|    0.917808|    335|2090.320000|        3|   TX|Bachelors degree|    0|
|   99352651|    4970498|     F| 44|     90298|3849843|       1|726977.0000|    0.950685|    347|2095.040000|        2|   CA|Bachelors degree|    0|
|   97002068|     755732|     F| 23|     94881|3217364|       1|579084.0000|    0.920548|    336|1723.460000|        5|   CA|Bachelors degree|    1|
|   96734455|    2887915|     F| 24|    112099|2438218|       4|470
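Step 2 builds the JDBC `dbtable` value in a single dense expression. The same logic, pulled out into a testable plain-Python function, is sketched below; `build_dbtable` is a hypothetical name, the dict shape mirrors the `dataSet` keys used in Step 2, and the schema name `DB2INST1` in the example is an assumption, not taken from this project. Unlike the original expression, the sketch also strips whitespace from the schema name before using it.

```python
def build_dbtable(data_set):
    """Return a value for Spark's JDBC 'dbtable' option.

    data_set is assumed to be a dict with 'schema', 'table' and 'query' keys,
    like the dsx_core_utils.get_remote_data_set_info() result used in Step 2.
    """
    if data_set.get("query"):
        # A free-form query must be a parenthesised subquery with an alias,
        # because Spark places the dbtable value in a FROM clause
        return "(" + data_set["query"] + ") TBL"
    schema = (data_set.get("schema") or "").strip()
    # Double quotes keep Db2 from folding identifiers to upper case
    table = '"' + data_set["table"] + '"'
    return '"' + schema + '".' + table if schema else table


print(build_dbtable({"schema": "DB2INST1", "table": "CUST_CHURN_1M", "query": ""}))
# "DB2INST1"."CUST_CHURN_1M"
print(build_dbtable({"schema": "", "table": "", "query": "SELECT * FROM CUST_CHURN_1M"}))
# (SELECT * FROM CUST_CHURN_1M) TBL
```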

Explore the data loaded from the local CSV file by using the following Apache® Spark DataFrame methods:
-  `printSchema()` to print the schema
-  `count()` to count all records
-  `groupBy('label').count()` to show the distribution of label classes

In [18]:
churn1mlocalcsv.printSchema()

root
 |-- CONTRACT_NR: integer (nullable = true)
 |-- CUSTOMER_NR: integer (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- INVESTMENT: integer (nullable = true)
 |-- INCOME: integer (nullable = true)
 |-- ACTIVITY: integer (nullable = true)
 |-- YRLY_AMT: double (nullable = true)
 |-- AVG_DAILY_TX: double (nullable = true)
 |-- YRLY_TX: integer (nullable = true)
 |-- AVG_TX_AMT: double (nullable = true)
 |-- NEGTWEETS: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- label: integer (nullable = true)



As you can see, the data contains 15 fields. The `label` field is the one we want to predict.

In [19]:
print("Total number of records: " + str(churn1mlocalcsv.count()))

Total number of records: 1025740


Next, check whether all records have complete data.

In [20]:
churn1mlocal_df = churn1mlocalcsv.dropna()

print("Number of records with complete data: %d" % churn1mlocal_df.count())

Number of records with complete data: 1025740


The count equals the total number of records (1,025,740), so no record has missing values. The `dropna()` result, `churn1mlocal_df`, is used for model training and evaluation.
Now inspect the distribution of classes in the `label` column.

In [21]:
churn1mlocal_df.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|261351|
|    0|764389|
+-----+------+
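The classes are imbalanced: label 1 accounts for roughly a quarter of the records. A quick plain-Python check of the counts above shows that a trivial model that always predicts label 0 would already be about 74.5% accurate, a useful reference point when reading the accuracy reported later. The snippet only reuses the printed counts; in the notebook the same numbers come from the Spark `groupBy` result.

```python
# Class counts copied from the groupBy('label').count() output above
counts = {1: 261351, 0: 764389}

total = sum(counts.values())
majority_label = max(counts, key=counts.get)
minority_share = counts[1] / float(total)
# Accuracy of a trivial model that always predicts the majority class
baseline_accuracy = counts[majority_label] / float(total)

print(total)                         # 1025740
print(majority_label)                # 0
print("%.3f" % minority_share)       # 0.255
print("%.3f" % baseline_accuracy)    # 0.745
```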



<a id="model"></a>
## 3. Create an Apache® Spark machine learning model

In this section you will learn how to prepare data, create an Apache® Spark machine learning pipeline, and train a model.

### 3.1: Prepare data

In this subsection you will split your data into training, test, and prediction data sets.

In [22]:
(train_data, test_data, predict_data) = churn1mlocal_df.randomSplit([0.8, 0.18, 0.02], seed=24)

print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))
print("Number of records for prediction: " + str(predict_data.count()))

Number of records for training: 820724
Number of records for evaluation: 184454
Number of records for prediction: 20562


As you can see, the data has been split into three data sets:

-  The train data set, the largest group, is used for training.
-  The test data set is used to evaluate the trained model.
-  The predict data set is used for prediction.
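`randomSplit` does not produce exact proportions: each row is assigned to a split at random according to the (normalized) weights, so the sizes only approximate the requested 80/18/2 percent, and the fixed seed (24) makes the split repeatable for the same input data and partitioning. The plain-Python check below compares the expected sizes with the counts reported above; the deviations are a few hundred records at most, and every record lands in exactly one split.

```python
# Counts from the cells above
total = 1025740
weights = [0.8, 0.18, 0.02]
names = ["train", "test", "predict"]
actual = {"train": 820724, "test": 184454, "predict": 20562}

norm = float(sum(weights))  # randomSplit normalises weights that do not sum to 1
expected = dict(zip(names, [total * w / norm for w in weights]))

for name in names:
    diff = actual[name] - expected[name]
    print("%-8s expected %10.1f  actual %7d  difference %+7.1f"
          % (name, expected[name], actual[name], diff))
```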

### 3.2: Create pipeline and train a model

In this section you will create an Apache® Spark machine learning pipeline and then train the model.
In the first step you need to import the Apache® Spark machine learning packages that will be needed in the subsequent steps.

In [23]:
from pyspark.ml.feature import StringIndexer, IndexToString, RFormula
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [24]:
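# Index the numeric 'label' column into 'labelpredict' (most frequent value -> 0.0)
lab = StringIndexer(inputCol='label', outputCol='labelpredict')
# Assemble the listed columns into a 'features' vector; string columns (GENDER, STATE,
# EDUCATION) are one-hot encoded, and '- 1' removes the intercept term
features = RFormula(formula="~ GENDER + AGE + INVESTMENT + INCOME + ACTIVITY + YRLY_AMT + AVG_DAILY_TX + YRLY_TX + AVG_TX_AMT + NEGTWEETS + STATE + EDUCATION - 1")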

Next, define the estimator to use for classification. The following example uses logistic regression.

In [25]:
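# Train on the indexed label so that training and evaluation use the same label column
lr = LogisticRegression(maxIter=10, labelCol='labelpredict')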

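Let's build the pipeline now. A pipeline is an ordered sequence of stages, each of which is a transformer or an estimator. Here all three stages (`RFormula`, `StringIndexer`, and `LogisticRegression`) are estimators; fitting the pipeline fits them in turn and returns a `PipelineModel` made of the resulting transformers.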

In [26]:
pipeline_lr = Pipeline(stages=[features, lab, lr])
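`Pipeline.fit` walks the stages in order: each estimator is fitted on the data as transformed by the stages before it, transformers are used as they are, and the result is a model made only of transformers. The toy, pure-Python sketch below mimics that control flow with hypothetical classes (`ToyPipeline`, `MeanCenterer`, and so on); it illustrates the idea only and is not Spark's implementation.

```python
# A toy, pure-Python model of how a Pipeline is fitted; this is NOT Spark's code.
# Rows are plain dicts; every class name here is hypothetical.


class Transformer(object):
    def transform(self, rows):
        raise NotImplementedError


class Estimator(object):
    def fit(self, rows):
        """Learn from rows and return a fitted Transformer."""
        raise NotImplementedError


class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                # Later stages are fitted on the output of earlier ones
                rows = model.transform(rows)
        return ToyPipelineModel(fitted)


class MeanCenterer(Estimator):
    """Learns the mean of a column; its model subtracts that mean."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / float(len(rows))
        return MeanCentererModel(self.col, mean)


class MeanCentererModel(Transformer):
    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [dict(r, **{self.col: r[self.col] - self.mean}) for r in rows]


class SignPredictor(Transformer):
    """Already a transformer: predicts 1 when the column is positive."""
    def __init__(self, col):
        self.col = col

    def transform(self, rows):
        return [dict(r, prediction=1 if r[self.col] > 0 else 0) for r in rows]


model = ToyPipeline([MeanCenterer("x"), SignPredictor("x")]).fit([{"x": 1}, {"x": 2}, {"x": 6}])
print([r["prediction"] for r in model.transform([{"x": 4}, {"x": 2}])])  # [1, 0]
```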

Now you can train your logistic regression model using the previously defined **pipeline** and **train data**.

In [27]:
model_lr = pipeline_lr.fit(train_data)

You can check your **model accuracy** now. To evaluate the model, use **test data**.

In [28]:
predictions = model_lr.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="labelpredict", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test dataset:")
print("Accuracy = %3.2f" % accuracy)

Test dataset:
Accuracy = 0.98
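Accuracy is the fraction of test rows whose `prediction` equals `labelpredict`. Because only about a quarter of the rows carry label 1, it is worth also looking at precision and recall for that class. In Spark 2.4, `MulticlassClassificationEvaluator` also accepts `metricName` values such as `"f1"`, `"weightedPrecision"` and `"weightedRecall"`, and `predictions.groupBy("labelpredict", "prediction").count()` gives the confusion-matrix counts. The plain-Python sketch below shows how the metrics follow from those counts; `classification_metrics` is a hypothetical helper and the sample pairs are made up.

```python
def classification_metrics(pairs, positive=1):
    """Accuracy, precision and recall from (label, prediction) pairs."""
    tp = fp = fn = tn = 0
    for label, prediction in pairs:
        if prediction == positive:
            if label == positive:
                tp += 1
            else:
                fp += 1
        elif label == positive:
            fn += 1
        else:
            tn += 1
    total = tp + fp + fn + tn
    return {
        "accuracy": (tp + tn) / float(total),
        "precision": tp / float(tp + fp) if tp + fp else 0.0,
        "recall": tp / float(tp + fn) if tp + fn else 0.0,
    }


# Made-up (label, prediction) pairs, for illustration only
pairs = [(1, 1), (1, 0), (0, 0), (0, 0), (0, 1)]
print(classification_metrics(pairs))
```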


You could now tune the model to improve its accuracy; for simplicity, tuning is omitted from this example.

<a id="persistence"></a>
## 4. Persist model


In this section you will learn how to store your pipeline and model in the Watson Machine Learning repository using Python client libraries.

**Note**: Apache® Spark 2.4 is required.
    
### 4.1: Save pipeline and model

In this subsection you will learn how to save pipeline and model artifacts to your Watson Machine Learning instance.

In [29]:
from dsx_ml.ml import save

save(name = 'ChurnPredict',
     model = model_lr,
     test_data = test_data,
     algorithm_type = 'Classification',
     source='Cust_churn_1m.ipynb',
     description='This is a sample description for a spark model')

{'path': '/user-home/999/DSX_Projects/CustomerChurnLab/models/ChurnPredict/4',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python27/spark-2.0/CustomerChurnLab/ChurnPredict/4'}

In [30]:
te3 = datetime.now()
print("End time:", te3.strftime("%d/%m/%Y %H:%M:%S"))
# Elapsed time from the start of Step 2 (Db2 load) through saving the model
print("Processing time", te3 - ts2)

('End time:', '09/07/2021 01:42:35')
('Processing time', datetime.timedelta(0, 84, 795467))
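The last output is the Python 2 representation of a `datetime.timedelta`, whose positional fields are days, seconds and microseconds. As the standard-library snippet below shows, it corresponds to about 84.8 seconds (1 minute 24.8 seconds) from the start of Step 2 to the end of model saving.

```python
from datetime import timedelta

# Python 2 shows a timedelta as timedelta(days, seconds, microseconds)
elapsed = timedelta(0, 84, 795467)

print(elapsed.total_seconds())  # 84.795467
print(str(elapsed))             # 0:01:24.795467
```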
