<img src="PySpark.avif">

### Spark ve PySpark Nedir?

Apache Spark, büyük veri işleme ve analizi için kullanılan açık kaynaklı bir veri işleme çerçevesidir. Spark, büyük veri kümelerini hızlı, paralel ve dağıtık bir şekilde işlemek, analiz etmek ve modellemek için tasarlanmıştır.

PySpark ise, büyük veri işleme ve analizi için kullanılan Apache Spark'ın Python API'sidir. Spark, dağıtık veri işleme, yüksek performanslı hesaplama ve büyük veri analizi için tasarlanmış bir açık kaynaklı bir veri işleme çerçevesidir. PySpark ise Python programlama diliyle Spark platformuna erişim sağlayan bir kütüphanedir. İşte PySpark'ın temel özellikleri:

1. **Dağıtık İşleme:** Spark, büyük veri kümelerini parçalara ayırarak ve bu parçaları birden fazla bilgisayar üzerinde işleyerek paralel işlem yapabilir. Bu sayede işlemler daha hızlı ve ölçeklenebilir hale gelir.

2. **Yüksek Hız:** Spark, veriyi bellekte (RAM) tutarak işlem yapar. Bu, geleneksel disk tabanlı veri işleme sistemlerine göre çok daha hızlı sonuçlar elde etmenizi sağlar.

3. **Çeşitli Veri Kaynakları:** PySpark, çeşitli veri kaynaklarına (CSV, JSON, Parquet, Hive, HBase vb.) erişim sağlar ve bu verileri Spark veri yapıları olan RDD (Resilient Distributed Dataset) veya DataFrame formatında işleyebilirsiniz.

4. **Yüksek Seviyeli API:** PySpark, Python programlama dilini kullandığı için kullanımı kolaydır ve geniş bir Python topluluğu tarafından desteklenir. Bu, geliştirme süreçlerini hızlandırır.

5. **Makine Öğrenimi ve Büyük Veri Analizi:** Spark, geniş bir veri işleme kütüphanesine sahiptir. PySpark üzerinden Spark MLlib'i kullanarak makine öğrenimi modelleri oluşturabilir ve büyük veri kümesi üzerinde analizler yapabilirsiniz.

6. **Dağıtık Veri Setleri:** Spark, RDD ve DataFrame gibi yapılarla veriyi temsil eder. Bu yapılar, işlemlerinizi paralel olarak işlemek için kullanılır.

7. **Veri Akışı:** Spark Streaming, gerçek zamanlı veri akışını işlemek için kullanılan bir modüldür. Bu sayede veri akışlarını anlık olarak işleyebilirsiniz.

8. **Graf İşleme:** Spark GraphX, büyük çaplı graf verilerini işlemek ve analiz etmek için kullanılır.

PySpark ile işe başlamak için, öncelikle bir Spark oturumu oluşturmanız gerekmektedir. Bu oturum, Spark bağlantısını kurmanıza ve Spark üzerinde işlem yapmanıza olanak sağlar. Daha sonra veri yükleme, veri işleme, analiz ve model oluşturma gibi adımları gerçekleştirebilirsiniz.

# Projemiz: Spark ile Büyük Veri olan "Churn" Tahmini

Spark oturumu açtıktan sonra izlenecek adımları aşağıdaki gibi özetleyebiliriz:
Elbette, işte zamirleri kullanarak PySpark kullanarak Büyük Veri Müşteri Kaybı (Churn) tahmini projesinin adımları:

1. **Veri Hazırlığı:**
   - İlgili verileri toplayarak veri kaynaklarından verileri alacağız.
   - Bu verileri PySpark DataFrame'e yükleyeceğiz veya oluşturacağız.
   - Verileri incelemek ve işlem yapmak için keşifsel veri analizi (EDA) gerçekleştireceğiz. Eksik değerler, aykırı değerler ve kategorik değişkenler hakkında işlemler yapacağız.

2. **Veri Ön İşleme:**
   - Kategorik değişkenleri sayısal değerlere dönüştürmemiz gerekecek. Bunun için etiket kodlamayı veya tekil kodlamayı kullanabiliriz.
   - Modelin performansını artırabilecek yeni özellikler oluşturmak için özellik seçimi ve mühendisliği yapacağız.

