In [None]:
import os

import findspark

# Prefer an already-configured SPARK_HOME so the notebook also runs on
# machines where Spark is not installed at C:/Spark; fall back to the
# original location when the variable is unset or empty.
findspark.init(os.environ.get("SPARK_HOME") or "C:/Spark")

In [None]:
# Display the Spark installation that findspark resolves, as a sanity check.
findspark.find()

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [None]:
# Build (or reuse) a single-node Spark session with Hive support for this
# notebook, and keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .master("local")
    .appName("churn_model")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

In [None]:
# Display the SparkContext created above.
sc

In [None]:
# Load the churn data set: the first row provides the column names and the
# column types are inferred from the values.
spark_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("churn.csv")
)
spark_df.show(n=5)

In [None]:
# Lower-case every column name, then give the "_c0" column (presumably the
# unnamed first CSV column -- confirm against the file) a readable name.
spark_df = (
    spark_df
    .toDF(*map(str.lower, spark_df.columns))
    .withColumnRenamed("_c0", "index")
)
spark_df.show(n=5)

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the "churn" column as a numeric "label" column, then store the
# label as an integer instead of the indexer's floating-point output.
stringIndexer = StringIndexer().setInputCol("churn").setOutputCol("label")
mod = stringIndexer.fit(spark_df)
indexed = mod.transform(spark_df)

integer_label = indexed["label"].cast("integer")
spark_df = indexed.withColumn("label", integer_label)
spark_df.show(n=5)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Independent variables (model inputs), looked up by column name.
bagımsız_degiskenler = [
    "age",
    "total_purchase",
    "account_manager",
    "years",
    "num_sites",
]

# Pack the input columns into a single "features" vector column.
vectorAssembler = VectorAssembler(
    outputCol="features",
    inputCols=bagımsız_degiskenler,
)
va_df = vectorAssembler.transform(spark_df)
va_df.show(n=5)

In [None]:
# Keep only the columns the model consumes, then hold out roughly 30% of
# the rows for testing. The fixed seed makes the split -- and every metric
# computed from it -- reproducible across kernel restarts.
final_df = va_df.select(["features", "label"])
splits = final_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [None]:
# Customer churn model with GBM (gradient-boosted trees)

In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
# Baseline gradient-boosted tree classifier with 10 boosting iterations,
# fitted on the training split.
gbm = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
)
gbm_model = gbm.fit(train_df)

In [None]:
# Display the SparkContext again (e.g. to check it is still available after training).
sc

In [None]:
# Apply the fitted baseline model to the held-out test split.
y_pred=gbm_model.transform(test_df)

In [None]:
# Display the prediction DataFrame returned by the model.
y_pred

In [None]:
# Keep just the true label and the predicted class for the accuracy check.
ac = y_pred.select(["label", "prediction"])

In [None]:
# Accuracy: share of test rows whose prediction agrees with the true label.
total = ac.count()
correct = ac.where(ac["label"] == ac["prediction"]).count()
correct / total

In [None]:
# Model tuning: grid search over GBM hyperparameters with cross-validation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Evaluator used to score each candidate model (library default settings).
evaluator = BinaryClassificationEvaluator()

# 3 x 2 x 2 = 12 hyperparameter combinations of the GBM estimator.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbm.maxDepth, [2, 4, 6])
    .addGrid(gbm.maxBins, [20, 30])
    .addGrid(gbm.maxIter, [10, 20])
    .build()
)

# 10-fold cross-validation over the grid. The explicit seed pins the fold
# assignment so the selected model is reproducible between runs; without
# it the folds can differ from one kernel session to the next.
cv = CrossValidator(
    estimator=gbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=10,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training split
# (every grid combination is fitted on every fold, so this cell can be slow).
cv_model=cv.fit(train_df)

In [None]:
# Score the test split with the cross-validated model and report accuracy:
# the share of rows whose prediction agrees with the true label.
y_pred = cv_model.transform(test_df)
ac = y_pred.select(["label", "prediction"])
total = ac.count()
correct = ac.where(ac["label"] == ac["prediction"]).count()
correct / total

In [None]:
# Build a small frame of new customers to score with the trained model.
import pandas as pd

names = pd.Series(["Ali Ahmetoğlu", "Berkcan Tanerbey", "Harika Gündüz", "Polat Alemdar", "Ata Bakmayan Ali"])
ages = pd.Series([38, 43, 34, 50, 40])
total_purchases = pd.Series([30000, 10000, 6000, 30000, 100000])  # total spend
account_managers = pd.Series([1, 0, 0, 1, 1])  # account manager flag
years = pd.Series([20, 10, 3, 8, 30])  # years working with the company
num_sites = pd.Series([30, 8, 8, 6, 50])  # number of web sites

# The column names must match the feature columns the VectorAssembler was
# configured with ("age", "total_purchase", "account_manager", "years",
# "num_sites"); it looks its inputs up by name, so the previous plural
# names ("ages", "total_purchases", "account_managers") made the later
# transform fail on missing columns.
yeni_musteriler = pd.DataFrame({
    'names': names,
    'age': ages,
    'total_purchase': total_purchases,
    'account_manager': account_managers,
    'years': years,
    'num_sites': num_sites
})

yeni_musteriler.columns

In [None]:
# Convert the pandas frame of new customers into a Spark DataFrame.
yeni_sdf = spark.createDataFrame(yeni_musteriler)


In [None]:
# Preview the new-customer rows as Spark sees them.
yeni_sdf.show(5)

In [None]:
# Assemble the feature vector for the new customers with the same
# assembler used for training (it reads its input columns by name), score
# them with the cross-validated model, and list each customer's prediction.
yeni_musteriler = vectorAssembler.transform(yeni_sdf)
results = cv_model.transform(yeni_musteriler)
results.select(["names", "prediction"]).show()

In [25]:
#sc.stop()  # uncomment to stop the SparkContext when finished