In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('logreg').getOrCreate()


In [5]:
data = spark.read.csv('customer_churn.csv',inferSchema=True,header=True)

In [6]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
data.describe().show()

23/11/15 23:12:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                NULL|                NULL|0.16666666666666666|
| stddev|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [8]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [9]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],outputCol='features')


In [10]:
output = assembler.transform(data)


In [11]:
final_data = output.select('features','churn')

In [12]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])


In [13]:
from pyspark.ml.classification import LogisticRegression


In [14]:
lr_churn = LogisticRegression(labelCol='churn')

In [15]:
fitted_churn_model = lr_churn.fit(train_churn)


23/11/15 23:14:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [16]:
fitted_churn_model.summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                623|                623|
|   mean|0.15890850722311398|0.12199036918138041|
| stddev| 0.3658845112032528| 0.3275376612257682|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [23]:
predictions_and_labels = fitted_churn_model.evaluate(test_churn)

In [24]:
predictions_and_labels .predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[27.0,8628.8,1.0,...|    0|[5.01552739085956...|[0.99340958932288...|       0.0|
|[28.0,11128.95,1....|    0|[3.86109818059670...|[0.97938888264967...|       0.0|
|[29.0,8688.17,1.0...|    1|[2.33654899721165...|[0.91185911487499...|       0.0|
|[29.0,10203.18,1....|    0|[3.42463544780733...|[0.96846564574552...|       0.0|
|[29.0,11274.46,1....|    0|[4.21265700469789...|[0.98540907363820...|       0.0|
|[29.0,12711.15,0....|    0|[5.17150227434791...|[0.99435600009489...|       0.0|
|[30.0,13473.35,0....|    0|[2.67783279253369...|[0.93570586657960...|       0.0|
|[31.0,5304.6,0.0,...|    0|[3.08766926153373...|[0.95638123893240...|       0.0|
|[31.0,7073.61,0.0...|    0|[2.80882219715338...|[0.94315070135031...|       0.0|
|[31.0,9574.89,0

In [25]:


churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                           labelCol='churn')



In [27]:
auc = churn_eval.evaluate(predictions_and_labels .predictions)

In [28]:
auc

0.7840100642026723

In [29]:
final_lr_model = lr_churn.fit(final_data)

In [30]:
new_customers = spark.read.csv('new_customers.csv',inferSchema=True,header=True)

In [31]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [32]:
test_new_customers = assembler.transform(new_customers)

In [33]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [34]:
final_results = final_lr_model.transform(test_new_customers)

In [35]:
final_results.select('Company','prediction').show()


+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



23/11/16 22:41:02 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 291840 ms exceeds timeout 120000 ms
23/11/16 22:41:02 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/16 22:41:02 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$