3. **Veri Bölünmesi:**
   - Veriyi eğitim ve test kümelerine ayıracağız. Genellikle verinin %70-80'ini eğitim, %20-30'unu test olarak ayırabiliriz.

4. **Model Oluşturma:**
   - GBTClassifier gibi bir makine öğrenimi modelini seçeceğiz ve ilgili parametreleri ayarlayacağız.
   - Eğitim verilerini kullanarak modeli eğiteceğiz.

5. **Model Değerlendirmesi:**
   - Test verilerini kullanarak modelin performansını değerlendireceğiz. Doğruluk, hassasiyet, kesinlik, geri çağırma gibi metrikleri kullanacağız.
   - Sonuçları anlamak için confusion matrix gibi görselleştirmeleri kullanacağız.

6. **Model Ayarlaması ve Optimizasyon:**
   - Modelin performansını artırmak için hiperparametre ayarlamaları yapacağız. GridSearch veya RandomizedSearch gibi yöntemleri kullanabiliriz.
   - Aşırı öğrenmeyi önlemek için düzenleme (regularization) tekniklerini (örneğin, ağaç sayısını sınırlamak) uygulayabiliriz.

7. **Sonuçların Sunumu:**
   - Modelin sonuçlarını yönetim veya ilgili paydaşlarla paylaşmak için etkili bir sunum hazırlayacağız.
   - Önemli özellikleri, işaretçileri ve modelin tahmin yeteneğini vurgulayacağız.

8. **Üretim Ortamına Entegrasyon:**
   - Başarılı bir model elde edersek, modeli üretim ortamına entegre etmek için gerekli adımları takip edeceğiz.
   - Modeli kullanıma sunmak için gereken işlemleri gerçekleştireceğiz.

9. **Sürekli İyileştirme:**
   - Modeli düzenli olarak gözden geçirip yeni verilerle güncelleyeceğiz. Müşteri davranışı zamanla değişebilir, bu nedenle modelin güncelliğini sağlayacağız.

Bu adımları takip ederek, PySpark kullanarak Büyük Veri Müşteri Kaybı tahmini projesini gerçekleştireceğiz.

In [1]:
# İndirmeler
!pip install pycaret
!pip install pyspark
!pip install findspark

