In [1]:
import findspark
findspark.init("/home/gagandeep/spark/spark-2.4.5-bin-hadoop2.7")
from pyspark.sql import SparkSession
import os

In [2]:
spark = SparkSession.builder.appName('logr').getOrCreate()

In [3]:
pathToFile = "/media/gagandeep/2E92405C92402AA3/Work/Codes/PythonCodes/SparkLesson/DataFiles/Spark_for_Machine_Learning/Logistic_Regression/titanic.csv"
data = spark.read.csv(pathToFile, inferSchema=True, header=True)

In [4]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [5]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
myCols = ['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked']

In [7]:
final_data = data.select(myCols).na.drop()

In [8]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [9]:
genderIndexer = StringIndexer(inputCol='Sex', outputCol='sex_indexed')
genderEncoder = OneHotEncoder(inputCol='sex_indexed', outputCol='sex_vec')
embarkedIndexer = StringIndexer(inputCol='Embarked', outputCol='embarked_indexed')
embarkedEncoder = OneHotEncoder(inputCol='embarked_indexed', outputCol='embarked_vec')
assembler = VectorAssembler(inputCols=['Pclass',
 'sex_vec',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'embarked_vec'],outputCol='features')

In [10]:
# Build Spark Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [11]:
log_reg_titanic = LogisticRegression(featuresCol='features', labelCol='Survived')

In [12]:
pipeline = Pipeline(stages=[genderIndexer,embarkedIndexer,
                           genderEncoder,embarkedEncoder,
                           assembler,log_reg_titanic])

In [13]:
train, test = final_data.randomSplit([0.7,0.3])

In [14]:
fit_model = pipeline.fit(train)

In [15]:
results = fit_model.transform(test)

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [21]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Survived")

In [22]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [23]:
AUC = my_eval.evaluate(results)
AUC

0.8115915697674418

In [24]:
#problem 2
pathToFile = "/media/gagandeep/2E92405C92402AA3/Work/Codes/PythonCodes/SparkLesson/DataFiles/Spark_for_Machine_Learning/Logistic_Regression/customer_churn.csv"
data = spark.read.csv(pathToFile, inferSchema=True, header=True)

In [25]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [26]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [28]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [29]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'], outputCol='features')

In [30]:
output = assembler.transform(data)

In [31]:
final_data = output.select('features', 'churn')

In [32]:
train, test = final_data.randomSplit([0.7,0.3])

In [34]:
from pyspark.ml.classification import LogisticRegression
lgr = LogisticRegression(labelCol='churn')
fit_data = lgr.fit(train)

In [37]:
fit_data.summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                648|                648|
|   mean| 0.1743827160493827| 0.1419753086419753|
| stddev|0.37973136418191983|0.34929443478456074|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [42]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
pred_labels = fit_data.evaluate(test)

In [43]:
pred_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[28.0,8670.98,0.0...|    0|[8.37690824463786...|[0.99976993262070...|       0.0|
|[28.0,9090.43,1.0...|    0|[1.36065164443400...|[0.79586558659857...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.09430656701825...|[0.98360594470461...|       0.0|
|[29.0,9378.24,0.0...|    0|[5.06781644847795...|[0.99374324039972...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.63612658198162...|[0.99039791545319...|       0.0|
|[30.0,6744.87,0.0...|    0|[3.71361521564169...|[0.97619147906467...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.56626558222971...|[0.97251554752342...|       0.0|
|[30.0,10183.98,1....|    0|[2.90412169057924...|[0.94804981110381...|       0.0|
|[30.0,10960.52,1....|    0|[2.25293399813136...|[0.90490331545542...|       0.0|
|[31.0,8829.83,1

In [44]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol = 'prediction', labelCol='churn')

In [47]:
auc = churn_eval.evaluate(pred_labels.predictions)
auc

0.7358265241986173

In [48]:
final_model = lgr.fit(final_data)

In [50]:
new_customers = spark.read.csv("/media/gagandeep/2E92405C92402AA3/Work/Codes/PythonCodes/SparkLesson/DataFiles/Spark_for_Machine_Learning/Logistic_Regression/new_customers.csv",inferSchema=True, header=True)

In [51]:
new_customers.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company']

In [54]:
test_new_customers = assembler.transform(new_customers)
test_new_customers.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'features']

In [55]:
new_predictions = final_model.transform(test_new_customers)

In [56]:
new_predictions.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [59]:
new_predictions.select('Company', 'prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

