https://github.com/veribilimiokulu/udemy-apache-spark/blob/master/kurs_hazirlik/pyspark/MachineLearning/Preprocessing/PreprocessOps.ipynb

In [13]:
!pip install findspark

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext


sc = SparkContext('local')
spark = SparkSession(sc)
sc




In [14]:
#simpledata veri seti üzerinde çalıştık
df=spark.read.csv('simpledata.csv',header=True , inferSchema = True, sep = ",")
df.toPandas().head()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir
0,1,Cemal,35,Isci,Ankara,3500
1,2,Ceyda,42,Memur,Kayseri,4200
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000
3,4,Burcu,29,Pazarlamaci,Ankara,4200
4,5,Yasemin,23,Pazarlamaci,Bursa,4800


yeni bir daha nitelik daha ekleyip, geliri 7000 tl'nin altı ve üstü olmak üzere iki yeni sınıf oluşturacağız. bu sınfların adları iyi ve kötü olacak

In [15]:
from pyspark.sql.functions import *

# withcolumn yeni bir sütun ekler

df1 = df.withColumn("ekonomik_durum",
when(col("aylik_gelir") >= 7000, "iyi").otherwise("kötü")
)

df1.toPandas().head()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum
0,1,Cemal,35,Isci,Ankara,3500,kötü
1,2,Ceyda,42,Memur,Kayseri,4200,kötü
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000,iyi
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü


## 1. String Indexer AŞAMASI

In [16]:
# Stringindexer kategorik verileri string ifadeden kurtarıp, 0'den... n'e kadar numerik olacak şekilde
#en çok tekrarlanan "0" en az tekrarlanan "n" en yüksek rakamı alır
# index numaralara dönüştürür.

In [29]:
#kütüphaneleri yükleyelim
from pyspark.ml.feature import StringIndexer,  VectorAssembler, StandardScaler,OneHotEncoder

### meslek sütunu için StringIndexer

In [20]:
meslek_indexer = StringIndexer() \
.setInputCol("meslek") \            # input olarak baz alınacak sütun seçildi
.setOutputCol("meslek_index") \     # output olarak çıkacak yeni sütunun adı verildi
.setHandleInvalid("skip") 

#meslek indexer nesnesini aldık fit metodunu çalıştırmak üzere içinde df'1imizi aldık 
#bu modeli transform edip yeni bir df oluşturacağız meslek_indexer_df adında
meslek_indexer_model = meslek_indexer.fit(df1)
meslek_idexer_df = meslek_indexer_model.transform(df1)

In [22]:
meslek_idexer_df.toPandas().head(15)

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index
0,1,Cemal,35,Isci,Ankara,3500,kötü,5.0
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000,iyi,1.0
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,2.0
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,2.0
5,6,Ali,33,Memur,Ankara,4250,kötü,0.0
6,7,Dilek,29,Pazarlamaci,Istanbul,7300,iyi,2.0
7,8,Murat,31,MÃ¼zisyen,Istanbul,12000,iyi,1.0
8,9,Ahmet,33,Doktor,Ankara,18000,iyi,3.0
9,10,Muhittin,46,Berber,Istanbul,12000,iyi,4.0


In [23]:
# grupby yaparak bu yeni meslek_index sütunun içindeki tekrarlanan sıklık sayılarını alalım :
df1.groupBy(col("meslek")) \
.agg(count("*").alias("sayi")) \
.sort(desc("sayi")) \
.toPandas().head(10)


Unnamed: 0,meslek,sayi
0,Memur,3
1,Pazarlamaci,3
2,MÃ¼zisyen,3
3,Doktor,2
4,Isci,1
5,Tuhafiyeci,1
6,Berber,1
7,TornacÄ±,1


### Aynısını şimdi sehir sütunu için kullanarak StringIndexer yapalım

In [25]:
sehir_indexer = StringIndexer() \
.setInputCol("sehir") \
.setOutputCol("sehir_index")

sehir_indexer_model = sehir_indexer.fit(meslek_idexer_df)
sehir_indexer_df = sehir_indexer_model.transform(meslek_idexer_df)

