## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. This notebook assumes that the file you would like to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# Source file in DBFS and the reader format to use for it.
file_location = "/FileStore/tables/WA_Fn_UseC__Telco_Customer_Churn-89c80.csv"
file_type = "csv"

# CSV parsing settings.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# All reader options are passed in one call. A single space is registered as
# both the NaN marker and the null marker (presumably so blank fields load as
# missing values -- confirm against the raw file).
# These options are CSV-specific; other file types ignore them.
reader = spark.read.format(file_type).options(
    inferSchema=infer_schema,
    header=first_row_is_header,
    sep=delimiter,
    nanValue=' ',
    nullValue=' ',
)
df = reader.load(file_location)

display(df)

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [3]:
from pyspark.sql.functions import isnan,when,count,col
# One aggregate per column: count the rows where the value is NaN or null.
# Each result is aliased back to the column name so the output reads as a
# single row of per-column missing-value counts.
missing_counts = [
    count(when(isnan(column_name) | col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
display(df.select(missing_counts))

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,11,0


In [4]:
# Register the DataFrame as a temporary view so the %sql cells below can
# query it by name.

temp_table_name = "churn_analysis"

# createOrReplaceTempView overwrites any existing view with the same name,
# so this cell can be re-run safely.
df.createOrReplaceTempView(temp_table_name)

In [5]:
%sql

/* Per gender: number of rows with a non-null churn value, read from the temp view */

SELECT gender, count(churn)
FROM churn_analysis
GROUP BY gender

gender,count(churn)
Female,3488
Male,3555


In [6]:
%sql
/* Average monthly charge per gender */
SELECT gender, avg(MonthlyCharges)
FROM churn_analysis
GROUP BY gender


gender,avg(MonthlyCharges)
Female,65.20424311926602
Male,64.32748241912773


In [7]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

In [8]:
%sql
/* Browse the raw rows of the temp view */
SELECT *
FROM churn_analysis

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [9]:
%sql
/* Class balance of the target: number of rows per churn value */
SELECT churn, count(*)
FROM churn_analysis
GROUP BY churn

churn,count(1)
No,5174
Yes,1869


In [10]:
# Summary statistics for the three numeric columns, printed as a text table.
df.describe('tenure', 'TotalCharges', 'MonthlyCharges').show()

In [11]:
%sql
/* Row counts for every gender / churn combination */
SELECT gender, churn, count(*) AS count
FROM churn_analysis
GROUP BY gender, churn

gender,churn,count
Male,No,2625
Male,Yes,930
Female,No,2549
Female,Yes,939


In [12]:
%sql
/* Churn counts split by the SeniorCitizen flag */
SELECT SeniorCitizen, churn, count(churn) AS count
FROM churn_analysis
GROUP BY SeniorCitizen, churn

SeniorCitizen,churn,count
1,No,666
0,No,4508
0,Yes,1393
1,Yes,476


In [13]:
%sql
/* Churn counts per tenure value, ordered by tenure as an integer */
SELECT cast(tenure as int), churn, count(churn)
FROM churn_analysis
GROUP BY tenure, churn
ORDER BY cast(tenure as int)

tenure,churn,count(churn)
0,No,11
1,No,233
1,Yes,380
2,No,115
2,Yes,123
3,Yes,94
3,No,106
4,No,93
4,Yes,83
5,No,69


In [14]:
%sql
/* Row counts for every payment method / churn combination */
SELECT PaymentMethod, churn, count(*) AS count
FROM churn_analysis
GROUP BY PaymentMethod, churn

PaymentMethod,churn,count
Credit card (automatic),No,1290
Bank transfer (automatic),No,1286
Mailed check,Yes,308
Credit card (automatic),Yes,232
Electronic check,No,1294
Electronic check,Yes,1071
Bank transfer (automatic),Yes,258
Mailed check,No,1304


In [15]:
# Same class-balance check as the SQL cell, done through the DataFrame API.
churn_counts = df.groupBy('churn').count()
churn_counts.show()

In [16]:
# Contingency table: SeniorCitizen values as rows, InternetService values as columns.
df.crosstab('SeniorCitizen', 'InternetService').show()

In [17]:
# Service-related columns to scan for frequent values (support threshold 0.6).
service_columns = [
    'PhoneService', 'MultipleLines', 'InternetService',
    'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
    'TechSupport', 'StreamingTV', 'StreamingMovies',
]
df.freqItems(service_columns, 0.6).collect()

In [18]:
# 70/30 random split with a fixed seed.
train, test = df.randomSplit([0.7, 0.3], seed=24)

# Report how many rows landed in each split.
train_rows = train.count()
test_rows = test.count()
print("Training Dataset Count: " + str(train_rows))
print("Test Dataset Count: " + str(test_rows))

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder (same
# inputCols/outputCols interface). Importing the new class under the old name
# lets the cells that reference OneHotEncoderEstimator run on both 2.x and 3.x.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# Columns to be string-indexed and one-hot encoded before modelling.
CategoricalColumns = [
    'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService',
    'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup',
    'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies',
    'Contract', 'PaperlessBilling', 'PaymentMethod',
]

In [20]:
# Start the pipeline stage list from scratch. Each categorical column adds two
# stages: a StringIndexer (<col> -> <col>Index) and a one-hot encoder
# (<col>Index -> <col>catVec).
stages = []

for column_name in CategoricalColumns:
    indexer = StringIndexer(inputCol=column_name, outputCol=column_name + 'Index')
    one_hot = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol()],
        outputCols=[column_name + "catVec"],
    )
    stages.extend([indexer, one_hot])

In [21]:
# Bare expression: displays the pipeline stages collected so far.
stages

In [22]:
from pyspark.ml.feature import Imputer

# Fill missing TotalCharges values in place (output column equals input
# column), using the Imputer's default strategy.
# NOTE: this appends to `stages`; re-running the cell adds a duplicate stage.
imputer = Imputer(
    inputCols=['TotalCharges'],
    outputCols=['TotalCharges'],
)
stages.append(imputer)

In [23]:
# Index the Churn string column into the numeric `label` column.
# NOTE: this appends to `stages`; re-running the cell adds a duplicate stage.
label_Idx = StringIndexer(inputCol='Churn', outputCol='label')
stages.append(label_Idx)

In [24]:
from pyspark.ml.feature import QuantileDiscretizer

# Bucket `tenure` into 3 quantile-based bins, written to `tenure_bin`.
# NOTE: this appends to `stages`; re-running the cell adds a duplicate stage.
tenure_bin = QuantileDiscretizer(
    numBuckets=3,
    inputCol='tenure',
    outputCol='tenure_bin',
)
stages.append(tenure_bin)

In [25]:
# Bare expression: displays the stage list again after the later stages were appended.
stages

In [26]:
# Final feature vector = every one-hot encoded categorical column followed by
# the numeric columns (tenure enters only through its binned form).
numericCols = ['MonthlyCharges', 'TotalCharges', 'tenure_bin']
encodedCols = [name + "catVec" for name in CategoricalColumns]
assemblerInputs = encodedCols + numericCols

# NOTE: this appends to `stages`; re-running the cell adds a duplicate stage.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [27]:

from pyspark.ml import Pipeline

# Fit every preprocessing stage on the training split only, so the fitted
# stages learn from training rows alone.
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(train)

In [28]:
# Apply the fitted preprocessing pipeline to both splits.
trainprepdf, testprepdf = (
    pipelineModel.transform(split) for split in (train, test)
)


In [29]:
# Show the first prepared training row, including the index / one-hot columns
# and the assembled `features` vector added by the pipeline.
display(trainprepdf.head(1))

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn,genderIndex,gendercatVec,SeniorCitizenIndex,SeniorCitizencatVec,PartnerIndex,PartnercatVec,DependentsIndex,DependentscatVec,PhoneServiceIndex,PhoneServicecatVec,MultipleLinesIndex,MultipleLinescatVec,InternetServiceIndex,InternetServicecatVec,OnlineSecurityIndex,OnlineSecuritycatVec,OnlineBackupIndex,OnlineBackupcatVec,DeviceProtectionIndex,DeviceProtectioncatVec,TechSupportIndex,TechSupportcatVec,StreamingTVIndex,StreamingTVcatVec,StreamingMoviesIndex,StreamingMoviescatVec,ContractIndex,ContractcatVec,PaperlessBillingIndex,PaperlessBillingcatVec,PaymentMethodIndex,PaymentMethodcatVec,label,tenure_bin,features
0004-TLHLJ,Male,0,No,No,4,Yes,No,Fiber optic,No,No,Yes,No,No,No,Month-to-month,Yes,Electronic check,73.9,280.85,Yes,1.0,"List(0, 1, List(), List())",0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",1.0,"List(0, 2, List(1), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 3, List(0), List(1.0))",1.0,0.0,"List(0, 30, List(1, 2, 3, 4, 5, 7, 9, 11, 14, 15, 17, 19, 21, 23, 24, 27, 28), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 73.9, 280.85))"


In [30]:
from pyspark.ml.classification import LogisticRegression

# Baseline classifier: logistic regression on the assembled feature vector,
# capped at 10 optimizer iterations.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)
lrModel = lr.fit(trainprepdf)

In [31]:
# Print the fitted logistic-regression parameters.
# (Label spelling fixed: the original printed "Coefficents".)
print('Coefficients:  ' + str(lrModel.coefficients))
print('Intercept:  ' + str(lrModel.intercept))

In [32]:
# Summary object of the fitted logistic-regression model; the metrics printed
# in the next cell are read from it.
summary=lrModel.summary

In [33]:
# Pull the headline metrics from the model summary. The model was fitted on
# trainprepdf, so these presumably describe the fit on the training data, not
# held-out performance.
accuracy, FPR, TPR = (
    summary.accuracy,
    summary.weightedFalsePositiveRate,
    summary.weightedTruePositiveRate,
)
FMeasure = summary.weightedFMeasure()  # a method, unlike the other metrics
Precision, Recall = summary.weightedPrecision, summary.weightedRecall

report_values = (accuracy, FPR, TPR, FMeasure, Precision, Recall, summary.areaUnderROC)
print('Accuracy: %s\nFPR: %s\nTPR: %s\nF-Measure: %s\nPrecision: %s\nRecall :%s\nAUC :%s' % report_values)




In [34]:
# Databricks display of the fitted model against the prepared training data
# in 'ROC' mode; the rendered output lists false/true positive rate per threshold.
display(lrModel,trainprepdf,'ROC')

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.8331148512857081
0.0,0.0384615384615384,0.8331148512857081
0.0153846153846153,0.0384615384615384,0.779705751736382
0.0153846153846153,0.0769230769230769,0.7586357194568452
0.0153846153846153,0.1153846153846153,0.6987165453214431
0.0307692307692307,0.1153846153846153,0.6915118402949199
0.0307692307692307,0.1538461538461538,0.6860604944548897
0.0461538461538461,0.1538461538461538,0.6450038522648784
0.0461538461538461,0.1923076923076923,0.6429123845502271
0.0461538461538461,0.2307692307692307,0.62952495803386


In [35]:
# Score the held-out test split with the baseline logistic-regression model.
# `predictions` is reassigned by later model cells, so the evaluation cells
# always describe the most recently run model.
predictions = lrModel.transform(testprepdf)

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve must be computed from the model's continuous score
# column (`rawPrediction`), not from the thresholded 0/1 `prediction` column.
# With hard predictions the ROC curve collapses to a single operating point,
# which distorts the AUC. This evaluator is also what CrossValidator uses for
# model selection, so it would otherwise rank models on that distorted value.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)
area_under_curve = evaluator.evaluate(predictions)

print('AUC = %g' % area_under_curve)

In [37]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# BinaryClassificationMetrics expects (score, label) pairs. The score used is
# the predicted probability of the positive class (index 1 of the
# `probability` vector, i.e. label 1.0), not the thresholded 0/1 prediction,
# which would reduce both curves to a single point.
# The pairs are built directly on the DataFrame's RDD, so the data is no
# longer collected to the driver and re-parallelized.
predictionAndLabels = (
    predictions
    .select('probability', 'label')
    .rdd
    .map(lambda row: (float(row['probability'][1]), float(row['label'])))
)

metrics = BinaryClassificationMetrics(predictionAndLabels)

print('Area under ROC = %s' % metrics.areaUnderROC)

print('Area under PR = %s' % metrics.areaUnderPR)




In [38]:
# Confusion-matrix metrics for the current `predictions` on the test split.
results = predictions.select(['prediction', 'label'])

# The row total is deliberately not named `count`: that name is the
# pyspark.sql.functions.count function imported earlier, and rebinding it to
# an int breaks the missing-value cell when it is re-run.
total = predictions.count()

is_correct = results.prediction == results.label
is_wrong = results.prediction != results.label
predicted_pos = results.filter(results.prediction == 1.0)
predicted_neg = results.filter(results.prediction == 0.0)

correct = results.filter(is_correct).count()
wrong = results.filter(is_wrong).count()
tp = predicted_pos.filter(is_correct).count()
fp = predicted_pos.filter(is_wrong).count()
tn = predicted_neg.filter(is_correct).count()
fn = predicted_neg.filter(is_wrong).count()

# float() forces true division on Python 2 as well. The guards avoid a
# ZeroDivisionError when a denominator is empty (for example a model that
# never predicts the positive class); the metric is then reported as NaN.
accuracy = (tp + tn) / float(total) if total else float('nan')
precision = tp / float(tp + fp) if (tp + fp) else float('nan')
recall = tp / float(tp + fn) if (tp + fn) else float('nan')

# Label typo fixed: the original printed "Rcall".
print('correct: %s\nwrong: %s\nTP: %s\nFP: %s\nTN: %s\nFN: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s'
      % (correct, wrong, tp, fp, tn, fn, accuracy, precision, recall))


In [39]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross validation: 3 values each for regularization
# strength, elastic-net mixing and iteration cap (3 x 3 x 3 = 27 candidates).
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
grid_builder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = grid_builder.build()



In [40]:
# 5-fold cross validation over the parameter grid, scored by `evaluator`.
# The seed pins the fold assignment so a Restart & Run All selects the same
# best model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=24,
)

