In [11]:
import findspark

# findspark.init() has to run before pyspark is imported further down in this cell.
# Raw string: "\s" is not a valid escape sequence, so the non-raw literal emits a
# DeprecationWarning (SyntaxWarning on Python 3.12+). The path value is unchanged.
findspark.init(r"C:\spark")

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf  # Conf = configuration

import matplotlib.pyplot as plt
import seaborn as sns

In [12]:
# Connection settings: build (or reuse) a local-mode Spark session

session_builder = SparkSession.builder.master("local").appName("pyspark giris")
spark = session_builder.getOrCreate()

# SparkContext behind the session (connection information)
sc = spark.sparkContext

sc

In [13]:
# Read the data set: the file has a header row and Spark infers the column types

csv_path = "churn.csv"
spark_df = spark.read.csv(csv_path, header=True, inferSchema=True)

spark_df.show()

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

# Veri Setinin Düzenlenmesi ve Betimsel İstatistikleri

In [14]:
# Standardise the column names by lower-casing all of them

lowercase_names = [name.lower() for name in spark_df.columns]
spark_df = spark_df.toDF(*lowercase_names)

spark_df.show()

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              names| age|total_purchase|account_manager|years|num_sites|churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

In [15]:
# Rename the _c0 column to "index"

old_name, new_name = "_c0", "index"
spark_df = spark_df.withColumnRenamed(old_name, new_name)

spark_df.show()

+-----+-------------------+----+--------------+---------------+-----+---------+-----+
|index|              names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------------+----+--------------+---------------+-----+---------+-----+
|    0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|    1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|    2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|    3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|    4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|    5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|    6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|    7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|    8|        Ashlee Carr|43.0|       14062.6|       

In [16]:
# Number of observations (rows) in the data set

n_rows = spark_df.count()
n_rows

900

In [17]:
# Number of columns

column_names = spark_df.columns
len(column_names)

8

In [18]:
# Column names

list(spark_df.columns)

['index',
 'names',
 'age',
 'total_purchase',
 'account_manager',
 'years',
 'num_sites',
 'churn']

In [19]:
# Summary statistics, to get to know the data

summary_df = spark_df.describe()
summary_df.show()

+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|             index|        names|              age|   total_purchase|   account_manager|            years|         num_sites|              churn|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [20]:
# The same summary statistics, shown as a transposed pandas DataFrame

summary_pdf = spark_df.describe().toPandas()
summary_pdf.transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,900,449.5,259.95191863111916,0,899
names,900,,,Aaron King,Zachary Walsh
age,900,41.81666666666667,6.127560416916251,22.0,65.0
total_purchase,900,10062.82403333334,2408.644531858096,100.0,18026.01
account_manager,900,0.4811111111111111,0.4999208935073339,0,1
years,900,5.27315555555555,1.274449013194616,1.0,9.15
num_sites,900,8.587777777777777,1.7648355920350969,3.0,14.0
churn,900,0.16666666666666666,0.3728852122772358,0,1


# Büyük Veride Veri Ön İşleme

In [21]:
# Get rid of rows with missing observations

rows_without_nulls = spark_df.dropna()
spark_df = rows_without_nulls

In [22]:
# Derive a new variable (column): the square of age

age_squared = spark_df["age"] ** 2
spark_df = spark_df.withColumn("age_kare", age_squared)

spark_df.show()

+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+
|index|              names| age|total_purchase|account_manager|years|num_sites|churn|age_kare|
+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+
|    0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|  1764.0|
|    1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|  1681.0|
|    2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|  1444.0|
|    3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|  1764.0|
|    4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|  1369.0|
|    5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|  2304.0|
|    6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|  1936.0|
|    7|      Zachary Walsh|32.0|       9885.12|   

In [23]:
# Build the dependent (target) variable "label" from the churn column

from pyspark.ml.feature import StringIndexer  # churn is an integer column, but it is indexed as if it were a string

# StringIndexer orders labels by descending frequency by default, so index 0 would go to
# whichever churn value is most common. Alphabetical ordering pins the mapping to
# "0" -> 0 and "1" -> 1 regardless of the class balance in the data.
stringIndexer = StringIndexer(inputCol="churn", outputCol="label", stringOrderType="alphabetAsc")

mod = stringIndexer.fit(spark_df)

indexed = mod.transform(spark_df)

# Store the indexed label as an integer column
spark_df = indexed.withColumn("label", indexed["label"].cast("integer"))

spark_df.show()


