In [0]:
from pyspark.sql import SparkSession

In [0]:
# builder ile oluşturduk. appName() ile isim verdik. getOrCreate OOP'den gelen bir özelliktir. en son da spark adında bir değişkene atadık.
spark = SparkSession.builder.appName("Spark Çalışması").getOrCreate()

In [0]:
# spark bilgileri.
spark

In [0]:
# delimiterle verideki virgül ve noktalı virgülle ayrılmış verileri ayırdık. Her bir ayarlama için tek tek option kullanmak gerekiyor. inferSchema ile şema oluşturur.
rawDS = spark.read.option("delimiter",";").option("header",True).option("inferSchema",True).csv("/FileStore/tables/market2.csv")

In [0]:
# veri setini gösteriyor.
rawDS.show()

In [0]:
# şemayı gösteriyor.
rawDS.printSchema()

In [0]:
rawDS.show()

In [0]:
# veri setinden seçtiğimiz verilerle kendimize selectedDS adında bir dataset oluşturduk.
selectedDS = rawDS.select("ITEMNAME","FICHENO","LINENETTOTAL","CITY","CLIENTCODE","CLIENTNAME")

In [0]:
selectedDS.show()

In [0]:
# selectedDS'i CITY'e göre grupla. count ise hangi şehirden kaç satış olmuş bunu gösterir.
cityByCountDS = selectedDS.groupBy("CITY").count()

In [0]:
cityByCountDS.show()

In [0]:
# selectedDS deki CLIENTNAME ile countu sırala.
selectedDS.groupBy("CLIENTNAME").count().show()

In [0]:
# değişken atadıysak show() kullanamayız.
renamedDS = cityByCountDS.withColumnRenamed("count","CITY_COUNT").show()

In [0]:
renamedDS = cityByCountDS.withColumnRenamed("count","CITY_COUNT")

In [0]:
# CITY_COUNT > 1000 olanları filtreleyip göster.
renamedDS.filter("CITY_COUNT > 1000").show()

In [0]:
# yukarıdakinin farklı gösterilmiş hali.
renamedDS.filter(renamedDS['CITY_COUNT']>1000).show()

In [0]:
renamedDS.filter("CITY_COUNT > 1000 and CITY_COUNT < 1500").show()

In [0]:
renamedDS.filter("CITY_COUNT > 1000 or CITY_COUNT < 1500").show()

In [0]:
# bir müşterinin datadaki tüm alışverişlerini sum ile topladık. ascending = False ile b-k sıraladık.clientCodeTotalDS
clientCodeTotalDS = rawDS.groupBy("CLIENTCODE").sum("LINENETTOTAL").orderBy("sum(LINENETTOTAL)",ascending = False)

In [0]:
# round yuvarlar. * tüm functionları yükler.
from pyspark.sql.functions import *
clientCodeTotalDS.withColumn("TOTAL_SPEND",round("sum(LINENETTOTAL)",2))

In [0]:
# yeni bir sütun ekleme
finalDS = clientCodeTotalDS.withColumn("TOTAL_SPEND",round("sum(LINENETTOTAL)",2)).drop("sum(LINENETTOTAL)")

In [0]:
finalDS.orderBy("TOTAL_SPEND", ascending=False).show(5)

In [0]:
# müşterilerden ilk 5 ini b-k sıraladık
customerTop5 = finalDS.orderBy("TOTAL_SPEND", ascending=False)

In [0]:
# null değerleri attık.
customerTop5.na.drop().show(5)

In [0]:
# null kısımlara farklı değerler atadık.
customerTop5.na.fill({"CLIENTCODE":-1,"TOTAL_SPEND":100}).show(5)

In [0]:
rawDS.createOrReplaceTempView("MARKET")

In [0]:
%sql
SELECT CLIENTCODE,COUNT(*) AS CLIENT_COUNT FROM MARKET GROUP BY CLIENTCODE
-- sql ile verileri görselleştirdik.

CLIENTCODE,CLIENT_COUNT
835106,2
274770,1
956546,1
534324,1
819303,5
319929,1
875159,1
734536,4
1044242,2
776188,1


In [0]:
cityByCountDS.show()

In [0]:
# pysparkla sql kullanmadan görselleştirdik.
display(cityByCountDS)

CITY,count
Isparta,139
Edirne,49
Burdur,64
Muş,96
Adana,439
Batman,104
Nevşehir,51
Aksaray,120
Sivas,119
Ordu,219