cvModel = cv.fit(trainprepdf)


In [41]:
# Score the test split with the cross-validated model; the fitted
# CrossValidatorModel transforms with its best model.
predictions = cvModel.transform(testprepdf)


In [42]:
# Evaluate the tuned model's test-set predictions with the shared evaluator.
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

In [43]:
# Confusion-matrix metrics for the current `predictions` on the test split.
results = predictions.select(['prediction', 'label'])

# The row total is deliberately not named `count`: that name is the
# pyspark.sql.functions.count function imported earlier, and rebinding it to
# an int breaks the missing-value cell when it is re-run.
total = predictions.count()

is_correct = results.prediction == results.label
is_wrong = results.prediction != results.label
predicted_pos = results.filter(results.prediction == 1.0)
predicted_neg = results.filter(results.prediction == 0.0)

correct = results.filter(is_correct).count()
wrong = results.filter(is_wrong).count()
tp = predicted_pos.filter(is_correct).count()
fp = predicted_pos.filter(is_wrong).count()
tn = predicted_neg.filter(is_correct).count()
fn = predicted_neg.filter(is_wrong).count()

# float() forces true division on Python 2 as well. The guards avoid a
# ZeroDivisionError when a denominator is empty (for example a heavily
# regularized model that never predicts the positive class); the metric is
# then reported as NaN.
accuracy = (tp + tn) / float(total) if total else float('nan')
precision = tp / float(tp + fp) if (tp + fp) else float('nan')
recall = tp / float(tp + fn) if (tp + fn) else float('nan')

