In [34]:
import findspark
findspark.init('/home/ubuntu/Spark/spark-3.3.0-bin-hadoop3')

In [35]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Logistic_Regression_Test').getOrCreate()

In [36]:
df = spark.read.format("csv").option("header", "true").load("customer_churn.csv")

In [37]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [38]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: string (nullable = true)



In [39]:
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|       Onboard_date|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|               null|                null|                null|0.16666666666666666|
| stddev| 

In [40]:
df.head(1)

[Row(Names='Cameron Williams', Age='42.0', Total_Purchase='11066.8', Account_Manager='0', Years='7.22', Num_Sites='8.0', Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn='1')]

In [41]:
df = df.selectExpr("cast(Names as string) Names", "cast(Age as float) Age", "cast(Total_Purchase as float) Total_Purchase", 
                   "cast(Account_Manager as int) Account_Manager",
                     "cast(Years as float) Years", 
                  "cast(Num_Sites as float) Num_Sites", "cast(Onboard_date as string) Onboard_date", 
                  "cast(Location as string) Location", "cast(Company as string) Company", 
                  "cast(Churn as int) Churn")
df.head(1)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.7998046875, Account_Manager=0, Years=7.21999979019165, Num_Sites=8.0, Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)]

In [42]:
my_cols = df.select([
    'Age',
    'Total_Purchase',
    'Years',
    'Num_Sites',
    'Onboard_date',
    'Churn',
    'Account_Manager',
    'Location',
    'Company'
])

In [43]:
#Deal with missing data - keeping it simple
my_final_data = my_cols.na.drop()

In [44]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder

In [45]:
date_indexer = StringIndexer(inputCol='Onboard_date', outputCol='Onboard_dateIndex')
date_indexer.setHandleInvalid('keep')
date_encoder = OneHotEncoder(inputCol='Onboard_dateIndex', outputCol='Onboard_dateVec')

In [46]:
# location_indexer = StringIndexer(inputCol='Location', outputCol='LocationIndex')
# location_indexer.setHandleInvalid('keep')
# location_encoder = OneHotEncoder(inputCol='LocationIndex', outputCol='LocationVec')

In [47]:
# company_indexer = StringIndexer(inputCol='Company', outputCol='CompanyIndex')
# company_indexer.setHandleInvalid('keep')
# company_encoder = OneHotEncoder(inputCol='CompanyIndex', outputCol='CompanyVec')

In [48]:
assembler = VectorAssembler(inputCols=[  #'LocationVec', 'CompanyVec',
                                        'Onboard_dateVec', 'Age'
                                       ,'Total_Purchase','Years','Num_Sites','Account_Manager',], outputCol='features')

In [49]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [50]:
log_reg_churn = LogisticRegression(featuresCol='features', labelCol='Churn')

In [51]:
pipeline = Pipeline(stages=[date_indexer, date_encoder, 
                            assembler, log_reg_churn,
#                            company_encoder, company_indexer,
#                             location_indexer, location_encoder
                           ])

In [52]:
train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [53]:
fit_model = pipeline.fit(train_data)

In [54]:
results = fit_model.transform(test_data)

In [55]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [56]:
myeval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')

In [57]:
myeval.evaluate(results)

0.5

In [58]:
results.select('Churn', 'prediction').show()

+-----+----------+
|Churn|prediction|
+-----+----------+
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [59]:
AUC = myeval.evaluate(results)

In [60]:
AUC

0.5

In [61]:
df_new_customers = spark.read.format("csv").option("header", "true").load("new_customers.csv")

In [62]:
df_new_customers.head(1)

[Row(Names='Andrew Mccall', Age='37.0', Total_Purchase='9935.53', Account_Manager='1', Years='7.71', Num_Sites='8.0', Onboard_date='2011-08-29 18:37:54', Location='38612 Johnny Stravenue Nataliebury, WI 15717-8316', Company='King Ltd')]