In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('mylogreg').getOrCreate()

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
my_data = spark.read.format('libsvm').load('sample_libsvm_data.txt')

In [7]:
my_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [0]:
my_log_reg_model = LogisticRegression()

In [0]:
fitted_logreg = my_log_reg_model.fit(my_data)

In [0]:
log_summary = fitted_logreg.summary

In [12]:
log_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [0]:
lr_train,lr_test = my_data.randomSplit([0.7,0.3])

In [0]:
final_model = LogisticRegression()

In [0]:
fit_final = final_model.fit(lr_train)

In [0]:
prediction_and_labels = fit_final.evaluate(lr_test)

In [18]:
prediction_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[95,96,97,12...|[23.4706906845332...|[0.99999999993590...|       0.0|
|  0.0|(692,[100,101,102...|[10.7816138335672...|[0.99997922239037...|       0.0|
|  0.0|(692,[122,123,124...|[20.3754981196014...|[0.99999999858409...|       0.0|
|  0.0|(692,[124,125,126...|[38.1505699977623...|[1.0,2.7003370083...|       0.0|
|  0.0|(692,[124,125,126...|[32.7128113696880...|[0.99999999999999...|       0.0|
|  0.0|(692,[124,125,126...|[22.4460510537293...|[0.99999999982143...|       0.0|
|  0.0|(692,[124,125,126...|[24.1401724547417...|[0.99999999996718...|       0.0|
|  0.0|(692,[126,127,128...|[19.0325586282103...|[0.99999999457668...|       0.0|
|  0.0|(692,[126,127,128...|[34.2432786950352...|[0.99999999999999...|       0.0|
|  0.0|(692,[126

### Using Evaluator object

In [0]:
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                  MulticlassClassificationEvaluator)

In [0]:
my_eval = BinaryClassificationEvaluator()

In [0]:
my_final_roc = my_eval.evaluate(prediction_and_labels.predictions)

In [22]:
my_final_roc

1.0

### Consulting Project

In [0]:
spark = SparkSession.builder.appName('logregconsult').getOrCreate()

In [0]:
data = spark.read.csv('customer_churn.csv',inferSchema=True,header=True)

In [25]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [26]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [27]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
assembler = VectorAssembler(inputCols=['Age','Total_Purchase','Account_Manager',
                                       'Years','Num_Sites'],outputCol='features')

In [0]:
output = assembler.transform(data)

In [0]:
final_data = output.select('features','churn')

In [0]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
lr_churn = LogisticRegression(labelCol='churn')

In [0]:
fitted_churn_model = lr_churn.fit(train_churn)

In [0]:
training_sum = fitted_churn_model.summary

In [37]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                656|                656|
|   mean|0.17378048780487804|0.13871951219512196|
| stddev|  0.379209753615349| 0.3459173540933844|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
pred_and_labels = fitted_churn_model.evaluate(test_churn)

In [41]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[25.0,9672.03,0.0...|    0|[4.49197935228333...|[0.98892556045734...|       0.0|
|[28.0,9090.43,1.0...|    0|[1.20666474700528...|[0.76970828251006...|       0.0|
|[30.0,6744.87,0.0...|    0|[3.27927817144655...|[0.96371104823905...|       0.0|
|[30.0,8403.78,1.0...|    0|[5.85393059451661...|[0.99713960367790...|       0.0|
|[30.0,11575.37,1....|    1|[3.89012181236200...|[0.97996668252449...|       0.0|
|[30.0,13473.35,0....|    0|[3.01076549917122...|[0.95305811336833...|       0.0|
|[31.0,5304.6,0.0,...|    0|[3.12003875302943...|[0.95771179768002...|       0.0|
|[31.0,5387.75,0.0...|    0|[2.03441817482296...|[0.88436366852759...|       0.0|
|[31.0,10182.6,1.0...|    0|[4.85918017203605...|[0.99230286488992...|       0.0|
|[31.0,11743.24,

In [0]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='churn')

In [0]:
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [44]:
auc

0.7863247863247863

In [0]:
final_lr_model = lr_churn.fit(final_data)

### Prediction on new data

In [0]:
new_customers = spark.read.csv('new_customers.csv',inferSchema=True,header=True)

In [47]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [0]:
test_new_customers = assembler.transform(new_customers)

In [50]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [0]:
final_results = final_lr_model.transform(test_new_customers)

In [53]:
final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