# Label typo fixed: the original printed "Rcall".
print('correct: %s\nwrong: %s\nTP: %s\nFP: %s\nTN: %s\nFN: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s'
      % (correct, wrong, tp, fp, tn, fn, accuracy, precision, recall))

In [44]:
# Bare expression: shows the text returned by explainParams() for the
# cross-validated model (a description of its parameters).
cvModel.explainParams()

In [45]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the same prepared features. Tree building is randomized,
# so the seed is set explicitly (matching the train/test split seed) to make
# the fitted model and the metrics below reproducible across runs.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=24)
rfModel = rf.fit(trainprepdf)

# Replace `predictions` with the random forest's output on the test split.
predictions = rfModel.transform(testprepdf)

In [46]:
# Confusion-matrix metrics for the current `predictions` on the test split.
results = predictions.select(['prediction', 'label'])

# The row total is deliberately not named `count`: that name is the
# pyspark.sql.functions.count function imported earlier, and rebinding it to
# an int breaks the missing-value cell when it is re-run.
total = predictions.count()

is_correct = results.prediction == results.label
is_wrong = results.prediction != results.label
predicted_pos = results.filter(results.prediction == 1.0)
predicted_neg = results.filter(results.prediction == 0.0)

correct = results.filter(is_correct).count()
wrong = results.filter(is_wrong).count()
tp = predicted_pos.filter(is_correct).count()
fp = predicted_pos.filter(is_wrong).count()
tn = predicted_neg.filter(is_correct).count()
fn = predicted_neg.filter(is_wrong).count()

# float() forces true division on Python 2 as well. The guards avoid a
# ZeroDivisionError when a denominator is empty (for example a model that
# never predicts the positive class); the metric is then reported as NaN.
accuracy = (tp + tn) / float(total) if total else float('nan')
precision = tp / float(tp + fp) if (tp + fp) else float('nan')
recall = tp / float(tp + fn) if (tp + fn) else float('nan')

# Label typo fixed: the original printed "Rcall".
print('correct: %s\nwrong: %s\nTP: %s\nFP: %s\nTN: %s\nFN: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s'
      % (correct, wrong, tp, fp, tn, fn, accuracy, precision, recall))