In [1]:
"""Logistic Regression Project: Used to predict the liklihood of existing customers churning, and validating the model on new customers to advise higher customer
service attention to the customers at a higher risk of churning"""

In [2]:
#Load PySpark
import pyspark

In [3]:
#Pull the whole customer_churn table into a DataFrame and preview the first rows
churn_query = "SELECT * FROM customer_churn"
df = sqlContext.sql(churn_query)
df.show()

In [4]:
#Explore data: print every field of the first row on its own line.
#A plain loop is used instead of a list comprehension so the cell does not build
#(and echo) a throwaway list of None values, and an empty table is skipped
#instead of raising IndexError.
first_rows = df.head(1)
if first_rows:
    for item in first_rows[0]:
        print(item)

In [5]:
#Print the dataframe schema
df.printSchema()

In [6]:
#Check for missing data: count the nulls in every column with one aggregation.
#Each column is turned into a 0/1 "is null" flag and the flags are summed, so the
#data is scanned once (rather than once per column) and every count is shown
#under the name of the column it belongs to.
null_flags = df.select([df[column].isNull().cast('int').alias(column) for column in df.columns])
null_flags.groupBy().sum().show()

In [7]:
#Show summary statistics for the dataframe
df.describe().show()

In [8]:
#Report the dataset dimensions as a (rows, columns) tuple
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
#List the column names
df.columns

In [11]:
#Build the assembler that packs the numeric predictor columns into a single
#'features' vector column
feature_columns = ['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [12]:
#Apply the assembler to df (its outputCol is 'features'), then preview the result
output = assembler.transform(df)
output.show()

In [13]:
#Keep only the assembled feature vector and the churn label for modeling
data = output.select('features', 'churn')

In [14]:
#Preview the modeling dataframe (features vector and churn label)
data.show()

In [15]:
#Split data into train, test sets (70% / 30%).
#A fixed seed makes the random split repeatable for the same input data, so the
#fitted model and its evaluation metrics do not change from one run to the next.
train, test = data.randomSplit([0.7,0.3], seed=42)

In [16]:
from pyspark.ml.classification import LogisticRegression

In [17]:
#Configure the (not yet fitted) logistic regression estimator: it reads the
#assembled 'features' vector and learns to predict the 'churn' label
model = LogisticRegression(
    featuresCol='features',
    labelCol='churn',
)

In [18]:
#Fit the logistic regression on the training split
fitted = model.fit(train)

In [19]:
#Keep a handle on the fitted model's summary object
summary = fitted.summary

In [20]:
#Summary statistics of the predictions dataframe stored on the model summary
summary.predictions.describe().show()

In [21]:
 from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [22]:
#Evaluate the fitted model on the held-out test split; the result exposes a
#predictions dataframe (predictions and their respective probabilities alongside actual churn)
pred = fitted.evaluate(test)

In [23]:
#Preview the test-set predictions
pred.predictions.show()

In [24]:
#Initiate BinaryClassificationEvaluator to compare predictions with actual churn.
#The area under the ROC curve has to be computed from the model's continuous
#scores in 'rawPrediction'; feeding it the hard 0/1 labels in 'prediction'
#reduces the curve to a single threshold and misstates how well the model ranks
#customers. The metric is named explicitly so the reported number is unambiguous.
churn = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='churn', metricName='areaUnderROC')

In [25]:
#Evaluate with AUC: score the test-set predictions with the evaluator and print
#the resulting metric (areaUnderROC is the evaluator's documented default metric)
auc = churn.evaluate(pred.predictions)
print(auc)

In [26]:
#Refit the logistic regression on the full labelled dataset (not just the training
#split); this is the model applied to the new customers below
val_model = model.fit(data)

In [27]:
#Load in new, unseen customer data from the customers table
new_cust = sqlContext.sql("SELECT * FROM customers")

In [28]:
#Assemble new data features with the same assembler (and therefore the same
#input columns) that was applied to the churn data
test_new_cust = assembler.transform(new_cust)

In [29]:
#Print the schema of the assembled new-customer dataframe
test_new_cust.printSchema()

In [30]:
#Transform unseen data with the model fitted on the full dataset; the result
#carries the model's output columns for each new customer
final_results = val_model.transform(test_new_cust)

In [31]:
#Final results: predicted churn label and class probabilities for each company.
#truncate=False keeps the whole probability vector visible; by default show()
#cuts long cell values to 20 characters, which hides the churn-class probability
#that this report exists to surface.
final_results.select('Company', 'prediction','probability').show(truncate=False)