# "label" now stands in for the churn variable

+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+-----+
|index|              names| age|total_purchase|account_manager|years|num_sites|churn|age_kare|label|
+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+-----+
|    0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|  1764.0|    1|
|    1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|  1681.0|    1|
|    2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|  1444.0|    1|
|    3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|  1764.0|    1|
|    4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|  1369.0|    1|
|    5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|  2304.0|    1|
|    6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|  1936

In [24]:
# Build the independent variables (features)

from pyspark.ml.feature import VectorAssembler  # does not accept string columns

bagımsiz_degiskenler = [
    "age",
    "total_purchase",
    "account_manager",
    "years",
    "num_sites",
]

# For every row, the columns listed in bagımsiz_degiskenler are packed into a single
# vector-typed column named "features". This is the standard layout the model needs.
vectorAssembler = VectorAssembler(inputCols=bagımsiz_degiskenler, outputCol="features")

va_df = vectorAssembler.transform(spark_df)
va_df.show(truncate=False)


# In each row the "features" column holds, in order, the values of "age", "total_purchase",
# "account_manager", "years" and "num_sites" gathered into one array.
# Next, the dependent and independent variables have to be selected out of this frame.

+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+-----+-----------------------------+
|index|names              |age |total_purchase|account_manager|years|num_sites|churn|age_kare|label|features                     |
+-----+-------------------+----+--------------+---------------+-----+---------+-----+--------+-----+-----------------------------+
|0    |Cameron Williams   |42.0|11066.8       |0              |7.22 |8.0      |1    |1764.0  |1    |[42.0,11066.8,0.0,7.22,8.0]  |
|1    |Kevin Mueller      |41.0|11916.22      |0              |6.5  |11.0     |1    |1681.0  |1    |[41.0,11916.22,0.0,6.5,11.0] |
|2    |Eric Lozano        |38.0|12884.75      |0              |6.67 |12.0     |1    |1444.0  |1    |[38.0,12884.75,0.0,6.67,12.0]|
|3    |Phillip White      |42.0|8010.76       |0              |6.71 |10.0     |1    |1764.0  |1    |[42.0,8010.76,0.0,6.71,10.0] |
|4    |Cynthia Norton     |37.0|9191.58       |0              |5.56 |9.0      |1   

In [25]:
# Create the final data set: only the feature vector and the label

model_columns = ["features", "label"]
final_df = va_df.select(model_columns)

final_df.show(truncate=False)

+-----------------------------+-----+
|features                     |label|
+-----------------------------+-----+
|[42.0,11066.8,0.0,7.22,8.0]  |1    |
|[41.0,11916.22,0.0,6.5,11.0] |1    |
|[38.0,12884.75,0.0,6.67,12.0]|1    |
|[42.0,8010.76,0.0,6.71,10.0] |1    |
|[37.0,9191.58,0.0,5.56,9.0]  |1    |
|[48.0,10356.02,0.0,5.12,8.0] |1    |
|[44.0,11331.58,1.0,5.23,11.0]|1    |
|[32.0,9885.12,1.0,6.92,9.0]  |1    |
|[43.0,14062.6,1.0,5.46,11.0] |1    |
|[40.0,8066.94,1.0,7.11,11.0] |1    |
|[30.0,11575.37,1.0,5.22,8.0] |1    |
|[45.0,8771.02,1.0,6.64,11.0] |1    |
|[45.0,8988.67,1.0,4.84,11.0] |1    |
|[40.0,8283.32,1.0,5.1,13.0]  |1    |
|[41.0,6569.87,1.0,4.3,11.0]  |1    |
|[38.0,10494.82,1.0,6.81,12.0]|1    |
|[45.0,8213.41,1.0,7.35,11.0] |1    |
|[43.0,11226.88,0.0,8.08,12.0]|1    |
|[53.0,5515.09,0.0,6.85,8.0]  |1    |
|[46.0,8046.4,1.0,5.69,8.0]   |1    |
+-----------------------------+-----+
only showing top 20 rows



In [26]:
# Train/test split (70% / 30%)

# A fixed seed makes the split repeatable, so the metrics computed from it can be
# reproduced after a kernel restart instead of changing on every run.
splits = final_df.randomSplit([0.70, 0.30], seed=42)

train_df, test_df = splits

# GBM ile Müşteri Terk Modellemesi

In [27]:
import pyspark
from pyspark.ml.classification import GBTClassifier #GBM algoritması

In [37]:
# Model: fit a gradient-boosted trees classifier on the training split

