In [8]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark import SparkContext

In [9]:
spark = SparkSession \
.builder \
.appName("customer churn example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()


In [10]:
df = spark.read.csv("/Users/Mudit/churn_train.csv", header = True, inferSchema = True)
df.show(4)

+---+-------+-------+---------+--------+------+----------+------+------+-------+------+------+-------+------+-------+-------+------+------+--------+----+-------+
| st| acclen| arcode|    phnum| intplan| voice|nummailmes| tdmin| tdcal| tdchar| temin| tecal| tecahr| tnmin| tn cal| tnchar| timin| tical| tichar |ncsc|  label|
+---+-------+-------+---------+--------+------+----------+------+------+-------+------+------+-------+------+-------+-------+------+------+--------+----+-------+
| KS|    128|    415| 382-4657|      no|   yes|        25| 265.1|   110|  45.07| 197.4|    99|  16.78| 244.7|     91|  11.01|  10.0|     3|     2.7|   1| False.|
| OH|    107|    415| 371-7191|      no|   yes|        26| 161.6|   123|  27.47| 195.5|   103|  16.62| 254.4|    103|  11.45|  13.7|     3|     3.7|   1| False.|
| NJ|    137|    415| 358-1921|      no|    no|         0| 243.4|   114|  41.38| 121.2|   110|   10.3| 162.6|    104|   7.32|  12.2|     5|    3.29|   0| False.|
| OH|     84|    408| 375-99

In [11]:
df.printSchema()
from pyspark.ml.feature import StringIndexer


root
 |-- st: string (nullable = true)
 |--  acclen: integer (nullable = true)
 |--  arcode: integer (nullable = true)
 |--  phnum: string (nullable = true)
 |--  intplan: string (nullable = true)
 |--  voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |--  tdmin: double (nullable = true)
 |--  tdcal: integer (nullable = true)
 |--  tdchar: double (nullable = true)
 |--  temin: double (nullable = true)
 |--  tecal: integer (nullable = true)
 |--  tecahr: double (nullable = true)
 |--  tnmin: double (nullable = true)
 |--  tn cal: integer (nullable = true)
 |--  tnchar: double (nullable = true)
 |--  timin: double (nullable = true)
 |--  tical: integer (nullable = true)
 |--  tichar : double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |--  label: string (nullable = true)



In [12]:
indexer = StringIndexer(inputCol="st", outputCol="indSt")
indexed_df = indexer.fit(df).transform(df)

indexer = StringIndexer(inputCol=" phnum", outputCol="indPhnum")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)

indexer = StringIndexer(inputCol=" intplan", outputCol="indIntplan")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)

indexer = StringIndexer(inputCol=" voice", outputCol="indVoice")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
    
indexed_df.printSchema()

root
 |-- st: string (nullable = true)
 |--  acclen: integer (nullable = true)
 |--  arcode: integer (nullable = true)
 |--  phnum: string (nullable = true)
 |--  intplan: string (nullable = true)
 |--  voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |--  tdmin: double (nullable = true)
 |--  tdcal: integer (nullable = true)
 |--  tdchar: double (nullable = true)
 |--  temin: double (nullable = true)
 |--  tecal: integer (nullable = true)
 |--  tecahr: double (nullable = true)
 |--  tnmin: double (nullable = true)
 |--  tn cal: integer (nullable = true)
 |--  tnchar: double (nullable = true)
 |--  timin: double (nullable = true)
 |--  tical: integer (nullable = true)
 |--  tichar : double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |--  label: string (nullable = true)
 |-- indSt: double (nullable = false)
 |-- indPhnum: double (nullable = false)
 |-- indIntplan: double (nullable = false)
 |-- indVoice: double (nullable = false)



In [13]:
vectorAss = VectorAssembler(inputCols=[" acclen",
                                       " arcode",
                                       "nummailmes",
                                       " tdmin",
                                      " tdcal"," tdchar"," temin",
                                      " tecal"," tecahr"," tnmin"
                                      ]
                            ,outputCol="features")

