## <font color="purple">Importing the libraries

In [51]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [52]:
# churn : müşteri kaybı

In [53]:
from pyspark import SparkConf,SparkContext
import collections #başka pclerde yapılan işlemleri topluyor

In [54]:
from pyspark.ml.classification import GBTClassifier


## <font color="purple"> Spark session oluşturma

* Spark session: bir spark uygulaması çalıştırmak için gerekli olan spark core üzerinde yüksek düzey bir API'dir. Spark session, spark uygulamasında kullanılacak tüm spark işlevlerine erişmek için bir arayüz sağlar ve ayrıca spark contextini psark sql işlevlerini spark streaming işlevlerini ve MLlib işlevlerini içeren diğer spark bileşenlerini de yönetir

In [55]:
from pyspark.sql import SparkSession

#spark session oluşturma
spark= SparkSession.builder.appName("churn_prediction").getOrCreate()


#veri setini dataframe olarak yükleme

df= spark.read.format("csv").option("header","true").load("churn.csv")

In [56]:
df.show()

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

## <font color="purple"> Exploratory Data Analysis

In [57]:
df.count()

900

In [58]:
len(df.columns)

8

In [59]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Churn: string (nullable = true)



In [60]:
df=df.withColumnRenamed("_c0","index")

In [61]:
df.printSchema()

root
 |-- index: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Churn: string (nullable = true)



In [62]:
#distinct() metodu sadece benzersiz değerleri getirir
df.select("names").distinct().count() #names sütununda benzesiz olan değerlerin sayısını hesaplar

899

In [63]:
df.groupBy("names").count().sort("count",ascending=False).show(5)

+----------------+-----+
|           names|count|
+----------------+-----+
|   Jennifer Wood|    2|
|    Patrick Bell|    1|
|   Chelsea Marsh|    1|
|Patrick Robinson|    1|
|     John Barber|    1|
+----------------+-----+
only showing top 5 rows



In [64]:
df.filter(df["names"] == "Jennifer Wood").show()

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



In [65]:
#Jennifer Wood isimli iki farklı kişi var

In [66]:
df.describe().show()

+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|             index|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              Churn|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [67]:
#transpose(): pandas dataframesinde sütunları satırlara ve satırları sütunlara dönüştürür

In [68]:
df.describe().toPandas().transpose() # bu pyspark kodu bir dataframedeki sayısal sütunların temel istatistiksel özetlerini hesaplar ve bir pandas dataframesine dönüştürür

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,900,449.5,259.95191863111916,0,99
Names,900,,,Aaron King,Zachary Walsh
Age,900,41.81666666666667,6.127560416916251,22.0,65.0
Total_Purchase,900,10062.82403333334,2408.644531858096,100.0,9993.5
Account_Manager,900,0.4811111111111111,0.4999208935073339,0,1
Years,900,5.27315555555555,1.274449013194616,1.0,9.15
Num_Sites,900,8.587777777777777,1.7648355920350969,10.0,9.0
Churn,900,0.16666666666666666,0.3728852122772358,0,1


In [69]:
df.summary().show()

+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|             index|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              Churn|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [70]:
df.dtypes

[('index', 'string'),
 ('Names', 'string'),
 ('Age', 'string'),
 ('Total_Purchase', 'string'),
 ('Account_Manager', 'string'),
 ('Years', 'string'),
 ('Num_Sites', 'string'),
 ('Churn', 'string')]

In [71]:
df.columns

['index',
 'Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn']

In [72]:
df=df.dropna()

In [73]:
df.count()

900

## <font color="purple"> Preparing the Model

* Bağımlı değişken, genellikle hedef değişken olarak adlandırılır ve modelin öğrenmek istediği şeydir. Bu durumda "Churn" sütunu, müşterilerin ayrılma durumunu belirttiği için bağımlı değişken olarak seçilmiştir. MOdel, diğer değişkenlerin müiterilerin ayrılma kararları üzerindeki etkisini öğrenirken "Churn" sütunundaki verileri tahmin etmek için eğitilecektir


* StringIndexer, kategorik bir değişkeni numerik bir değişkene döndürmek için kullanılır. Bu durumda "Churn" sütunu "label" olarak adlandırılan bir numerik değikene dönüştürülecektir. BU, modelin veriyi işlemesine ve bağımlı değişken üzerinde tahmin yapmasına olanak tanıyacaktır

#### Bağımlı değişkeni label olarak atama


In [74]:
#bağımlı değişkeni label olarak atama

from pyspark.ml.feature import StringIndexer

stringIndexer=StringIndexer(inputCol="Churn", outputCol="label")

indexed=stringIndexer.fit(df).transform(df)

In [75]:
indexed.dtypes

