In [1]:
from pyspark.sql.types import *

# Explicit schema for both churn CSV files; the header row is skipped, not used for types.
schema = StructType([
    StructField("state_code", StringType(), True),
    StructField("account_length", IntegerType(), True),
    StructField("area_code", StringType(), True),
    StructField("international_plan", StringType(), True),
    StructField("voice_mail_plan", StringType(), True),
    StructField("num_voice_mail", DoubleType(), True),
    StructField("total_day_mins", DoubleType(), True),
    StructField("total_day_calls", DoubleType(), True),
    StructField("total_day_charge", DoubleType(), True),
    StructField("total_evening_mins", DoubleType(), True),
    StructField("total_evening_calls", DoubleType(), True),
    StructField("total_evening_charge", DoubleType(), True),
    StructField("total_night_mins", DoubleType(), True),
    StructField("total_night_calls", DoubleType(), True),
    StructField("total_night_charge", DoubleType(), True),
    StructField("total_international_mins", DoubleType(), True),
    StructField("total_international_calls", DoubleType(), True),
    StructField("total_international_charge", DoubleType(), True),
    StructField("total_international_num_calls", DoubleType(), True),
    StructField("churn", StringType(), True)
])

# Columns excluded from the model: identifiers/geography, the voice-mail flag, and the
# *_mins columns (presumably redundant with the matching *_charge columns — confirm).
DROPPED_COLUMNS = [
    "account_length", "state_code", "area_code", "voice_mail_plan",
    "total_day_mins", "total_evening_mins", "total_night_mins",
    "total_international_mins",
]


def _read_churn_csv(path):
    """Read one churn CSV with the explicit `schema` and mark it as cached.

    The header row is skipped and the literal string 'NA' is read as null.
    No schema inference happens because `schema` is passed explicitly, and
    `.csv()` selects the CSV source itself, so no `.format(...)` is needed.
    """
    return (spark.read                       # our DataFrameReader
            .option("header", "true")        # first line is a header
            .csv(path, schema=schema, nullValue='NA')
            .cache())


fileName = "dbfs:/FileStore/tables/orangeTelecom/churn_bigml_80.csv"
trainSet = _read_churn_csv(fileName)

fileName = "dbfs:/FileStore/tables/orangeTelecom/churn_bigml_20.csv"
testSet = _read_churn_csv(fileName)

# na.drop() returns a NEW DataFrame; it must be assigned, otherwise rows with nulls
# silently stay in the training data. Only the training data is cleaned here.
trainDF = trainSet.drop(*DROPPED_COLUMNS).na.drop()
trainDF

In [2]:

# Balance the training set by randomly oversampling the minority (churn) class.
# `churn` is a string column; comparing it with Python booleans relies on Spark's
# implicit string->boolean cast (values are presumably "False"/"True" — confirm).
major_df = trainDF.filter(trainDF.churn == False)
minor_df = trainDF.filter(trainDF.churn == True)

major_count = major_df.count()
minor_count = minor_df.count()
if minor_count == 0:
    raise ValueError("trainDF has no churn rows; cannot oversample the minority class")
ratio = major_count / minor_count
print("ratio: {}".format(ratio))

# With replacement and fraction=ratio (> 1), each minority row is drawn ~ratio times on
# average, so the result only approximately matches the size of major_df. A fixed seed
# keeps the draw reproducible.
minor_df_overampled = minor_df.sample(withReplacement=True, fraction=ratio, seed=1)

# union() resolves columns by position, like the deprecated unionAll() alias.
TrainDFbalanced = major_df.union(minor_df_overampled)


In [3]:
# Row count per class after oversampling: non-churn (False) first, then churn (True).
for churn_value in (False, True):
    print(TrainDFbalanced.filter(TrainDFbalanced.churn == churn_value).count())

In [4]:
# Show the StructType (column names and types) of the balanced training set.
balanced_schema = TrainDFbalanced.schema
balanced_schema

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression, LinearSVC, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier


# StringIndexer for categorical columns (OneHotEncoder should be evaluated as well).
ipindexer = StringIndexer(
    inputCol="international_plan",
    outputCol="iplanIndex")

# Index the label with a FIXED alphabetical order so the positive class is stable
# (with churn values "False"/"True", presumably: "False" -> 0.0, "True" -> 1.0).
# The default "frequencyDesc" order assigns 0.0 to the most frequent value. After
# oversampling, both classes are about the same size, so the mapping could flip
# between runs and silently invert every downstream metric.
# NOTE: stringOrderType requires Spark >= 2.3.
labelindexer = StringIndexer(
    inputCol="churn",
    outputCol="label",
    stringOrderType="alphabetAsc")

assemblerInputs = [
    "iplanIndex",  # our new categorical feature
    "num_voice_mail", "total_day_charge",
    "total_day_calls", "total_evening_charge",
    "total_evening_calls", "total_night_charge",
    "total_night_calls", "total_international_charge",
    "total_international_calls", "total_international_num_calls"]

vectorAssembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features")


def _use_shared_columns(estimator):
    """Configure `estimator` to read the "features" and "label" columns.

    PySpark setters return the estimator itself, so the configured object is returned.
    """
    return estimator.setFeaturesCol("features").setLabelCol("label")


# Candidate classifiers, all reading the same feature/label columns.
lr = _use_shared_columns(LogisticRegression())
lsvc = _use_shared_columns(LinearSVC())
dt = _use_shared_columns(DecisionTreeClassifier())
rf = _use_shared_columns(RandomForestClassifier())
gbt = _use_shared_columns(GBTClassifier())

# Only the random forest is wired into the pipeline; swap the last stage to compare models.
pipeline = Pipeline().setStages([
    ipindexer,
    labelindexer,
    vectorAssembler,  # assemble the feature vector for all columns
    rf])