Collecting pycaret
  Downloading pycaret-3.0.4-py3-none-any.whl (484 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m484.4/484.4 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting pyod>=1.0.8 (from pycaret)
  Downloading pyod-1.1.0.tar.gz (153 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.4/153.4 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting category-encoders>=2.4.0 (from pycaret)
  Downloading category_encoders-2.6.2-py2.py3-none-any.whl (81 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.8/81.8 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
Collecting deprecation>=2.1.0 (from pycaret)
  Downloading deprecation-2.1.0-py2.py3-none-any.whl (11 kB)
Collecting xxhash (from pycaret)
  Downloading xxhash-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 k

In [2]:
# Kütüphaneler
import findspark
import pandas as pd
import numpy as np
import seaborn as sns
from datetime import date, timedelta, datetime
import time
import os
import matplotlib.pyplot as plt
%matplotlib inline
import pyspark
from pyspark.context import SparkContext
from pyspark.ml.feature import StringIndexer
from sklearn.metrics import f1_score, recall_score, precision_score
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, GBTClassifier, LinearSVC

## 1. **Veri Hazırlığı:**

In [3]:
# Spark oturumu açıyoruz:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

MAX_MEMORY = "10g"

spark = SparkSession \
    .builder \
    .appName("Churn") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

Bu kod örneği PySpark ile bir Spark oturumu (Spark Session) oluşturmayı gösteriyor. Bir Spark oturumu, Spark işlemlerini ve uygulamalarını yürütmek için kullanılan temel arayüzdür. Kodun her bir bölümünü ayrıntılı bir şekilde açıklayalım:

1. **MAX_MEMORY Tanımı:**
   Bu kısım, bellek sınırlarını belirlemek için kullanılan bir sabiti tanımlar. `"10g"` ifadesi, 10 gigabayt bellek miktarını temsil eder. Bu, Spark oturumunda kullanılacak toplam bellek miktarını belirtir.

2. **SparkSession Oluşturma:**
   - `SparkSession`: Bu sınıf, Spark uygulamaları için temel giriş noktasıdır. Uygulamalarınızı yürütmek ve Spark'a erişmek için kullanılır.
   - `.builder`: SparkSession oluşturmak için kullanılan bir yapılandırıcı nesnesi oluşturur.
   - `.appName("Churn")`: Uygulama adını belirtir. Bu isim Spark UI'da görünecektir.
   - `.config("spark.executor.memory", MAX_MEMORY)`: Bu yöntem, Spark yapılandırmasını ayarlar. Burada "spark.executor.memory" yapılandırmasını "10g" olarak ayarlıyoruz. Bu, her bir Spark işçi sürecine tahsis edilen bellek miktarını belirtir.
   - `.config("spark.driver.memory", MAX_MEMORY)`: Benzer şekilde, "spark.driver.memory" yapılandırmasını da "10g" olarak ayarlıyoruz. Bu, sürücü programın (ana işlem) kullanabileceği bellek miktarını belirtir.
   - `.getOrCreate()`: Eğer mevcut bir Spark oturumu varsa, onu alır; yoksa yeni bir Spark oturumu oluşturur.

Bu kodun amacı, yüksek düzeyde yapılandırılmış bir Spark oturumu oluşturmak ve bu oturumda tanımlanan bellek sınırlarını belirlemektir. Bellek sınırları, Spark işlemlerinin ve hesaplamalarının verimli ve dengeli bir şekilde yapılmasını sağlamak için önemlidir. Özellikle büyük veri işlemleri yürütüldüğünde, bellek ayarlarını doğru bir şekilde yapılandırmak büyük önem taşır.

In [4]:
# Veriyi okuyoruz
df= spark.read.format("csv").option("header","true").load("churn.csv")

In [5]:
df.show()

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

### EDA - Exploratory Data Analysis

In [6]:
# Satır sayısı
df.count()

900

In [7]:
# Sutun Sayısı
len(df.columns)

8

In [8]:
# info gibi bilgiler için
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Churn: string (nullable = true)



In [9]:
# Bu kod, _c0 adındaki bir sütunu index olarak yeniden adlandırmak için kullanılır.
# Bu tür ad değişiklikleri, verinin daha iyi anlaşılabilir ve yönetilebilir olmasını sağlamak için yaygın olarak kullanılır.
df=df.withColumnRenamed("_c0","index")

In [10]:
#Burada pthondaki unique gibi sutun içindeki benzersizliklere bakacağız.
#Çünkü verinin tekrar edip etmediğini bilerek daha sıhhatli sonuçlara ulaşırız,
#Names ı neden seçtik çünkü diğer sutunlarda benzerlik olabilir ama isim benzerliği nadir rastlanan bir durumdur.
df.select("names").distinct().count() #names sütununda benzesiz olan değerlerin sayısını hesaplar

899

In [11]:
#900 olan veri 899 çıktığı için isimde bir tekrar etme sözkonusu onu bulmak için isimleri gruplandırma yapıyoruz.
df.groupBy("names").count().sort("count",ascending=False).show(5)

+----------------+-----+
|           names|count|
+----------------+-----+
|   Jennifer Wood|    2|
|    Patrick Bell|    1|
|Patrick Robinson|    1|
|   Chelsea Marsh|    1|
|     John Barber|    1|
+----------------+-----+
only showing top 5 rows



In [12]:
# iki giriş olan Jennifer Wood'un sağlamasını yapıyoruz.
df.filter(df["names"] == "Jennifer Wood").show()

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



In [13]:
# İstatistiğine bakıyoruz
df.select('age','total_purchase','account_manager','years','num_sites','churn').describe().show()

+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|              age|   total_purchase|   account_manager|            years|         num_sites|              churn|
+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|              900|              900|               900|              900|               900|                900|
|   mean|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|             22.0|            100.0|                 0|              1.0|              10.0|                  0|
|    max|             65.0|           9993.5|                 1|             9.15|               9.0|                  1|
+-------+---------------

In [14]:
# Aynı veriyi pandas tablosunda görüntülemek
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,900,449.5,259.95191863111916,0,99
Names,900,,,Aaron King,Zachary Walsh
Age,900,41.81666666666667,6.127560416916251,22.0,65.0
Total_Purchase,900,10062.82403333334,2408.644531858096,100.0,9993.5
Account_Manager,900,0.4811111111111111,0.4999208935073339,0,1
Years,900,5.27315555555555,1.274449013194616,1.0,9.15
Num_Sites,900,8.587777777777777,1.7648355920350969,10.0,9.0
Churn,900,0.16666666666666666,0.3728852122772358,0,1


In [23]:
# boş verileri çıkarmak için
df=df.dropna()

In [24]:
# veri tiplerine bakacağız
df.dtypes

[('index', 'string'),
 ('Names', 'string'),
 ('Age', 'string'),
 ('Total_Purchase', 'string'),
 ('Account_Manager', 'string'),
 ('Years', 'string'),
 ('Num_Sites', 'string'),
 ('Churn', 'string')]

## 2. **Veri Ön İşleme:**

#### Feature Engineering

In [26]:
# Hedef sutunumuz olan "Churn"u Classification algoritmasına vermeden önce int e çevirmeliyiz ki işleyebilsin.
# "Churn" adlı kategorik sütunu sayısal etiketlere dönüştürüp, "label" adında yeni bir sütun oluşturulur.

from pyspark.ml.feature import StringIndexer

stringIndexer=StringIndexer(inputCol="Churn", outputCol="churn_int")

indexed=stringIndexer.fit(df).transform(df)

In [28]:
# Çevirdiğimizi df'ye atıyoruz
df = indexed.withColumn("churn_int",indexed.churn_int.cast("integer"))

In [29]:
# Kontrol edelim
df.dtypes

[('index', 'string'),
 ('Names', 'string'),
 ('Age', 'string'),
 ('Total_Purchase', 'string'),
 ('Account_Manager', 'string'),
 ('Years', 'string'),
 ('Num_Sites', 'string'),
 ('Churn', 'string'),
 ('churn_int', 'int')]

In [30]:
# Churn dışındaki diğer sutunlar için de type değiştirme yapıyoruz ama Sadece veri tipi değişikliği var.
# Yeniden adlandırma ayrı bir işlem olarak gerçekleştirilmez.
from pyspark.sql.functions import col

df=df.select([col(column).cast("integer").alias(column)
              if column in ["index","Age",'Total_Purchase','Account_Manager','Years','Num_Sites','Churn'] else col(column)
              for column in df.columns])

In [31]:
df.dtypes
# artık bütün veriler sayısal değerlere dönüştü

[('index', 'int'),
 ('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'int'),
 ('Account_Manager', 'int'),
 ('Years', 'int'),
 ('Num_Sites', 'int'),
 ('Churn', 'int'),
 ('churn_int', 'int')]

In [45]:
# Hedefimiz dışındaki tüm sutunları bir vektöre atıyoruz

from pyspark.ml.feature import VectorAssembler
depended=["Age",'Total_Purchase','Account_Manager','Years','Num_Sites']
vectorAssembler=VectorAssembler(inputCols=depended, outputCol="features")
df_vect=vectorAssembler.transform(df)
df_vect.show()
final_df=df_vect.select("features","churn_int")
final_df.show()

+-----+-------------------+---+--------------+---------------+-----+---------+-----+---------+--------------------+
|index|              Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|churn_int|            features|
+-----+-------------------+---+--------------+---------------+-----+---------+-----+---------+--------------------+
|    0|   Cameron Williams| 42|         11066|              0|    7|        8|    1|        1|[42.0,11066.0,0.0...|
|    1|      Kevin Mueller| 41|         11916|              0|    6|       11|    1|        1|[41.0,11916.0,0.0...|
|    2|        Eric Lozano| 38|         12884|              0|    6|       12|    1|        1|[38.0,12884.0,0.0...|
|    3|      Phillip White| 42|          8010|              0|    6|       10|    1|        1|[42.0,8010.0,0.0,...|
|    4|     Cynthia Norton| 37|          9191|              0|    5|        9|    1|        1|[37.0,9191.0,0.0,...|
|    5|   Jessica Williams| 48|         10356|              0|    5|    

Yukarıdaki hedef dışındakileri vektöre atamanın faydalarını şöyle sıralayabilirz:

Veriyi Hazırlama: Birçok makine öğrenimi algoritması, bağımsız değişkenleri vektörler veya matrisler olarak bekler. Bu kodlar, belirtilen bağımsız değişkenleri bir araya getirerek, veriyi algoritmaya hazır bir formatta dönüştürmek amacıyla kullanılır.

Özellik Mühendisliği: Bağımsız değişkenlerin özelliklerini vektörel bir özellikte birleştirerek, modelin daha fazla bilgiyle eğitilmesini sağlar. Özellikle bazı modeller, bağımsız değişkenlerin etkileşimlerini veya kombinasyonlarını daha iyi yakalayarak daha iyi sonuçlar üretebilir.

Veri Boyutunu Azaltma: Eğer çok sayıda bağımsız değişkeniniz varsa, bunları bir vektörel özellik olarak birleştirerek veri boyutunu azaltabilirsiniz. Bu, gereksiz boyut artışını engelleyerek, daha hızlı ve daha verimli model eğitimi sağlar.

Bazı Algoritmalar için Gereklilik: Özellikle bazı algoritmalar, bağımsız değişkenleri vektör formatında almayı bekler. Bu kodlar, bu tür algoritmalarla uyumlu hale getirir.

Kodun Okunabilirliği: Bu kodlar, özellik mühendisliği adımını daha kolay anlaşılır ve düzenli bir şekilde gerçekleştirmenizi sağlar.

Sonuç olarak, bu kodlar, belirtilen bağımsız değişkenleri birleştirip bir vektörel özellik sütunu oluşturarak veriyi modele hazır hale getirir. Bu tür bir özellik mühendisliği, genellikle modelinizin performansını artırmanıza ve daha iyi sonuçlar elde etmenize yardımcı olabilir.

In [34]:
final_df=df.withColumn("age_sqrt",df.Age**2)

Bu kod, DataFrame'deki "Age" (yaş) sütunundaki değerleri kullanarak yeni bir sütun olan "age_sqrt" (yaşın karesi) sütununu oluşturmak için kullanılır. Bu tür özellik mühendisliği işlemleri, veriyi daha iyi anlamak, modele daha fazla bilgi sağlamak veya bazı modellerin performansını artırmak amacıyla gerçekleştirilir. İşte kodun amaçları ve faydaları:

1. **Model İçin Daha İyi Özellik Sağlama:** Bazı durumlarda, yaş değeriyle birlikte yaşın karesi de modelinize daha fazla bilgi sağlayabilir. Örneğin, bazı davranışlar yaşın karesiyle daha iyi açıklanabilir. Model, yaşın karesini kullanarak daha karmaşık ilişkileri yakalamak veya bazı örüntüleri keşfetmek için daha iyi bir performans gösterebilir.

2. **Non-Lineer İlişkileri Temsil Etme:** Modeliniz, sadece yaşın kendisi yerine yaşın karesi ile daha iyi non-lineer ilişkileri temsil edebilir. Özellikle doğrusal olmayan ilişkileri modellemek için bu tür özellikleri kullanmak faydalı olabilir.

3. **Azalan Önemli Etki:** Yaşın karesi, yaş arttıkça yaşın kendisinin etkisinin azaldığı bir durumu temsil edebilir. Örneğin, yaşın gençken daha belirgin olduğu bir etki sonrasında yaşın artmasıyla etkisinin azaldığı bir senaryo düşünülebilir. Bu tür durumları yakalamak için bu tür dönüşümler yapmak yararlı olabilir.

4. **Hem Sayısal Hem de Kategorik Değerler İçin Uygunluk:** Yaş değeri kategorik olmadığından ve genellikle sürekli bir değişken olduğundan, yaşın karesi gibi türetilmiş özellikler bu tür durumlar için daha uygundur.

Ancak, her durumda özellik mühendisliği yapmadan önce dikkatli bir analiz ve modelinize uygunluğunu değerlendirmek önemlidir. Özellikle gereksiz veya anlamsız özellikler modelinizi karmaşıklaştırabilir ve performansını düşürebilir.

## 3. **Veri Bölünmesi:**

In [50]:
(traindf, testdf)=final_df.randomSplit([0.7,0.3],seed=42)

In [51]:
traindf.count()

667

In [52]:
traindf.show(5)

+--------------------+---------+
|            features|churn_int|
+--------------------+---------+
|[22.0,11254.0,1.0...|        0|
|[25.0,9672.0,0.0,...|        0|
|[26.0,8939.0,0.0,...|        0|
|[27.0,8628.0,1.0,...|        0|
|[28.0,8670.0,0.0,...|        0|
+--------------------+---------+
only showing top 5 rows



In [37]:
testdf.count()

233

## 4. **Model Oluşturma:**

In [53]:
from pyspark.ml.classification import GBTClassifier

In [54]:
gbt = GBTClassifier(maxIter=70,maxDepth=20,featuresCol="features",labelCol="churn_int")

In [55]:
gbt_model = gbt.fit(traindf)

In [56]:
pred = gbt_model.transform(testdf)

In [57]:
pred.show()

+--------------------+---------+--------------------+--------------------+----------+
|            features|churn_int|       rawPrediction|         probability|prediction|
+--------------------+---------+--------------------+--------------------+----------+
|[26.0,8787.0,1.0,...|        1|[2.05777319966183...|[0.98394494827477...|       0.0|
|[28.0,9090.0,1.0,...|        0|[-2.0577731996618...|[0.01605505172522...|       1.0|
|[28.0,11204.0,0.0...|        0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[28.0,11245.0,0.0...|        0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,9617.0,0.0,...|        0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,10203.0,1.0...|        0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[29.0,11274.0,1.0...|        0|[2.05777319966183...|[0.98394494827477...|       0.0|
|[30.0,6744.0,0.0,...|        0|[-2.0577731996618...|[0.01605505172522...|       1.0|
|[30.0,8403.0,1.0,...|        0|[2.05777319966183...|[

## 5. **Model Değerlendirmesi:**

In [60]:
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="churn_int",
                                              predictionCol="prediction",
                                              metricName="accuracy")

In [61]:
accuracy=evaluator.evaluate(pred)

In [62]:
accuracy

0.8583690987124464

## 6. **Model Doğrulama:**

Bu kod, çapraz doğrulama için bir Gradient Boosted Trees sınıflandırma modeli oluştururken denenecek olan farklı "maxIter" ve "maxDepth" parametre kombinasyonlarını belirler. Bu kombinasyonlar çapraz doğrulama işlemi sırasında modelin farklı parametre ayarlarıyla performansını değerlendirmenizi sağlar.

In [64]:
gbt_cv=GBTClassifier(featuresCol="features",labelCol="churn_int")

from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder() \
      .addGrid(gbt.maxIter, [10,20]) \
      .addGrid(gbt.maxDepth,[5,10]) \
      .build()

In [66]:
from pyspark.ml.tuning import CrossValidator

cv=CrossValidator(estimator=gbt_cv,
                  estimatorParamMaps=paramGrid,
                  evaluator=evaluator,
                  numFolds=2)

In [67]:
cvModel=cv.fit(traindf)

bestModel = cvModel.bestModel

In [68]:
pred2 = bestModel.transform(testdf)

accuracy = evaluator.evaluate(pred2)

print("Test verilerinin doğruluğu: ",accuracy)

Test verilerinin doğruluğu:  0.8497854077253219


## 7. **Sonuçların Sunumu:**

Sonuç olarak bu proje kapsamında, PySpark kullanarak büyük veri üzerinde churn tahmini gerçekleştiren bir makine öğrenimi modeli oluşturduk. Veriyi okuma, ön işleme, özellik mühendisliği, model oluşturma ve hiperparametre ayarı gibi adımları takip ettik. Oluşturduğumuz Gradient Boosted Trees (GBT) sınıflandırma modelini farklı hiperparametre kombinasyonlarıyla çapraz doğrulama yöntemiyle değerlendirdik. Sonuçlarımızı MulticlassClassificationEvaluator kullanarak doğruluk metriği üzerinden değerlendirdik. Bu proje, büyük veri setleri üzerinde PySpark'ın yeteneklerini kullanarak veri analizi ve makine öğrenimi uygulamalarını gerçekleştirebileceğimizi gösterdi. Projede edindiğimiz deneyimle, gelecekteki büyük veri projelerine daha iyi yaklaşabileceğimiz ve daha gelişmiş analizler gerçekleştirebileceğimiz bir temel oluşturduk.

In [None]:
# Eğitilen GBT modelini seçip
trained_model = cvModel.bestModel

# Modeli "11-Spark Big Data Machine Learning" adlı dosyaya kaydet
saved_model = "Project Based Learning Level-2/11-Spark Big Data Machine Learning/my_model"
trained_model.save(saved_model)

# Kaydedilen modeli yüklemek için:
# loaded_model = GBTClassifierModel.load(saved_model)