[('index', 'string'),
 ('Names', 'string'),
 ('Age', 'string'),
 ('Total_Purchase', 'string'),
 ('Account_Manager', 'string'),
 ('Years', 'string'),
 ('Num_Sites', 'string'),
 ('Churn', 'string'),
 ('label', 'double')]

In [76]:
df = indexed.withColumn("label",indexed.label.cast("integer"))

Bu kod, spark dataframe API'si kullanılarak DataFrame üzerinde işlemler yapmak için yazılmıştır

* "cast()" bir dataframe sütunundaki veri tiplerini dönüştürmeyi sağlar.

* "withColumn()", dataframe üzerinde yeni bir sütun eklemek için kullanılır. withColumn() kullanılarak "label" sütunu üzerine yeni bir sütun eklenmiştir ve bu yeni sütunun adı "label" olarak belirlenmiştir. "cast()" ile birlikte kullanılarak "label" sütunun veri tipini string'den "integer"a dönüştütülmüştür

In [77]:
df.dtypes

[('index', 'string'),
 ('Names', 'string'),
 ('Age', 'string'),
 ('Total_Purchase', 'string'),
 ('Account_Manager', 'string'),
 ('Years', 'string'),
 ('Num_Sites', 'string'),
 ('Churn', 'string'),
 ('label', 'int')]

In [78]:
df.show(5)

+-----+----------------+----+--------------+---------------+-----+---------+-----+-----+
|index|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|label|
+-----+----------------+----+--------------+---------------+-----+---------+-----+-----+
|    0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|    1|
|    1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|    1|
|    2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|    1|
|    3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|    1|
|    4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|    1|
+-----+----------------+----+--------------+---------------+-----+---------+-----+-----+
only showing top 5 rows



In [79]:
# veride int olmasu gerekenleri pyspark kodları ile düzenleme

#Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn

In [80]:
from pyspark.sql.functions import col

df=df.select([col(column).cast("integer").alias(column)
              if column in ["index","Age",'Total_Purchase','Account_Manager','Years','Num_Sites','Churn'] else col(column)
              for column in df.columns])

In [81]:
df.dtypes

