In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline, PipelineModel

In [2]:
spark = SparkSession.builder \
.appName("PipelineOps") \
.master("local[2]") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

In [3]:
df = spark.read \
.option("header","True") \
.option("sep",",") \
.option("inferSchema","True") \
.format("csv") \
.load("Q:\\SparkDatasets\\simpledataset.csv")

In [4]:
df.toPandas().head()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir
0,1,Cemal,35,Isci,Ankara,3500
1,2,Ceyda,42,Memur,Kayseri,4200
2,3,Timur,30,Muzisyen,Istanbul,9000
3,4,Burcu,29,Pazarlamaci,Ankara,4200
4,5,Yasemin,23,Pazarlamaci,Bursa,4800


**ADDING TAG TO DATASETS**

In [23]:
df1 = df.withColumn("ekonomik_durum",
    F.when(F.col("aylik_gelir") > 7000, "iyi").otherwise("kötü")
)

**SEPERATE DATASET AS TRAIN AND TEST**

In [24]:
train_df, test_df = df1.randomSplit([0.8, 0.2], seed=142)

 **CREATING OBJECTS FOR PIPELINE**

In [25]:
meslek_indexer = StringIndexer() \
.setInputCol("meslek") \
.setOutputCol("meslek_index") \
.setHandleInvalid("skip")

//StringIndexer ile "meslek" columns 'u için Indexleme işlmemi yapıyoruz.

// setHandleInvalid kullanma amacımız train ile test arasınd Hollanda ile ilgili kısımları silmiştik dolayısı ile train kısımında gördüğü veriyi test kısmında göremezse hata veriyor spark biz setHandleInvalid in default değeri olan "error" dan "skip" e çekiyoruz şimdilik bunu aslında data cleaning preparing kısımında halletmeliyiz.

In [26]:
sehir_indexer = StringIndexer() \
.setInputCol("sehir") \
.setOutputCol("sehir_index") \
.setHandleInvalid("skip")

//StringIndexer ile "sehir" columns 'u için Indexleme işlmemi yapıyoruz.

In [27]:
encoder = OneHotEncoderEstimator() \
.setInputCols(["meslek_index","sehir_index"]) \
.setOutputCols(["meslek_encoded","sehir_encoded"])

//OneHotEncoder tüm String nitelikleri alıyor topluca kodluyordu.

In [28]:
assembler = VectorAssembler() \
.setInputCols(["yas","aylik_gelir","meslek_encoded","sehir_encoded"]) \
.setOutputCol("vectorized_features")

// VectorAssembler da bütün kategorik nitelikleri ve nümerik nitelikleri vektor haline getirip bir araya topluyordu.

In [29]:
label_indexer = StringIndexer() \
.setInputCol("ekonomik_durum") \
.setOutputCol("label")

 // label_indexer ise hedef değişkenimiz olan "ekonomik durum" u nümerik hale getiriyor.

In [30]:
scaler = StandardScaler() \
.setInputCol("vectorized_features") \
.setOutputCol("features")

 // StandardScaler ile ise birbirinden çok farklı ölçekleri aynı düzeye indiriyordu.

In [31]:
lr_object = LogisticRegression() \
.setFeaturesCol("features") \
.setLabelCol("label") \
.setPredictionCol("prediction")

**Pipeline**

In [32]:
pipeline_nesnesi = Pipeline() \
.setStages([meslek_indexer,sehir_indexer,encoder,assembler,label_indexer,scaler,lr_object])

In [33]:
pipeline_modeli = pipeline_nesnesi.fit(train_df)

In [34]:
pipeline_modeli.transform(test_df).select("label","prediction").toPandas().head()

Unnamed: 0,label,prediction
0,1.0,1.0
1,0.0,0.0
2,1.0,1.0
