In [1]:
import pandas as pd
import numpy as np
from pyspark.ml.classification import GBTClassifier
import findspark
findspark.init()
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql import Row
import collections
from pyspark import SparkConf,SparkContext

from __future__ import print_function
from  pyspark.ml.feature import VectorAssembler
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
df=pd.read_csv('churn.csv')

In [3]:
df.head()

Unnamed: 0.1,Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn
0,0,Cameron Williams,42.0,11066.8,0,7.22,8.0,1
1,1,Kevin Mueller,41.0,11916.22,0,6.5,11.0,1
2,2,Eric Lozano,38.0,12884.75,0,6.67,12.0,1
3,3,Phillip White,42.0,8010.76,0,6.71,10.0,1
4,4,Cynthia Norton,37.0,9191.58,0,5.56,9.0,1


In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 900 entries, 0 to 899
Data columns (total 8 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Unnamed: 0       900 non-null    int64  
 1   Names            900 non-null    object 
 2   Age              900 non-null    float64
 3   Total_Purchase   900 non-null    float64
 4   Account_Manager  900 non-null    int64  
 5   Years            900 non-null    float64
 6   Num_Sites        900 non-null    float64
 7   Churn            900 non-null    int64  
dtypes: float64(4), int64(3), object(1)
memory usage: 56.4+ KB


In [5]:
df.drop(['Unnamed: 0','Names'],axis=1,inplace=True)

In [6]:
df['Account_Manager']=df['Account_Manager'].astype(float) 

In [7]:
df['Churn']=df['Churn'].astype(float)

In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 900 entries, 0 to 899
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Age              900 non-null    float64
 1   Total_Purchase   900 non-null    float64
 2   Account_Manager  900 non-null    float64
 3   Years            900 non-null    float64
 4   Num_Sites        900 non-null    float64
 5   Churn            900 non-null    float64
dtypes: float64(6)
memory usage: 42.3 KB


In [9]:
df.to_csv('churn2.csv')

In [10]:
df

Unnamed: 0,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn
0,42.0,11066.80,0.0,7.22,8.0,1.0
1,41.0,11916.22,0.0,6.50,11.0,1.0
2,38.0,12884.75,0.0,6.67,12.0,1.0
3,42.0,8010.76,0.0,6.71,10.0,1.0
4,37.0,9191.58,0.0,5.56,9.0,1.0
...,...,...,...,...,...,...
895,42.0,12800.82,1.0,3.62,8.0,0.0
896,52.0,9893.92,0.0,6.91,7.0,0.0
897,45.0,12056.18,0.0,5.46,4.0,0.0
898,51.0,6517.93,1.0,5.47,10.0,0.0


In [11]:
spark=SparkSession.builder.getOrCreate()

In [12]:
inputlines=df1=spark.read.options(header = True, inferSchema = True).csv("churn2.csv")

In [13]:
inputlines

DataFrame[_c0: int, Age: double, Total_Purchase: double, Account_Manager: double, Years: double, Num_Sites: double, Churn: double]

In [14]:
df1=inputlines.rdd.map(lambda x: x.split(","))

In [15]:
df1

PythonRDD[15] at RDD at PythonRDD.scala:53

In [16]:
colnames=['Age','Total_Purchase','Account_Manager','Years','Num_Sites','Churn']

In [17]:
df2=inputlines
df2

DataFrame[_c0: int, Age: double, Total_Purchase: double, Account_Manager: double, Years: double, Num_Sites: double, Churn: double]

In [18]:
vecAssembler=VectorAssembler(inputCols=['Age','Total_Purchase','Account_Manager','Years','Num_Sites'], outputCol="features")
df2=vecAssembler.transform(df2)
df2

DataFrame[_c0: int, Age: double, Total_Purchase: double, Account_Manager: double, Years: double, Num_Sites: double, Churn: double, features: vector]

In [19]:
df2=df2.drop("index", 'Age','Total_Purchase','Account_Manager','Years','Num_Sites')
df2

DataFrame[_c0: int, Churn: double, features: vector]

In [20]:
df2=df2.withColumnRenamed("Churn", "label")
df2

DataFrame[_c0: int, label: double, features: vector]

In [21]:
trainTest=df2.randomSplit([0.5,0.5])
trainingDF=trainTest[0]
testDF=trainTest[1]

In [22]:
trainingDF

DataFrame[_c0: int, label: double, features: vector]

In [23]:
testDF

DataFrame[_c0: int, label: double, features: vector]

In [24]:
gbt=GBTClassifier(featuresCol='features',maxIter=10)

In [25]:
gbt=gbt.fit(trainingDF)

In [26]:
predictions=gbt.transform(testDF)
predictions.show()

+---+-----+--------------------+--------------------+--------------------+----------+
|_c0|label|            features|       rawPrediction|         probability|prediction|
+---+-----+--------------------+--------------------+--------------------+----------+
|  0|  1.0|[42.0,11066.8,0.0...|[1.32579924018560...|[0.93410944365984...|       0.0|
|  3|  1.0|[42.0,8010.76,0.0...|[0.10196220831088...|[0.55080516428766...|       0.0|
|  5|  1.0|[48.0,10356.02,0....|[1.32579924018560...|[0.93410944365984...|       0.0|
|  7|  1.0|[32.0,9885.12,1.0...|[0.94599042300999...|[0.86898122042583...|       0.0|
|  9|  1.0|[40.0,8066.94,1.0...|[-0.4588779747993...|[0.28541535559901...|       1.0|
| 10|  1.0|[30.0,11575.37,1....|[1.32579924018560...|[0.93410944365984...|       0.0|
| 11|  1.0|[45.0,8771.02,1.0...|[0.06984608505870...|[0.53486636272760...|       0.0|
| 12|  1.0|[45.0,8988.67,1.0...|[-0.8471522801204...|[0.15521058700570...|       1.0|
| 14|  1.0|[41.0,6569.87,1.0...|[-0.4331704397773...|[

In [27]:
evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy=evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.8509174311926605