[('index', 'int'),
 ('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'int'),
 ('Account_Manager', 'int'),
 ('Years', 'int'),
 ('Num_Sites', 'int'),
 ('Churn', 'int'),
 ('label', 'int')]

#### Bağımsız değişkenlere vektör atama


* Tablo halinde saklanan verileri PySPark kullanarak işlemek için verileri kullanılacak özellik vektörüne dönüştürmek gerekebilir. **VectorAssembler** sınıfı, birden fazla sayısal sütunu tek bir özellik vektörüne birleştirmek için kullanılır

In [82]:
from pyspark.ml.feature import VectorAssembler

In [83]:
depended=["Age",'Total_Purchase','Account_Manager','Years','Num_Sites']

In [84]:
#bu kod satırı depended listesi içinde belirtilen girdi sütunları ile "features" adlı bir çıktı sütunu oluşturur

vectorAssembler=VectorAssembler(inputCols=depended, outputCol="features")


In [86]:
features_vect=vectorAssembler.transform(df) #features sütunu ekledik

In [90]:
features_vect.show()

+-----+-------------------+---+--------------+---------------+-----+---------+-----+-----+--------------------+
|index|              Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|label|            features|
+-----+-------------------+---+--------------+---------------+-----+---------+-----+-----+--------------------+
|    0|   Cameron Williams| 42|         11066|              0|    7|        8|    1|    1|[42.0,11066.0,0.0...|
|    1|      Kevin Mueller| 41|         11916|              0|    6|       11|    1|    1|[41.0,11916.0,0.0...|
|    2|        Eric Lozano| 38|         12884|              0|    6|       12|    1|    1|[38.0,12884.0,0.0...|
|    3|      Phillip White| 42|          8010|              0|    6|       10|    1|    1|[42.0,8010.0,0.0,...|
|    4|     Cynthia Norton| 37|          9191|              0|    5|        9|    1|    1|[37.0,9191.0,0.0,...|
|    5|   Jessica Williams| 48|         10356|              0|    5|        8|    1|    1|[48.0,10356.0,

In [87]:
final=features_vect.select("features","label")

final.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[42.0,11066.0,0.0...|    1|
|[41.0,11916.0,0.0...|    1|
|[38.0,12884.0,0.0...|    1|
|[42.0,8010.0,0.0,...|    1|
|[37.0,9191.0,0.0,...|    1|
|[48.0,10356.0,0.0...|    1|
|[44.0,11331.0,1.0...|    1|
|[32.0,9885.0,1.0,...|    1|
|[43.0,14062.0,1.0...|    1|
|[40.0,8066.0,1.0,...|    1|
|[30.0,11575.0,1.0...|    1|
|[45.0,8771.0,1.0,...|    1|
|[45.0,8988.0,1.0,...|    1|
|[40.0,8283.0,1.0,...|    1|
|[41.0,6569.0,1.0,...|    1|
|[38.0,10494.0,1.0...|    1|
|[45.0,8213.0,1.0,...|    1|
|[43.0,11226.0,0.0...|    1|
|[53.0,5515.0,0.0,...|    1|
|[46.0,8046.0,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



In [88]:
final.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



#### Train ve Test datası oluşturma

In [91]:
(trainData, testData)=final.randomSplit([0.7,0.3],seed=42)

In [92]:
trainData.count()

667

In [93]:
testData.count()

233

## <font color="purple"> Build the Model

* **GBTClassifier**, Gradient Boosted Trees algoritmasını uygulayan bir sınıflandırıcıdır. Bu algoritma, bir ağaç yapılandırması kullanarak tahmin yapar ve ardışık iterasyonlar yoluyla hataları azaltarak daha iyi bir tahmin yapmak için ağacın yapısını iyileştirir.

* **maxIter** parametresi, ağaçların sayısını belirler.

* **maxDepth** parametresi, ağaçların maksimum derinliğini belirler ve varsayılan değeri 5'tir. Bu değeri arttırarak veya azaltarak modelin performansını etkileyebiliriz



In [119]:
from pyspark.ml.classification import GBTClassifier

In [127]:
gbm=GBTClassifier(maxIter=70,maxDepth=20,featuresCol="features",labelCol="label")

In [128]:
gbm_model=gbm.fit(trainData)

In [129]:
y_pred = gbm_model.transform(testData)

In [130]:
y_pred.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8787.0,1.0,...|    1|[2.05777319966183...|[0.98394494827477...|       0.0|
|[28.0,9090.0,1.0,...|    0|[-2.0577731996618...|[0.01605505172522...|       1.0|
|[28.0,11204.0,0.0...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[28.0,11245.0,0.0...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,9617.0,0.0,...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,10203.0,1.0...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,11274.0,1.0...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[30.0,6744.0,0.0,...|    0|[-2.0577731996618...|[0.01605505172522...|       1.0|
|[30.0,8403.0,1.0,...|    0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[30.0,8874.0,0.

## <font color="purple"> Accuracy of the Model

* Modelimizin doğruluğunu hesaplamak için **MultiClassClassificationEvaluator** sınıfını kullanmamız gerekir.

* Bu sınıf, çok sınıflı sınıflandırma modelleri için doğruluk(accuracy) gibi performans metriklerini hesaplar

In [131]:
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")

In [132]:
accuracy=evaluator.evaluate(y_pred)

In [133]:
accuracy

0.8583690987124464

## <font color="purple"> Improve Accuracy - CrossValidator

* **CrossValidator**, modelimizin doğruluğunu arttırmak için kullanabileceğimiz bir doğrulama aracıdır. Bu araç, veri kümenizi belirli sayıda parçaya ayırır, her bir parçaya sırasıyla test verisi olarak kullanır ve diğer parçaları eğitim verisi olarak kullanarak modelinizi eğitir
* CrossValidator aşağıdaki temel parametreleri alır:


1.   **estimator**: modelinizi belirleyen bir estiamtor nesnesi
2.   **evaluator**: model performansını değerlendirmek için bir evaluator nesnesi
3.  **estimatorParamMaps**: farklı hiperparametre kombinasyonlarını belirten bir liste veya ızgara
4.**numFolds**: veri kümenizi kaç parçaya böleceğinizi belirten bir sayı



* **ParamGridBuilder**, bir veya daha fazla hiperparametre kombinasyonlarının bir ızgarasını oluşturmanıza olanak tanır

* Aşağıdaki kodda **maxIter** parametresi için [10,20] ve **maxDepth** parametresi için [5,10] değeriyle bir ızgara oluşturur.

In [134]:
gbm_cv=GBTClassifier(featuresCol="features",labelCol="label")

from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder() \
      .addGrid(gbm.maxIter, [10,20]) \
      .addGrid(gbm.maxDepth,[5,10]) \
      .build()

In [135]:
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")

In [137]:
from pyspark.ml.tuning import CrossValidator

cv=CrossValidator(estimator=gbm_cv,
                  estimatorParamMaps=paramGrid,
                  evaluator=evaluator,
                  numFolds=2)

In [138]:
cvModel=cv.fit(trainData)


bestModel = cvModel.bestModel

In [142]:
pred = bestModel.transform(testData)

accuracy = evaluator.evaluate(pred)

print("Test verilerinin doğruluğu: ",accuracy)

Test verilerinin doğruluğu:  0.8497854077253219


In [143]:
# accuracy'i arttıramadım :(