In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('customers').getOrCreate()

22/06/05 22:21:14 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.8.100 instead (on interface wlp1s0)
22/06/05 22:21:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/05 22:21:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/05 22:21:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [90]:
df = spark.read.csv('customer_churn.csv' , inferSchema= True , header= True)

In [91]:
df.show(2)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|   Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|Wilson PLC|    1|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------+-----+
only showing top 2 rows



In [92]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [93]:
# selecting the needed columns 

df_new = df.select([
    'Age' , 
    'Total_Purchase' , 
    'Years' , 
    'Num_Sites' , 
    'Churn'
])

In [94]:
df_new.show(1)

+----+--------------+-----+---------+-----+
| Age|Total_Purchase|Years|Num_Sites|Churn|
+----+--------------+-----+---------+-----+
|42.0|       11066.8| 7.22|      8.0|    1|
+----+--------------+-----+---------+-----+
only showing top 1 row



In [95]:
df_new.columns

['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'Churn']

In [96]:
from pyspark.sql.functions import isnan, when, count, col

In [97]:
# count of nulls 
df_new.select([count(when(isnan(c), c)).alias(c) for c in df_new.columns]).show()

+---+--------------+-----+---------+-----+
|Age|Total_Purchase|Years|Num_Sites|Churn|
+---+--------------+-----+---------+-----+
|  0|             0|    0|        0|    0|
+---+--------------+-----+---------+-----+



In [102]:
from pyspark.ml.feature import VectorAssembler

In [103]:
assumbler = VectorAssembler(inputCols=[
    'Age',
    'Total_Purchase', 
    'Years', 
    'Num_Sites'
] , outputCol= 'feature')

In [104]:
output = assumbler.transform(df)

In [105]:
selected_df = output.select('feature' , 'Churn')

In [106]:
# splitting the data 

train_data , test_data = selected_df.randomSplit([0.7 , 0.3]) 

In [108]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline 

In [109]:
log_reg_model = LogisticRegression(featuresCol= 'feature' ,
                                     labelCol= 'Churn')


In [112]:
# creating a pipeline 

pipeline  = Pipeline(stages=[
    assumbler , 
    output , 
    log_reg_model
])

In [113]:
# fit the model 
fit_model = pipeline.fit(train_data)

TypeError: Cannot recognize a pipeline stage of type <class 'pyspark.sql.dataframe.DataFrame'>.

In [77]:
# transform 

results = fit_model.transform(test_data)

In [78]:
# evaluating the model

from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [79]:
evaluation = BinaryClassificationEvaluator(
    rawPredictionCol='prediction' , 
    labelCol='Churn'
)

In [80]:
# area unver the curve 

AUC = evaluation.evaluate(results)

In [81]:
AUC

0.7607516614467954

In [82]:
# using a new customers data to test the model 

In [86]:
# apply the model to all of the data

final_model = log_reg_model.fit(df)

IllegalArgumentException: feature does not exist. Available: Names, Age, Total_Purchase, Account_Manager, Years, Num_Sites, Onboard_date, Location, Company, Churn

In [48]:
new_customers = spark.read.csv('new_customers.csv' , inferSchema= True , 
                                    header= True)

In [50]:
new_customers.show(1)

+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
|        Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location| Company|
+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
|Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|King Ltd|
+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
only showing top 1 row



In [51]:
testing_new_customers = assumbler.transform(new_customers)

In [52]:
testing_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- feature: vector (nullable = true)



In [55]:
new_customers_result = log_reg_model(testing_new_customers)

TypeError: 'LogisticRegression' object is not callable