In [27]:
sehir_indexer_df.toPandas().head(5)

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index
0,1,Cemal,35,Isci,Ankara,3500,kötü,5.0,0.0
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,3.0
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000,iyi,1.0,1.0
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,2.0,0.0
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,2.0,2.0


### Şehir Frekansları

In [28]:
df1.groupBy("sehir") \
.agg(count("sehir").alias("sayi")) \
.sort(desc("sayi")) \
.toPandas().head(10)

Unnamed: 0,sehir,sayi
0,Ankara,7
1,Istanbul,4
2,Kayseri,1
3,Bursa,1
4,Ã‡orum,1
5,Ä°zmir,1


## 2. OneHotEncoderEstimator AŞAMASI

One-Hot,kategorik değişkenleri yeni bir sütun içinde "0" ya da "1" olacak şekilde nümerik olarak kodlamamızı sağlar

In [33]:
encoder = OneHotEncoder() \
.setInputCols(["meslek_index","sehir_index"]) \
.setOutputCols(["meslek_encoded","sehir_encoded"])

In [34]:
encoder_model = encoder.fit(sehir_indexer_df)
encoder_df = encoder_model.transform(sehir_indexer_df)

In [35]:
encoder_df.toPandas().head(5)

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded
0,1,Cemal,35,Isci,Ankara,3500,kötü,5.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)"
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,3.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0)"
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000,iyi,1.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)"
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,2.0,0.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)"
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,2.0,2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0)"


## 3. VectorAssembler AŞAMASI

bütün girdi nitelikleri bir vektör haline getirip bit sütun içinde toplayacağız 

In [39]:
# girdi sütünlarını ve oluşacak yeni çıktı sütununu tanımlıyoruz
assembler = VectorAssembler() \
.setInputCols(["yas","aylik_gelir","meslek_encoded","sehir_encoded"]) \
.setOutputCol("vectorized_features")
#burada fit itme işlemi kullanmadık çünkü bütün input sütunlarımız nümerik, yani öğrenecek bir şey yok

In [40]:
# Pandas dataframe head() metodundan satır truncate'i kaldırmak için 
import pandas as pd
pd.set_option('display.max_colwidth', -1)

In [41]:
#artık  "vector_df" adinda yeni bir dataframe'i oluşturuyoruz transform diyerek
vector_df = assembler.transform(encoder_df)
# oluşan yeni dataframe'in genişliği çok fazla olduğu için 
#ekranda rahat gözüksün diye  select ile sadece vectorized_features adındaki output column'ı seçtik 
vector_df.select("vectorized_features").toPandas().head(5)

Unnamed: 0,vectorized_features
0,"(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)"
1,"(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)"
2,"(30.0, 9000.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
3,"(29.0, 4200.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)"
4,"(23.0, 4800.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)"


# 4. LabelIndexer AŞAMASI

hedef değişkenimizi artık numerik hale getiriyoruz . hedef değişken genel olarak label indexer ile nümerik hale getirilyor


In [44]:
# ilk önce nesne oluşturuyoruz. input ve output belirliyoruz 
label_indexer = StringIndexer() \
.setInputCol("ekonomik_durum") \
.setOutputCol("label")  # genelde label veriliyor onun için "label" adı verdik

In [45]:
# model oluşturup fit ve transform edeceğiz
# fit etmemiz gerekiyor çünkü bir estimator olarak hesaplayacağı bir veri var.

label_indexer_model = label_indexer.fit(vector_df)
label_indexer_df = label_indexer_model.transform(vector_df)

In [46]:
# oluşan yeni dataframe'in genişliği çok fazla olduğu için 
#ekranda rahat gözüksün diyeselect ile sadece vectorized_features ve label adındaki output column'ı seçtik 

label_indexer_df.select("vectorized_features","label").toPandas().head(5)

Unnamed: 0,vectorized_features,label
0,"(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0
1,"(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)",0.0
2,"(30.0, 9000.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)",1.0
3,"(29.0, 4200.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0
4,"(23.0, 4800.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)",0.0


# 5. StandardScaler ile Standardizasyon AŞAMASI

In [47]:
# nitelikleri aynı ölçeklere yaklaştırmazsak , büyük ölçekli veri örneğin maaş; küçük öçekli örneğin yaş değişkenine
#baskın gelebilir

