# Telco Churn (Livy) - Remote HDP

IBM DSX Local lets Python notebooks work with an existing remote Spark cluster over HTTP, using user-friendly sparkmagic commands. This sample notebook shows how to work with remote Spark using the Livy Spark kernel.

The remote Spark installation in this sample runs on Hortonworks Data Platform (HDP) and is accessed through the Livy HTTP REST API. Livy is an open source REST interface for interacting with [Apache Spark](http://spark.apache.org) from anywhere. It supports executing snippets of code or programs in a Spark context that runs locally or in [Apache Hadoop YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html).
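For orientation, here are the kinds of HTTP calls that sparkmagic issues to Livy on your behalf. This is a minimal Python 3 sketch using only the standard library: it builds, but does not send, requests against Livy's interactive-session endpoints (`POST /sessions`, `POST /sessions/{id}/statements`, `GET /sessions/{id}/statements/{id}`). The host name `livy-host` is a placeholder, and port 8998 is Livy's default, which your HDP cluster may override; the helper function names are hypothetical.

```python
import json
from urllib.request import Request

# Placeholder Livy endpoint (assumption); 8998 is Livy's default port.
LIVY_URL = "http://livy-host:8998"
JSON_HEADERS = {"Content-Type": "application/json"}


def create_session_request(kind="pyspark"):
    """Build (but do not send) a POST that starts an interactive session."""
    body = json.dumps({"kind": kind}).encode("utf-8")
    return Request(LIVY_URL + "/sessions", data=body, headers=JSON_HEADERS, method="POST")


def submit_statement_request(session_id, code):
    """Build a POST that runs a code snippet inside an existing session."""
    body = json.dumps({"code": code}).encode("utf-8")
    url = "%s/sessions/%d/statements" % (LIVY_URL, session_id)
    return Request(url, data=body, headers=JSON_HEADERS, method="POST")


def statement_url(session_id, statement_id):
    """URL to poll with GET until the statement's state is 'available'."""
    return "%s/sessions/%d/statements/%d" % (LIVY_URL, session_id, statement_id)
```

Sending these with `urllib.request.urlopen` returns JSON whose `id` field identifies the new session or statement; sparkmagic hides this polling loop behind `%%spark`.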

This notebook runs on Python 2 with Livy Spark.

![HDP](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/HDP.png)

## 1. Load sparkmagic

Sparkmagic is a set of tools for interactively working with remote Spark clusters through Livy, a Spark REST server, in Jupyter notebooks. The Sparkmagic project includes a set of magics for interactively running Spark code in multiple languages, as well as some kernels that you can use to turn Jupyter into an integrated Spark environment.


In [1]:
%load_ext sparkmagic.magics
import dsx_core_utils
dsx_core_utils.setup_livy_sparkmagic()
#%reload_ext sparkmagic.magics

success configuring sparkmagic livy.
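`dsx_core_utils.setup_livy_sparkmagic()` configures sparkmagic for you inside DSX Local. Outside DSX, sparkmagic reads its settings from `~/.sparkmagic/config.json`. The following minimal Python 3 sketch writes only the `kernel_python_credentials` section, assuming sparkmagic falls back to its defaults for every key you omit; the Livy URL is a placeholder, the function name is hypothetical, and a real configuration usually carries more settings (for example the authentication type).

```python
import json
import os


def write_sparkmagic_config(livy_url, username="", password="", config_dir=None):
    """Write a minimal sparkmagic config pointing the Python kernel at a Livy server.

    Assumes sparkmagic's kernel_python_credentials layout; keys not written here
    are assumed to keep sparkmagic's built-in defaults.
    """
    if config_dir is None:
        config_dir = os.path.expanduser("~/.sparkmagic")
    os.makedirs(config_dir, exist_ok=True)
    config = {
        "kernel_python_credentials": {
            "username": username,
            "password": password,
            "url": livy_url,
        }
    }
    path = os.path.join(config_dir, "config.json")
    with open(path, "w") as handle:
        json.dump(config, handle, indent=2)
    return path
```

Pass an explicit `config_dir` when experimenting so an existing `~/.sparkmagic/config.json` is not overwritten.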


## 2. Create a connection to remote Spark
Create a Livy Spark session connected to the remote Hadoop cluster.
![Ambari](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/ambari.png)

In [1]:
# Open sparkmagic's session manager to add the Livy endpoint and create a session (this might not be needed after v1.2)
%manage_spark

In [3]:
# Local path used for the demo files, and the WebHDFS base URL of the HDP NameNode
demo_path = '/tmp/'
hdfs_server = 'http://HDP_Cluster:50070/webhdfs/v1/'
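The `hdfs_server` value is a WebHDFS REST base URL. A WebHDFS call appends an HDFS path and an `op` query parameter, as in the commented `?op=OPEN` commands of the next section. A small Python 3 helper for building such URLs; the function name is hypothetical and the example host reuses the placeholder `HDP_Cluster`.

```python
from urllib.parse import urlencode


def webhdfs_url(hdfs_server, hdfs_path, op, **params):
    """Build a WebHDFS URL such as <hdfs_server>tmp/customer.csv?op=OPEN.

    hdfs_server is expected to end with '/webhdfs/v1/', as in the cell above.
    Extra keyword arguments (e.g. offset, length) become query parameters.
    """
    query = [("op", op)] + sorted(params.items())
    # Strip the leading '/' so the HDFS path joins cleanly onto the base URL.
    return hdfs_server + hdfs_path.lstrip("/") + "?" + urlencode(query)
```

For example, `webhdfs_url(hdfs_server, "/user/customer.csv", "OPEN")` reproduces the URL used in the commented curl command for the customer data.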

## 3. View the dataset

In [2]:
# Customer data: show the last 5 lines
#!curl -i -L "http://HDP_Cluster:50070/webhdfs/v1/user/customer.csv?op=OPEN" | tail -n 5
!curl -L "https://raw.githubusercontent.com/hortonworks-gallery/dsx/master/Telco-Churn-Prediction/dataset/customer.csv" | tail -n 5

In [5]:
# Churn data: show the last 5 lines
#!curl -i -L "http://HDP_Cluster:50070/webhdfs/v1/user/dsx_datasets/churn.csv?op=OPEN" | tail -n 5
!curl -L "https://raw.githubusercontent.com/hortonworks-gallery/dsx/master/Telco-Churn-Prediction/dataset/churn.csv" | tail -n 5

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 20079  100 20079    0     0   286k      0 --:--:-- --:--:-- --:--:--  286k
3821,"T"
3822,"T"
3823,"T"
3824,"T"
3825,"T"
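`curl ... | tail -n 5` streams the whole file and keeps only the last five lines. The same idea in Python 3, as a sketch with hypothetical helper names: a `deque` with `maxlen` retains only the most recent lines, so memory stays bounded by `n` rather than by the file size.

```python
import io
from collections import deque
from urllib.request import urlopen


def tail_lines(stream, n=5):
    """Return the last n lines of a text stream without holding the whole file in memory."""
    return [line.rstrip("\n") for line in deque(stream, maxlen=n)]


def tail_url(url, n=5):
    """Fetch a text file over HTTP(S) and return its last n lines, like `curl -L URL | tail -n 5`."""
    # urlopen follows HTTP redirects by default, similar to curl -L.
    with urlopen(url) as response:
        return tail_lines(io.TextIOWrapper(response, encoding="utf-8"), n)
```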


### Setting the DSX configuration file
Upload the DSX configuration file that the HDP cluster uses to connect back to the DSX cluster. The file contains the following details:
- DSX_PROJECT_NAME
- DSX_PROJECT_ID
- DSX_TOKEN
- DSX_USER_ID
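Section 8 reads this file back one value per line, by position, so the order above matters. A minimal Python 3 sketch of writing and parsing that layout, assuming exactly four newline-separated values in the order listed; the field and function names are hypothetical, and in this notebook the actual file is uploaded by `hdfs_util.upload_dsx_config_file`.

```python
from collections import namedtuple

# Field order assumed to match the list above and the positional parsing in section 8.
DsxConfig = namedtuple("DsxConfig", ["project_name", "project_id", "token", "user_id"])


def write_dsx_config(path, config):
    """Write one value per line, in field order."""
    with open(path, "w") as handle:
        handle.write("\n".join(config) + "\n")


def read_dsx_config(path):
    """Parse the file back, stripping trailing newlines as section 8 does."""
    with open(path) as handle:
        lines = [line.rstrip("\n") for line in handle]
    expected = len(DsxConfig._fields)
    if len(lines) < expected:
        raise ValueError("expected %d lines, got %d" % (expected, len(lines)))
    return DsxConfig(*lines[:expected])
```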

In [9]:
from dsx_core_utils import hdfs_util

# Upload the training data file -> already done for this demo
#hdfs_util.upload_file(hdfs_server, "./customer.csv", "/tmp/customer.csv")

# Upload the config file with the DSX credentials -> this step is required
hdfs_util.upload_dsx_config_file(hdfs_server, demo_path + "dsx_config_file.txt")

In [10]:
%%spark
# Code from here on runs in the remote Spark session, so demo_path must be defined there as well to locate the files later.
demo_path = '/tmp/'

## 4. Creating Dataframes

In [11]:
%%spark
from pyspark.sql import SQLContext

# Livy provides the SparkContext `sc` in the remote session
sql_context = SQLContext(sc)

# Customer information
customer = sql_context.read.csv('./customer.csv', header=True, inferSchema=True)
#customer = sql_context.read.csv('hdfs://HDP_Cluster:8020/user/customer.csv', header=True, inferSchema=True)
customer.show(5)

# Churn information
customer_churn = sql_context.read.csv('./churn.csv', header=True, inferSchema=True)
#customer_churn = sql_context.read.csv('hdfs://HDP_Cluster:8020/user/churn.csv', header=True, inferSchema=True)
customer_churn.show(5)

+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+
| ID|Gender|Status|Children|Est Income|Car Owner|      Age|LongDistance|International| Local|Dropped|Paymethod|LocalBilltype|LongDistanceBilltype| Usage|RatePlan|
+---+------+------+--------+----------+---------+---------+------------+-------------+------+-------+---------+-------------+--------------------+------+--------+
|  1|     F|     S|     1.0|   38000.0|        N|24.393333|       23.56|          0.0|206.08|    0.0|       CC|       Budget|      Intnl_discount|229.64|     3.0|
|  6|     M|     M|     2.0|   29616.0|        N|49.426667|       29.78|          0.0|  45.5|    0.0|       CH|    FreeLocal|            Standard| 75.29|     2.0|
|  8|     M|     M|     0.0|   19732.8|        N|50.673333|       24.81|          0.0| 22.44|    0.0|       CC|    FreeLocal|            Standard| 47.25|     3.0|
| 11|     M|     S|   

## 5. Performing Data Wrangling
In this section we:
- Join the Customer and Churn datasets on `ID`
- Rename the columns whose names contain spaces (`Est Income`, `Car Owner`)
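Conceptually, the Spark cell below performs an inner join on `ID`, keeps every customer column plus `CHURN`, and then renames two columns. The Python 3 sketch that follows reproduces those semantics on plain lists of dicts, purely for illustration; it assumes each `ID` appears at most once in the churn data (if it did not, Spark would emit one output row per matching pair), and the function name is hypothetical.

```python
def join_and_rename(customers, churn, renames):
    """Inner-join customer rows to churn rows on ID, keep all customer columns
    plus CHURN, then rename columns according to `renames`."""
    # Assumes IDs are unique in the churn data.
    churn_by_id = {row["ID"]: row["CHURN"] for row in churn}
    joined = []
    for row in customers:
        if row["ID"] not in churn_by_id:
            continue  # inner join: customers without a churn record are dropped
        merged = dict(row, CHURN=churn_by_id[row["ID"]])
        joined.append({renames.get(key, key): value for key, value in merged.items()})
    return joined
```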

In [3]:
%%spark
data = customer.join(customer_churn, customer['ID'] == customer_churn['ID']).select(customer['*'], customer_churn['CHURN'])
data = data.withColumnRenamed("Est Income", "EstIncome").withColumnRenamed("Car Owner","CarOwner")
data.show(5)

In [None]:
%%spark
data.printSchema()

## 6. Build a Classification Model
This model is built on HDP: all Spark computations run on, and use the resources of, the HDP cluster.

In [None]:
%%spark
import shutil
import os
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Split the joined data into training (80%), testing (18%) and prediction (2%) sets;
# the fixed seed (24) makes the split repeatable
splitted_data = data.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print("Number of training records: %d" % train_data.count())
print("Number of testing records: %d" % test_data.count())
print("Number of prediction records: %d" % predict_data.count())
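`randomSplit` normalizes the weights and assigns each row to exactly one split at random, so the 80/18/2 proportions hold only approximately. The Python 3 sketch below illustrates that idea on a plain list using cumulative weight bounds; it is a simplified single-machine version, not Spark's per-partition sampler, so it will not reproduce the exact rows Spark picks for seed 24. The function name is hypothetical.

```python
import bisect
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative, normalized upper bounds, e.g. [0.8, 0.98, 1.0].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        # bisect_right finds the first bound strictly above the uniform draw.
        index = bisect.bisect_right(bounds, rng.random())
        splits[min(index, len(weights) - 1)].append(row)  # guard against float round-off
    return splits
```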

In [None]:
%%spark
# Prepare string variables so that they can be used by the tree-based algorithm
# StringIndexer encodes a string column of labels to a column of label indices
SI1 = StringIndexer(inputCol='Gender', outputCol='GenderEncoded')
SI2 = StringIndexer(inputCol='Status', outputCol='StatusEncoded')
SI3 = StringIndexer(inputCol='CarOwner', outputCol='CarOwnerEncoded')
SI4 = StringIndexer(inputCol='Paymethod', outputCol='PaymethodEncoded')
SI5 = StringIndexer(inputCol='LocalBilltype', outputCol='LocalBilltypeEncoded')
SI6 = StringIndexer(inputCol='LongDistanceBilltype', outputCol='LongDistanceBilltypeEncoded')

# Encode the label column (fitted here so its labels can be reused by IndexToString)
labelIndexer = StringIndexer(inputCol='CHURN', outputCol='label').fit(train_data)

# The Pipelines API requires that input variables are passed in a single vector
assembler = VectorAssembler(inputCols=["GenderEncoded", "StatusEncoded", "CarOwnerEncoded", "PaymethodEncoded",
                                       "LocalBilltypeEncoded", "LongDistanceBilltypeEncoded", "Children", "EstIncome",
                                       "Age", "LongDistance", "International", "Local", "Dropped", "Usage", "RatePlan"],
                            outputCol="features")

# Instantiate the algorithm with its default settings
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Convert indexed labels back to original labels
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Create a pipeline
pipeline = Pipeline(stages=[SI1, SI2, SI3, SI4, SI5, SI6, labelIndexer, assembler, rf, labelConverter])

# Build the model. Fitting a Pipeline returns a PipelineModel,
# which consists of fitted models and transformers corresponding to the pipeline stages.
model_rf = pipeline.fit(train_data)
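By default `StringIndexer` assigns indices by label frequency, so the most frequent value (for `CHURN`, the majority class) becomes `0.0`, and `IndexToString` inverts the mapping using `labelIndexer.labels`. A Python 3 sketch of both steps with hypothetical function names; breaking ties alphabetically is an assumption of this sketch, so check your Spark version's documentation for its tie handling.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent label first.

    Returns (mapping, labels) where labels[i] is the string for index i.
    Ties are broken alphabetically (assumption of this sketch).
    """
    counts = Counter(values)
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(labels)}, labels


def index_to_string(indices, labels):
    """Inverse mapping, analogous to IndexToString with labelIndexer.labels."""
    return [labels[int(i)] for i in indices]
```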

## 7. Testing & Validating the model

In [None]:
%%spark
# testing the model accuracy
predictions = model_rf.transform(test_data)
evaluatorRF = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluatorRF.evaluate(predictions)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))
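`MulticlassClassificationEvaluator` with `metricName="accuracy"` reports the fraction of rows whose `prediction` equals `label`, and the cell derives the test error as its complement. The arithmetic, as a plain Python 3 sketch with a hypothetical function name:

```python
def accuracy_and_error(labels, predictions):
    """Accuracy = correct / total; test error = 1 - accuracy."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for label, prediction in zip(labels, predictions) if label == prediction)
    accuracy = correct / float(len(labels))
    return accuracy, 1.0 - accuracy
```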

In [None]:
%%spark
results = model_rf.transform(test_data)
results = results.select("ID", "CHURN", "label", "predictedLabel", "prediction", "probability")
results.show(5)

## 8. Saving the model pipeline
The pipeline model trained on the Hadoop cluster is saved to HDFS (the cluster's default file system), copied to the local file system of the Spark driver node, and compressed into a `.tar.gz` archive for deployment.

In [None]:
%%spark
import os
import shutil
from subprocess import Popen, PIPE, STDOUT

os.chdir("/tmp")

# Save the fitted pipeline to HDFS, overwriting the output of any previous run
pipeline_model_name = "PipelineModel_TelcoChurn_Spark_v1.0"
pipeline_model_path = "/tmp/" + pipeline_model_name
model_rf.write().overwrite().save(pipeline_model_path)

# Copy the saved model from HDFS to the same path on the driver's local file system
# (remove a local copy left by a previous run, otherwise copyToLocal fails)
shutil.rmtree(pipeline_model_path, ignore_errors=True)
cmd = "hdfs dfs -copyToLocal " + pipeline_model_path + " " + pipeline_model_path
p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
output = p.stdout.read()
print(output)

# Compress into a tar.gz file.
# IMPORTANT: compress exactly in the structure here. The archived model must not have ANY prefix directories in the archive.
shutil.make_archive(base_name=pipeline_model_name, format='gztar', root_dir=pipeline_model_name + '/', base_dir='./')
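The `root_dir`/`base_dir` combination in `make_archive` is what keeps prefix directories out of the archive: the archive is built from inside the model directory, so entries are stored as `./metadata/...` rather than `PipelineModel_TelcoChurn_Spark_v1.0/metadata/...`. A Python 3 sketch that builds a throwaway directory and lists the resulting members; the directory and file names are made up for the demo, and the function names are hypothetical.

```python
import os
import shutil
import tarfile
import tempfile


def archive_model_dir(model_dir, base_name):
    """Create <base_name>.tar.gz whose entries sit at the archive root, not under model_dir's name."""
    return shutil.make_archive(base_name, "gztar", root_dir=model_dir, base_dir="./")


def archive_members(archive_path):
    """Return normalized member names, e.g. './metadata/part-00000' -> 'metadata/part-00000'."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(os.path.normpath(name) for name in tar.getnames())


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    model_dir = os.path.join(workdir, "PipelineModel_demo")  # made-up model directory
    os.makedirs(os.path.join(model_dir, "metadata"))
    with open(os.path.join(model_dir, "metadata", "part-00000"), "w") as handle:
        handle.write("{}")
    # Writes PipelineModel_demo.tar.gz next to (not inside) the model directory.
    archive = archive_model_dir(model_dir, model_dir)
    print(archive_members(archive))
```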

In [None]:
%%spark
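import os
from subprocess import Popen, PIPE, STDOUT

# Copy the DSX config file uploaded earlier from HDFS to the driver's local file system
# (remove a local copy left by a previous run, otherwise copyToLocal fails)
dsx_data_path = demo_path + 'dsx_config_file.txt'
if os.path.exists(dsx_data_path):
    os.remove(dsx_data_path)

cmd = "hdfs dfs -copyToLocal " + dsx_data_path + " " + dsx_data_path
p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
output = p.stdout.read()
print(output)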

In [None]:
%%spark
pipeline_model_file_path = '/tmp/' + pipeline_model_name + '.tar.gz'

# The config file holds one value per line: project name (GUID), project ID, auth token, user ID
with open(dsx_data_path) as config_file:
    dsx_data = [line.rstrip('\n') for line in config_file]
projectGuid = dsx_data[0]
projectId = dsx_data[1]
auth_header = dsx_data[2]
user_id = dsx_data[3]

print(dsx_data)

## 9. Deploying model on DSX
Once created, the model archive is sent over HTTP to the DSX cluster, which hosts the model behind a REST endpoint.

In [None]:
%%spark
import json
import requests
# NOTE: requires the `requests` library, or any other way of submitting a REST call

headers = {
    'Authorization': auth_header
}
author = {
    "name": "Project Owner",
    "projectName": projectGuid,
    "uid": user_id
}

# Replace load_balancer_ip with the address of your DSX load balancer
with open(pipeline_model_file_path, 'rb') as model_file:
    r = requests.post('https://load_balancer_ip/v3/project/' + author['projectName'] + '/models/saveSparkModel',
                      files={
                          'pipelineModelFile': model_file,
                          "trainingDataSchema": train_data.schema.json(),
                          "pipelineModelName": "Name_the_model",
                          "modelType": "spark-2.1",
                          "labelColumnName": 'CHURN',
                          "runtime": 'Python35',
                          "algorithmType": 'Classification',
                          "algorithm": 'PipelineModel'
                      },
                      headers=headers,
                      verify=False)  # skips TLS certificate verification (self-signed certificate)

print(r.status_code)
print(r.content)

## 10. Testing the deployment

In [4]:
# Request a bearer token from DSX (replace load_balancer_ip, user and password)
!curl -k -X GET https://load_balancer_ip/v2/identity/token -H "username: user" -H "password: password"

### Testing over UI
1. Save the notebook and switch to the **Models** tab of the project, or go to **Model Management**.
2. Under **Models**, find and click your model, then click **Publish**.
3. Use the **Test API** option to test the model in the UI.

You can use the following data for testing (column names as renamed in section 5): ID=99, Gender=M, Status=S, Children=0, EstIncome=60000, CarOwner=Y, Age=34, LongDistance=68, International=50, Local=100, Dropped=0, Paymethod=CC, LocalBilltype=Budget, LongDistanceBilltype=Intnl_discount, Usage=334, RatePlan=3
![Test API](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/Test_GUI.png)

### Testing over cURL
This step demonstrates that you can make an external REST API call to test the model. Create and execute this command to invoke the model remotely from a terminal or your program:
`curl -i -k -X POST <Scoring Endpoint> -d '{fields}' -H "content-type: application/json" -H "authorization: Bearer <generate bearer token>"`

In [5]:
# Call the model
!curl -i -k -X POST scoring_endpoint -d '{"ID":99,"Gender":"M","Status":"S","Children":0,"EstIncome":60000,"CarOwner":"Y","Age":34,"LongDistance":68,"International":50,"Local":100,"Dropped":0,"Paymethod":"CC","LocalBilltype":"Budget","LongDistanceBilltype":"Intnl_discount","Usage":334,"RatePlan":3}' -H "content-type: application/json" -H "authorization: Bearer Bearer_Token" 
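The same two calls (token request and scoring request) can be made from Python instead of curl. A Python 3 standard-library sketch that builds the requests and optionally sends them; `load_balancer_ip`, the scoring endpoint and the credentials are placeholders as above, the helper names are hypothetical, and the structure of the JSON responses is an assumption, so inspect them before relying on specific keys. `insecure=True` mirrors curl's `-k` and should only be used against test systems with self-signed certificates.

```python
import json
import ssl
from urllib.request import Request, urlopen

# Test record from the UI section, using the renamed columns the model was trained on.
SAMPLE_RECORD = {
    "ID": 99, "Gender": "M", "Status": "S", "Children": 0, "EstIncome": 60000,
    "CarOwner": "Y", "Age": 34, "LongDistance": 68, "International": 50, "Local": 100,
    "Dropped": 0, "Paymethod": "CC", "LocalBilltype": "Budget",
    "LongDistanceBilltype": "Intnl_discount", "Usage": 334, "RatePlan": 3,
}


def build_token_request(base_url, username, password):
    """GET /v2/identity/token with credentials passed as headers, as in the curl call."""
    return Request(base_url + "/v2/identity/token",
                   headers={"username": username, "password": password}, method="GET")


def build_scoring_request(scoring_endpoint, bearer_token, record):
    """POST the record as JSON with a bearer token, as in the scoring curl call."""
    body = json.dumps(record).encode("utf-8")
    headers = {"Content-Type": "application/json", "Authorization": "Bearer " + bearer_token}
    return Request(scoring_endpoint, data=body, headers=headers, method="POST")


def send_json(request, insecure=False):
    """Send a request and decode a JSON response (response format is an assumption)."""
    context = ssl.create_default_context()
    if insecure:
        # Equivalent of curl -k: skip hostname and certificate checks.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    with urlopen(request, context=context) as response:
        return response.status, json.loads(response.read().decode("utf-8"))
```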
