In [4]:
import findspark

In [5]:
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

In [6]:
import pyspark

In [7]:
import pandas

In [8]:
import numpy

In [9]:
from pyspark.sql import SparkSession

# Creating a SparkSession object - spark
spark = SparkSession.builder.getOrCreate()

In [10]:
# Loading the data through spark.read.csv function
train = spark.read.csv('tr2.csv', inferSchema= True, header = True)

In [11]:
train.show()

+------+--------+-----------------+-----------------+---------------+------------------+-------------+----+---+------+--------------+-----------+----------+------------+-----------+------------------+-----------------+-------------------+----------------------+---------------------+----------------+
|  msno|is_churn|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|city| bd|gender|registered_via|trans_month|trans_year|expire_month|expire_year|registration_month|registration_year|membership_duration|autorenew_._not_cancel|registration_duration|reg_mem_duration|
+------+--------+-----------------+-----------------+---------------+------------------+-------------+----+---+------+--------------+-----------+----------+------------+-----------+------------------+-----------------+-------------------+----------------------+---------------------+----------------+
|179055|       0|               41|               30|            129|               129|         

In [12]:
train.printSchema()

root
 |-- msno: integer (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- trans_month: integer (nullable = true)
 |-- trans_year: integer (nullable = true)
 |-- expire_month: integer (nullable = true)
 |-- expire_year: integer (nullable = true)
 |-- registration_month: integer (nullable = true)
 |-- registration_year: integer (nullable = true)
 |-- membership_duration: integer (nullable = true)
 |-- autorenew_._not_cancel: integer (nullable = true)
 |-- registration_duration: integer (nullable = true)
 |-- reg_mem_duration: integer (nullable = true)



In [13]:
train.count()

188

In [14]:
from pyspark.ml.feature import VectorAssembler

In [15]:
#Setting the input-output feature names
assembler_bhai = VectorAssembler(inputCols = ['msno',
 'payment_method_id',
 'payment_plan_days',
 'plan_list_price',
 'actual_amount_paid',
 'is_auto_renew',
 'city',
 'bd',
 'gender',
 'registered_via',
 'trans_month',
 'trans_year',
 'expire_month',
 'expire_year',
 'registration_month',
 'registration_year',
 'membership_duration',
 'registration_duration',
 'reg_mem_duration'], outputCol = 'features')

In [16]:
output_bhai = assembler_bhai.transform(train)

In [18]:
#This gives all the features including the target variable and features as a vector which only need to pas to the model
output_bhai.printSchema()

root
 |-- msno: integer (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- trans_month: integer (nullable = true)
 |-- trans_year: integer (nullable = true)
 |-- expire_month: integer (nullable = true)
 |-- expire_year: integer (nullable = true)
 |-- registration_month: integer (nullable = true)
 |-- registration_year: integer (nullable = true)
 |-- membership_duration: integer (nullable = true)
 |-- autorenew_._not_cancel: integer (nullable = true)
 |-- registration_duration: integer (nullable = true)
 |-- reg_mem_duration: integer (nullable = true)
 |-- features: vector (null

In [19]:
final_data_bhai = output_bhai.select('features', 'is_churn')

In [20]:
# Splitting the data into train and test
train_is_churn, test_is_churn = final_data_bhai.randomSplit([0.7,0.3])

In [21]:
# importing the Logistic Regression function
from pyspark.ml.classification import LogisticRegression

In [22]:
#Assigning the target variable as is_churn
lr_churn = LogisticRegression(labelCol = 'is_churn')

In [23]:
fitted_is_churn_model = lr_churn.fit(train_is_churn)

In [25]:
#importing the evaluator function
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [26]:
#Evaluating the learned model to be evaluated on the test data
pred_and_labels = fitted_is_churn_model.evaluate(test_is_churn)

In [51]:
pred_and_labels

<pyspark.ml.classification.BinaryLogisticRegressionSummary at 0x7f586206bf28>

In [27]:
pred_and_labels.predictions.show()

+--------------------+--------+--------------------+--------------------+----------+
|            features|is_churn|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[733.0,39.0,30.0,...|       1|[-0.0645938833622...|[0.48385714160035...|       1.0|
|[1248.0,38.0,30.0...|       1|[-0.0646676042740...|[0.48383873060540...|       1.0|
|[3254.0,39.0,30.0...|       0|[-0.0645459789933...|[0.48386910521835...|       1.0|
|[8905.0,41.0,30.0...|       1|[-0.0645784202072...|[0.48386100336049...|       1.0|
|[10012.0,38.0,30....|       1|[-0.0646014152026...|[0.48385526060322...|       1.0|
|[11173.0,34.0,30....|       0|[-0.0644323208247...|[0.48389749023798...|       1.0|
|[16197.0,39.0,30....|       0|[-0.0644954197925...|[0.48388173187304...|       1.0|
|[17533.0,41.0,30....|       1|[-0.0646247453167...|[0.48384943415795...|       1.0|
|[30604.0,38.0,30....|       1|[-0.0646253859617...|[0.4838492741

In [28]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol = 'prediction',labelCol='is_churn')

In [None]:
churn_eval_acc = BinaryClassificationEvaluator(rawPredictionCol = 'prediction',labelCol='is_churn',
                                          metrics = "accuracy")

In [29]:
#auc of the model
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [30]:
auc

0.5

In [31]:
# Importing the RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier

In [32]:
# Creating the instant of RandomForestClassifier with labels and features
rfc = RandomForestClassifier(labelCol = 'is_churn', featuresCol = 'features')

In [33]:
# Training the RandomForest Classifier model on the training data set
rfc_model = rfc.fit(train_is_churn)

In [34]:
rfc_model.featureImportances

SparseVector(19, {0: 0.0809, 1: 0.0322, 2: 0.0021, 3: 0.0238, 4: 0.0626, 5: 0.0802, 6: 0.0512, 7: 0.0373, 8: 0.0155, 9: 0.0092, 10: 0.0391, 11: 0.0376, 12: 0.0109, 13: 0.0451, 14: 0.0395, 15: 0.0359, 16: 0.1842, 17: 0.1032, 18: 0.1094})

In [36]:
# Evaluating the learned model to be evaluated on the test data
pred_and_labels_rfc = rfc_model.transform(test_is_churn)

In [55]:
pred_and_labels_rfc.select("prediction", "is_churn", "features").show(5)

+----------+--------+--------------------+
|prediction|is_churn|            features|
+----------+--------+--------------------+
|       0.0|       1|[733.0,39.0,30.0,...|
|       1.0|       1|[1248.0,38.0,30.0...|
|       1.0|       0|[3254.0,39.0,30.0...|
|       1.0|       1|[8905.0,41.0,30.0...|
|       0.0|       1|[10012.0,38.0,30....|
+----------+--------+--------------------+
only showing top 5 rows



In [50]:
pred_and_labels_rfc

DataFrame[features: vector, is_churn: int, rawPrediction: vector, probability: vector, prediction: double]

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [41]:
# Computing test error
evaluator = MulticlassClassificationEvaluator(labelCol = 'is_churn', 
                                          predictionCol = 'prediction',
                                          metricName = "accuracy")

In [42]:
accuracy = evaluator.evaluate(pred_and_labels_rfc)

In [43]:
accuracy

0.59375

In [81]:
test_is_churn.count()

64

In [94]:
test_is_churn

DataFrame[features: vector, is_churn: int]

In [98]:
test_is_churn.select('features').show(2)

+--------------------+
|            features|
+--------------------+
|[733.0,39.0,30.0,...|
|[1248.0,38.0,30.0...|
+--------------------+
only showing top 2 rows