In [48]:
scaler = StandardScaler() \
.setInputCol("vectorized_features") \ # önceden vektörize ettiğmiz bağımlı değişkenleri aldık
.setOutputCol("features") # genelde "features" adı veriliyor

In [49]:
#nesneden model oluşturup içine dataframe koyalım.
scaler_model = scaler.fit(label_indexer_df)
scaler_df = scaler_model.transform(label_indexer_df)

In [50]:
scaler_df.select("features","label").toPandas().head(5)

Unnamed: 0,features,label
0,"(5.0082809740601215, 0.7723249167865694, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)",0.0
1,"(6.009937168872146, 0.9267899001438834, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0)",0.0
2,"(4.292812263480104, 1.9859783574511787, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.1846572437632577, 0.0, 0.0, 0.0)",1.0
3,"(4.149718521364101, 0.9267899001438834, 0.0, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)",0.0
4,"(3.29115606866808, 1.0591884573072952, 0.0, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0, 0.0)",0.0


# 6. Train-Test Ayırma AŞAMASI

In [51]:
train_df, test_df = scaler_df.randomSplit([0.8,0.2], seed=142)

In [52]:
train_df.toPandas().head(5)

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded,vectorized_features,label,features
0,1,Cemal,35,Isci,Ankara,3500,kötü,5.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0,"(5.0082809740601215, 0.7723249167865694, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,3.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0)","(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)",0.0,"(6.009937168872146, 0.9267899001438834, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0)"
2,3,Timur,30,MÃ¼zisyen,Istanbul,9000,iyi,1.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(30.0, 9000.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)",1.0,"(4.292812263480104, 1.9859783574511787, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.1846572437632577, 0.0, 0.0, 0.0)"
3,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,2.0,2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0)","(23.0, 4800.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)",0.0,"(3.29115606866808, 1.0591884573072952, 0.0, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.8729833462074175, 0.0, 0.0)"
4,6,Ali,33,Memur,Ankara,4250,kötü,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(33.0, 4250.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0,"(4.722093489828115, 0.9378231132408343, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"


In [53]:
test_df.toPandas().head(5)

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded,vectorized_features,label,features
0,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,2.0,0.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(29.0, 4200.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0,"(4.149718521364101, 0.9267899001438834, 0.0, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"
1,9,Ahmet,33,Doktor,Ankara,18000,iyi,3.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(33.0, 18000.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",1.0,"(4.722093489828115, 3.9719567149023574, 0.0, 0.0, 0.0, 2.8419928002940256, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"
2,11,Hicaziye,47,Tuhafiyeci,Ankara,4800,kötü,7.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(47.0, 4800.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",0.0,"(6.725405879452164, 1.0591884573072952, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"
3,15,Åehmuz,41,MÃ¼zisyen,Ankara,8700,iyi,1.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(41.0, 8700.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)",1.0,"(5.866843426756143, 1.9197790788694726, 0.0, 2.41522945769824, 0.0, 0.0, 0.0, 0.0, 0.0, 1.9364916731037087, 0.0, 0.0, 0.0, 0.0)"


# 7. Basit Bir Makine Öğrenmesi Modeli

In [54]:
# logistic regression modeli indiriyoruz
from pyspark.ml.classification import LogisticRegression

In [62]:
# logistic regresyon nesnesi oluşturuyoruz, 3 temel parametresini giriyoruz. sırasıyla : girdi verisi-hedef değişken-tahmin
lr_object = LogisticRegression() \
.setFeaturesCol("features") \
.setLabelCol("label") \
.setPredictionCol("prediction")

In [63]:
# lineer regresyon modeli oluşturuyoruz, train data frame'e göre
lr_model = lr_object.fit(train_df)

In [64]:
# test data frame'i tanıtıyoruz
result_df = lr_model.transform(test_df)

In [65]:
#sonuç ekranı
result_df.select("label","prediction").toPandas().head()

Unnamed: 0,label,prediction
0,0.0,0.0
1,1.0,1.0
2,0.0,0.0
3,1.0,0.0


In [66]:
# basit ,sade ve ayırma kuralımız sağlam olduğu için bütün hepsini doğru bildi. basit olduğu için ezberleme yok