<a href="https://colab.research.google.com/github/Bksimon/Emory_MSBA/blob/main/Midterm_PartB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
!tar xf spark-3.2.0-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [2]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
%cd /content/drive/My Drive/Emory MSBA/Machine Learning II/HW/

/content/drive/My Drive/Emory MSBA/Machine Learning II/HW


In [5]:
# Step 1
#!hdfs dfs -put churn_train.csv /user/training/mldata
#!hdfs dfs -put churn_test.csv /user/training/mldata

#Step 2 + 3
train = spark.read.csv("/content/drive/My Drive/Emory MSBA/Machine Learning II/HW/churn_train.csv", header='True', inferSchema='True')
test = spark.read.csv("/content/drive/My Drive/Emory MSBA/Machine Learning II/HW/churn_test.csv", header='True', inferSchema='True')

# Step 3 Cache the data
train.cache()

DataFrame[State: string, Account length: int, Area code: int, International plan: string, Voice mail plan: string, Number vmail messages: int, Total day minutes: double, Total day calls: int, Total day charge: double, Total eve minutes: double, Total eve calls: int, Total eve charge: double, Total night minutes: double, Total night calls: int, Total night charge: double, Total intl minutes: double, Total intl calls: int, Total intl charge: double, Customer service calls: int, Churn: boolean]

In [6]:
# Step 4
print(train.printSchema())
print(train.show(5))

root
 |-- State: string (nullable = true)
 |-- Account length: integer (nullable = true)
 |-- Area code: integer (nullable = true)
 |-- International plan: string (nullable = true)
 |-- Voice mail plan: string (nullable = true)
 |-- Number vmail messages: integer (nullable = true)
 |-- Total day minutes: double (nullable = true)
 |-- Total day calls: integer (nullable = true)
 |-- Total day charge: double (nullable = true)
 |-- Total eve minutes: double (nullable = true)
 |-- Total eve calls: integer (nullable = true)
 |-- Total eve charge: double (nullable = true)
 |-- Total night minutes: double (nullable = true)
 |-- Total night calls: integer (nullable = true)
 |-- Total night charge: double (nullable = true)
 |-- Total intl minutes: double (nullable = true)
 |-- Total intl calls: integer (nullable = true)
 |-- Total intl charge: double (nullable = true)
 |-- Customer service calls: integer (nullable = true)
 |-- Churn: boolean (nullable = true)

None
+-----+--------------+--------

In [8]:
### Step 5
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

# drop the following columns: 'State', 'Area code', 'Total day charge', 'Total eve charge', 'Total night charge', and 'Total intl charge'.
columns_to_drop = ['State', 'Area code', 'Total day charge', 'Total eve charge', 'Total night charge', 'Total intl charge']
train = train.drop(*columns_to_drop)
test = test.drop(*columns_to_drop)

In [14]:
# categorical indexers
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.sql import Row

# map yes and no to 1 and 0 along with True/False
binary_map = {'Yes':1.0, 'No':0.0, True:1.0, False:0.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

train = train.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(train['Churn'])) \
    .withColumn('International plan', toNum(train['International plan'])) \
    .withColumn('Voice mail plan', toNum(train['Voice mail plan'])).cache()

test = test.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(test['Churn'])) \
    .withColumn('International plan', toNum(test['International plan'])) \
    .withColumn('Voice mail plan', toNum(test['Voice mail plan'])).cache()


In [15]:
### Step 6
assembler = VectorAssembler(inputCols=['Account length',
					'international_Vec',
					'voice_Vec',
					'Number vmail messages',
					'Total day minutes',
					'Total day calls',
					'Total eve minutes',
          'Total eve calls',
          'Total day calls',
          'Total night minutes',
          'Total night calls',
          'Total intl minutes',
          'Total intl calls',
          'Customer service calls',
					], outputCol='features')

In [16]:
# optional step: stratified sampling
stratified_train = train.sampleBy('Churn', fractions={0: 388./2278, 1: 1.0}).cache()

# Create dataframe
def vectorizeData(data):
  #return data.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label','features'])
  return data.rdd.map(lambda x: Row(label=x[-1], features=Vectors.dense(x[:-1]))).toDF()

#vectorized_CV_data = vectorizeData(stratified_CV_data)
vectorized_train = vectorizeData(train)

# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(vectorized_train)

# Automatically identify categorical features and index them
featureIndexer = VectorIndexer(inputCol='features',
                               outputCol='indexedFeatures',
                               maxCategories=2).fit(vectorized_train)

In [17]:
train.show(3)

+--------------+------------------+---------------+---------------------+-----------------+---------------+-----------------+---------------+-------------------+-----------------+------------------+----------------+----------------------+-----+
|Account length|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total eve minutes|Total eve calls|Total night minutes|Total night calls|Total intl minutes|Total intl calls|Customer service calls|Churn|
+--------------+------------------+---------------+---------------------+-----------------+---------------+-----------------+---------------+-------------------+-----------------+------------------+----------------+----------------------+-----+
|         117.0|               0.0|            0.0|                  0.0|            184.5|           97.0|            351.6|           80.0|              215.8|             90.0|               8.7|             4.0|                   1.0|  0.0|
|          65.0|    

In [18]:
# Step 7
# Imports
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

In [19]:
# decision tree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

In [20]:
# Step 8

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree])

In [21]:
train.printSchema()

root
 |-- Account length: double (nullable = true)
 |-- International plan: double (nullable = true)
 |-- Voice mail plan: double (nullable = true)
 |-- Number vmail messages: double (nullable = true)
 |-- Total day minutes: double (nullable = true)
 |-- Total day calls: double (nullable = true)
 |-- Total eve minutes: double (nullable = true)
 |-- Total eve calls: double (nullable = true)
 |-- Total night minutes: double (nullable = true)
 |-- Total night calls: double (nullable = true)
 |-- Total intl minutes: double (nullable = true)
 |-- Total intl calls: double (nullable = true)
 |-- Customer service calls: double (nullable = true)
 |-- Churn: double (nullable = true)



In [22]:
# Step 9
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')    

# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

CV_model = crossval.fit(vectorized_train)

# Fetch best model
tree_model = CV_model.bestModel.stages[2]
print(tree_model)
#print(tree_model.toDebugString())

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_493fdabc4795, depth=7, numNodes=55, numClasses=2, numFeatures=13


In [24]:
# Step 10
# transform test data
vectorized_test_data = vectorizeData(test)

transformed_data = CV_model.transform(vectorized_test_data)
print(evaluator.getMetricName(), 'accuracy:', evaluator.evaluate(transformed_data))

f1 accuracy: 0.9735853631177753
