### Read the data file

In [1]:
# findspark must be initialised before pyspark is imported so that the
# pyspark import below resolves against the local Spark installation.
import findspark
findspark.init()

import pyspark

# Last expression of the cell: display the Spark home that findspark resolved.
# (An earlier duplicate call whose result was discarded has been removed.)
findspark.find()

'D:\\Application\\spark-2.4.7-bin-hadoop2.7'

In [2]:
from pyspark.sql import SparkSession

# Reuse the active session if one exists; otherwise start one for this app.
spark = (
    SparkSession.builder
    .appName('churn_log_reg')
    .getOrCreate()
)

# Read the CSV using its header row for column names and let Spark infer
# each column's type from the data.
df = spark.read.csv(
    'customer_churn - customer_churn.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: integer (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [3]:
# Display the column names of the loaded DataFrame.
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

### Summary Statistics

In [4]:
# Summary statistics (count, mean, stddev, min, max) per column, collected
# into pandas and transposed so that each source column becomes one row.
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Names,799,,,Aaron King,Zachary Walsh
Age,799,41.83729662077597,6.127800220285043,25,65
Total_Purchase,799,10070.314317897375,2400.0031427711797,100.0,16955.76
Account_Manager,799,0.4881101376720901,0.5001717077443761,0,1
Years,799,5.300375469336666,1.277305712783269,1.0,9.15
Num_Sites,799,8.67459324155194,1.756261352518546,3,14
Location,799,,,"00103 Jeffrey Crest Apt. 205 Padillaville, IA ...",Unit 9800 Box 2878 DPO AA 75157
Company,799,,,Abbott-Thompson,"Zuniga, Clark and Shaffer"
Churn,799,0.18773466833541927,0.3907447418973289,0,1


### Build the Pipeline

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Stage 1: pack the numeric predictor columns into a single vector column.
assembler = VectorAssembler(
    inputCols=['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites'],
    outputCol='features',
)

# Stage 2: logistic regression over that vector, with 'Churn' as the label.
log_reg = LogisticRegression(
    featuresCol='features',
    labelCol='Churn',
    maxIter=10,
)

# Chain the two stages so a single fit/transform runs them in order.
pipeline = Pipeline(stages=[assembler, log_reg])

In [6]:
# Fixed seed so the 70/30 split, and therefore the fitted model and any metric
# computed from it, is reproducible across kernel restarts. Outputs captured
# from an earlier unseeded run will differ after re-execution.
train, test = df.randomSplit([0.7, 0.3], seed=42)

# Fit the assembler + logistic-regression pipeline on the training rows only,
# then score the held-out rows.
lrModel = pipeline.fit(train)
predictions = lrModel.transform(test)

In [7]:
# Print the schema of the scored test set to see which columns the fitted
# pipeline appended to the original ones.
predictions.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: integer (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



### Make Predictions

In [8]:
# Put the true label next to the model's outputs for the first four test rows.
(
    predictions
    .select('Churn', 'prediction', 'probability', 'rawPrediction')
    .show(4)
)

+-----+----------+--------------------+--------------------+
|Churn|prediction|         probability|       rawPrediction|
+-----+----------+--------------------+--------------------+
|    0|       0.0|[0.94205607182741...|[2.78858900974603...|
|    0|       0.0|[0.87804116659858...|[1.97400992266791...|
|    1|       0.0|[0.96547100560787...|[3.33081668272753...|
|    0|       0.0|[0.70120294114685...|[0.85303273747158...|
+-----+----------+--------------------+--------------------+
only showing top 4 rows



### Performance Evaluation

In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# NOTE(review): the name `eval` shadows Python's builtin eval() for the rest of
# the session. It is kept unchanged here because other cells may refer to it;
# consider renaming it notebook-wide (e.g. to `evaluator`).
eval = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Churn',
)

# Score the held-out predictions using the evaluator's default metric.
eval.evaluate(predictions)

0.7442508710801381

### Predict on the new data

In [11]:
# Load the new customer file with the same reader options used for the
# training CSV (header row as column names, inferred column types).
new_data = spark.read.csv(
    'ikantongkol.csv',
    header=True,
    inferSchema=True,
)
new_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: integer (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [12]:
# Refit the same pipeline on the full labelled DataFrame (not just the
# training split) before scoring the new customers.
lrModel_new = pipeline.fit(df)

In [13]:
# Score the new customers with the model fitted on the full dataset.
predictions_new = lrModel_new.transform(new_data)

In [14]:
# Print the schema of the scored new data to confirm the prediction columns
# were appended.
predictions_new.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: integer (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [15]:
# Show the predicted churn class for each new customer alongside their
# name and company.
(
    predictions_new
    .select('Names', 'Company', 'prediction')
    .show()
)

+-------------------+--------------------+----------+
|              Names|             Company|prediction|
+-------------------+--------------------+----------+
|      Kenneth Johns|      Campbell-Moore|       0.0|
|      Jason Dickson|           Ayers Ltd|       0.0|
|       Johnny Jones|        Taylor Group|       0.0|
|      Rodney Miller|Leonard, Martinez...|       0.0|
|        Tracy Jones|Thompson, Bailey ...|       0.0|
|       Greg Swanson|      Jackson-Garcia|       0.0|
|    Bradley Bennett|Fisher, Moore and...|       0.0|
|   Michael Anderson|Matthews, Burns a...|       0.0|
| Katherine Sandoval|Monroe, Ferguson ...|       0.0|
|      James Hensley|      Leblanc-Torres|       0.0|
|Christopher Beasley|          Harris Inc|       0.0|
|        Kayla Lopez|          Castro PLC|       0.0|
|       Jonathan Ali|Osborne, Bailey a...|       0.0|
|  Kenneth Wilkerson|Aguilar, Kelly an...|       0.0|
|         Alex Banks|        Jones-Meyers|       0.0|
|    Brendan Higgins|       