In [6]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

numFolds = 3
numberoftrees = [10, 50, 100]
maximumdepth = [10, 20, 30]
# The following settings are only for a LogisticRegression grid
# (e.g. .addGrid(lr.regParam, RegParam)); the current rf grid does not use them.
MaxIter = 5
RegParam = [0.1, 0.01]  # L2 regularization param, set 1.0 with L1 regularization
Tol = 1e-8  # convergence tolerance for iterative algorithms
ElasticNetParam = [0.0, 0.5, 1.0]  # combination of L1 & L2

# Score ROC on the continuous "rawPrediction" column. The hard 0/1 "prediction"
# column gives a single-threshold ROC, and this evaluator also drives model selection.
evaluator = (BinaryClassificationEvaluator()
             .setLabelCol("label")
             .setRawPredictionCol("rawPrediction")
             .setMetricName("areaUnderROC"))

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, numberoftrees)
             .addGrid(rf.maxDepth, maximumdepth)
             .build())

# NOTE(review): TrainDFbalanced contains duplicated (oversampled) churn rows, so copies
# of the same row can land in both training and validation folds, which makes the CV
# metrics optimistic. Oversampling inside each fold would avoid this.
cv = (CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(numFolds)
      .setSeed(1))  # reproducible fold assignment, same seed as the oversampling step

cvModel = cv.fit(TrainDFbalanced)
predictions = cvModel.transform(testSet)

auroc_test = evaluator.evaluate(predictions)
accuracy = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
).evaluate(predictions)
print("Area under ROC (test): ", auroc_test)
print("Classification accuracy: ", accuracy)



In [7]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Default columns: rawPredictionCol="rawPrediction", labelCol="label".
evaluator = BinaryClassificationEvaluator()

# metrics (each evaluate() call runs a Spark job, so compute each metric once)
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print('Area under ROC Curve', auroc)

# format the output
print("Area under receiver operating characteristic (ROC) - Curve: {:.4f}".format(auroc))
print("Area under precision-recall (PR) - Curve: {:.4f}".format(auprc))

In [8]:
# convert into RDD for Spark - MLlib
# Convert to an RDD of (score, label) float pairs for the RDD-based Spark MLlib metrics.
# The score must be continuous: using the 0/1 "prediction" column collapses the ROC/PR
# curves to a single threshold. Use P(label=1) when the model emits "probability";
# otherwise (e.g. LinearSVC) fall back to the class-1 entry of "rawPrediction".
# The name is kept for compatibility; the first element is now a score, not a prediction.
score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
predictionAndLabels = (
    predictions
    .select(score_col, "label")
    .rdd
    .map(lambda row: (float(row[0][1]), float(row[1])))
)


In [9]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Area under the ROC and PR curves, computed by MLlib from the (score, label) RDD.
bcm = BinaryClassificationMetrics(predictionAndLabels)
roc_area, pr_area = bcm.areaUnderROC, bcm.areaUnderPR
print("Area under ROC Curve: {:.4f}".format(roc_area))
print("Area under PR Curve: {:.4f}".format(pr_area))

In [10]:
# cvModel.bestModel is the fitted PipelineModel. Its own param map does not list the
# tuned RandomForest settings, so report the grid entry with the best average CV metric.
cv_metrics = cvModel.avgMetrics
pick_best = max if cvModel.getEvaluator().isLargerBetter() else min
best_index = pick_best(range(len(cv_metrics)), key=cv_metrics.__getitem__)
{param.name: value for param, value in cvModel.getEstimatorParamMaps()[best_index].items()}

In [11]:
# Confusion-matrix ratios (each as a fraction of all test rows).
# Positive class = label 1.0 — presumably churn "True"; this holds only if the label
# indexer maps "True" -> 1.0 (e.g. alphabetical ordering). Confirm before reading TP/FP.
lp = predictions.select("label", "prediction")

# A single aggregation pass instead of six separate count() jobs.
# Keys are (label, prediction) float pairs, e.g. (1.0, 0.0) = churner predicted as non-churn.
pair_counts = {
    (row["label"], row["prediction"]): row["count"]
    for row in lp.groupBy("label", "prediction").count().collect()
}

counttotal = sum(pair_counts.values())
correct = sum(n for (label, pred), n in pair_counts.items() if label == pred)
wrong = sum(n for (label, pred), n in pair_counts.items() if label != pred)
if counttotal == 0:
    raise ValueError("predictions is empty; cannot compute confusion-matrix ratios")

ratioWrong = wrong / counttotal
ratioCorrect = correct / counttotal

# A correct 1.0 prediction is a true positive; a correct 0.0 prediction is a true NEGATIVE.
truep = pair_counts.get((1.0, 1.0), 0) / counttotal
truen = pair_counts.get((0.0, 0.0), 0) / counttotal
falsep = pair_counts.get((0.0, 1.0), 0) / counttotal
falsen = pair_counts.get((1.0, 0.0), 0) / counttotal

print("Total Count : ", counttotal)
print("Correct : ", correct)
print("Wrong: ", wrong)
print("Ratio wrong: ", ratioWrong)
print("Ratio correct: ", ratioCorrect)
print("Ratio true positive : ", truep)
print("Ratio false positive : ", falsep)
print("Ratio true negative : ", truen)
print("Ratio false negative : ", falsen)

In [12]:
# Persist the best pipeline found by cross-validation, replacing any earlier save.
# NOTE(review): the path says "LR", but the pipeline defined above ends with the random
# forest stage (rf). Consider renaming once anything that loads this path is updated.
fileName = "/tmp/churn/LR_Pipeline"
best_pipeline = cvModel.bestModel
best_pipeline.write().overwrite().save(fileName)