gbm_estimator = GBTClassifier(maxIter=10, featuresCol="features", labelCol="label")
gbm_model = gbm_estimator.fit(train_df)

In [30]:
# Prediction: score the test split with the fitted model

y_pred = gbm_model.transform(test_df)

# Display the schema of the scored frame
y_pred

DataFrame[features: vector, label: int, rawPrediction: vector, probability: vector, prediction: double]

In [33]:
# Baseline test accuracy (before any tuning)

# y_pred has several columns; keep only the dependent variable and the predicted value
ac = y_pred.select("label", "prediction")

n_correct = ac.filter(ac.label == ac.prediction).count()
n_total = ac.count()

# ACCURACY = share of test rows predicted correctly (about 85% in the recorded run)
n_correct / n_total


0.8507462686567164

In [43]:
# Model tuning with cross-validation

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

gbm = GBTClassifier(maxIter=10, featuresCol="features", labelCol="label")

# Evaluator with its default settings
evaluator = BinaryClassificationEvaluator()

# 3 x 2 x 2 = 12 candidate settings; the maxIter grid replaces the maxIter=10 given above
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbm.maxDepth, [2, 4, 6])
    .addGrid(gbm.maxBins, [20, 30])
    .addGrid(gbm.maxIter, [10, 20])
    .build()
)

# A fixed seed keeps the fold assignment the same from run to run, so the selected
# model and its score can be reproduced.
gbm_cv_model = CrossValidator(
    estimator=gbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=10,
    seed=42,
).fit(train_df)

In [45]:
# Final test accuracy, using the cross-validated model

y_pred = gbm_cv_model.transform(test_df)
ac = y_pred.select("label", "prediction")

hits = ac.filter(ac.label == ac.prediction).count()
total = ac.count()

# Share of correct predictions (roughly 87% in the recorded run)
hits / total

0.8731343283582089

# Yeni müşteri geldiğinde terk eder mi etmez mi tahmin etmek

In [47]:


import pandas as pd

# Five made-up customers: one Series per attribute, aligned by position
names = pd.Series(
    ["Ali Ahmetoğlu", "Taner Gün", "Berkay Yurt", "Polat Konak", "Kamil Atasoy"]
)
age = pd.Series([38, 43, 34, 50, 40])
total_purchase = pd.Series([30000, 10000, 6000, 30000, 100000])
account_manager = pd.Series([1, 0, 0, 1, 1])
years = pd.Series([20, 10, 3, 8, 30])
num_sites = pd.Series([30, 8, 8, 6, 50])

# Keyword order determines the column order of the resulting frame
column_data = dict(
    names=names,
    age=age,
    total_purchase=total_purchase,
    account_manager=account_manager,
    years=years,
    num_sites=num_sites,
)

yeni_musteriler = pd.DataFrame(column_data)

In [50]:
# The pandas DataFrame built above has to be converted to a Spark DataFrame
# before the Spark machine-learning model can work with it

pandas_customers = yeni_musteriler
yeni_sdf = spark.createDataFrame(pandas_customers)

yeni_sdf.show()

+-------------+---+--------------+---------------+-----+---------+
|        names|age|total_purchase|account_manager|years|num_sites|
+-------------+---+--------------+---------------+-----+---------+
|Ali Ahmetoğlu| 38|         30000|              1|   20|       30|
|    Taner Gün| 43|         10000|              0|   10|        8|
|  Berkay Yurt| 34|          6000|              0|    3|        8|
|  Polat Konak| 50|         30000|              1|    8|        6|
| Kamil Atasoy| 40|        100000|              1|   30|       50|
+-------------+---+--------------+---------------+-----+---------+



In [53]:
# Convert the new customers to the layout the model object expects (a "features" vector column)

# Use a separate name instead of rebinding `yeni_musteriler`: that name holds the pandas
# frame passed to spark.createDataFrame(), and overwriting it with a Spark DataFrame
# makes that earlier cell fail when it is re-run.
yeni_musteriler_va = vectorAssembler.transform(yeni_sdf)

# Finally, obtain the predictions
results = gbm_cv_model.transform(yeni_musteriler_va)
results.select("names", "prediction").show()

+-------------+----------+
|        names|prediction|
+-------------+----------+
|Ali Ahmetoğlu|       1.0|
|    Taner Gün|       0.0|
|  Berkay Yurt|       0.0|
|  Polat Konak|       0.0|
| Kamil Atasoy|       1.0|
+-------------+----------+