fi_train_df = vectorAss.transform(df)
fi_train_df.show(100)

+---+-------+-------+---------+--------+------+----------+------+------+-------+------+------+-------+------+-------+-------+------+------+--------+----+-------+--------------------+
| st| acclen| arcode|    phnum| intplan| voice|nummailmes| tdmin| tdcal| tdchar| temin| tecal| tecahr| tnmin| tn cal| tnchar| timin| tical| tichar |ncsc|  label|            features|
+---+-------+-------+---------+--------+------+----------+------+------+-------+------+------+-------+------+-------+-------+------+------+--------+----+-------+--------------------+
| KS|    128|    415| 382-4657|      no|   yes|        25| 265.1|   110|  45.07| 197.4|    99|  16.78| 244.7|     91|  11.01|  10.0|     3|     2.7|   1| False.|[128.0,415.0,25.0...|
| OH|    107|    415| 371-7191|      no|   yes|        26| 161.6|   123|  27.47| 195.5|   103|  16.62| 254.4|    103|  11.45|  13.7|     3|     3.7|   1| False.|[107.0,415.0,26.0...|
| NJ|    137|    415| 358-1921|      no|    no|         0| 243.4|   114|  41.38| 121.

In [14]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(fi_train_df)
    
fi_df = featureIndexer.transform(fi_train_df)

In [15]:
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=100, maxBins=64)

In [16]:
labelIndexer = StringIndexer(inputCol=" label", outputCol="indexedLabel").fit(fi_df)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [17]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [19]:
model = pipeline.fit(fi_df)
TEST = spark.read.csv("churn_test.csv",inferSchema=True, header=True)

TEST.printSchema()

root
 |-- st: string (nullable = true)
 |--  acclen: integer (nullable = true)
 |--  arcode: integer (nullable = true)
 |--  phnum: string (nullable = true)
 |--  intplan: string (nullable = true)
 |--  voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |--  tdmin: double (nullable = true)
 |--  tdcal: integer (nullable = true)
 |--  tdchar: double (nullable = true)
 |--  temin: double (nullable = true)
 |--  tecal: integer (nullable = true)
 |--  tecahr: double (nullable = true)
 |--  tnmin: double (nullable = true)
 |--  tn cal: integer (nullable = true)
 |--  tnchar: double (nullable = true)
 |--  timin: double (nullable = true)
 |--  tical: integer (nullable = true)
 |--  tichar : double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |--  label: string (nullable = true)



In [20]:
indexer = StringIndexer(inputCol="st", outputCol="indSt")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol=" phnum", outputCol="indPhnum")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol=" intplan", outputCol="indIntplan")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol=" voice", outputCol="indVoice")
TEST = indexer.fit(TEST).transform(TEST)



TEST.count()

1667

In [21]:
newvectorAss = VectorAssembler(inputCols=[" acclen",
                                       " arcode",
                                       "nummailmes",
                                       " tdmin",
                                      " tdcal"," tdchar"," temin",
                                      " tecal"," tecahr"," tnmin"
                                      ]
                            ,outputCol="features")

TEST = newvectorAss.transform(TEST)

In [22]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(TEST)

    
TEST = featureIndexer.transform(TEST)

In [23]:
predictions = model.transform(TEST)
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % (accuracy))

rfModel = model.stages[2]
print(rfModel)

Test Error = 0.0935813
Accuracy = 0.906419
RandomForestClassificationModel (uid=RandomForestClassifier_4634a9eefdc562b86416) with 100 trees


In [24]:
tp = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 1)].count()
tn = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 0)].count()
fp = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 1)].count()
fn = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", predictions.count())

r = float(tp)/(tp + fn)
print ("recall", r)

p = float(tp) / (tp + fp)
print ("precision - true", p)

p1 = float(tn) / (tn + fn)
print ("precision - false", p1)

True Positives: 71
True Negatives: 1440
False Positives: 3
False Negatives: 153
Total 1667
recall 0.3169642857142857
precision - true 0.9594594594594594
precision - false 0.903954802259887
