[Index]
* [Create DataFrame From List](#1)
* [Dosyadan veri okuyarak Dataframe oluşturmak](#2)
* [WordCount](#3)
* [Csv Dosyasindan SQL almak](#4)
* [String Operations](#5)
    * [Concat](#6)
    * [Number Format](#7)
    * [3. lower, initcap, length](#8)
    * [4. Trim](#9)
    * [5.Replace and Split](#10)
    
* [Pyspark Df to Disk](#11)

* [Manual Schema](#12)

* [Date Time Operations](#13)
* [PIVOT TABLE](#14)
* [DF to Write Kafka](#15)

# Create DataFrame From List<a class="anchor" id="1"></a>

In [1]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
#spark session olusturuyoruz
from pyspark.sql import SparkSession

# Aşağıdaki ayarları bilgisayarınızın belleğine göre değiştirebilirsiniz
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("Create-Dataframe") \
        .config("spark.executor.memory","6g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()

# sparkContext'i kısaltmada tut
sc = spark.sparkContext

In [2]:
#Row rdd yapiyoruz 
#x i al row yap
from pyspark.sql import Row
list_rdd = sc.parallelize([1,2,3,4,5,6,4,5]).map(lambda x: Row(x))

In [3]:
# Sütun ismi bir tane bile olsa Python listesi olarak parametre verilir
#dataframe ceviriyoruz burada
df_from_list = list_rdd.toDF(['rakamlar'])

In [4]:
df_from_list.show()

+--------+
|rakamlar|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       4|
|       5|
+--------+



In [5]:
# range ile dataframe yaratmak
#-1

df_from_range = sc.parallelize(range(10,100,5)). \
                map(lambda x: (x,)). \
                toDF(["range"])
#lambda x sonucu range basligi altina at
df_from_range.show(4)

+-----+
|range|
+-----+
|   10|
|   15|
|   20|
|   25|
+-----+
only showing top 4 rows



In [6]:
#yontem-2
from pyspark.sql.types import IntegerType
#spark session ile yapalim ()
df_from_range2 =spark.createDataFrame(range(10,100,5), IntegerType())
df_from_range2.show(3)

+-----+
|value|
+-----+
|   10|
|   15|
|   20|
+-----+
only showing top 3 rows



# Dosyadan veri okuyarak Dataframe oluşturmak <a class="anchor" id="2"></a>

In [7]:
df_from_file = spark.read.csv("datasets/OnlineRetail.csv")
df_from_file.show(3)


+--------------------+
|                 _c0|
+--------------------+
|InvoiceNo;StockCo...|
|536365;85123A;WHI...|
|536365;71053;WHIT...|
+--------------------+
only showing top 3 rows



In [8]:
df_from_file =  spark.read. \
                option("sep",";"). \
                csv("datasets/OnlineRetail.csv")

df_from_file.show(3)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|      _c0|      _c1|                 _c2|     _c3|            _c4|      _c5|       _c6|           _c7|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [9]:
df_from_file.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [10]:
df_from_file =  spark.read. \
                option("sep",";"). \
                option("header","True"). \
                option("inferSchema","True"). \
                csv("datasets/OnlineRetail.csv")
                #interScheman veri turleri duzelltti
df_from_file.show(3)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [11]:
#print schema ,dataframe rdd farkli oldugu icin, en onemli farki bu
df_from_file.printSchema()
#veri turleri hepsi string

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [12]:
#Action yapalim
df_from_file.count()

541909

In [13]:
#select ile islem yapalim
df_from_file.select("InvoiceNo","StockCode").show(10)

+---------+---------+
|InvoiceNo|StockCode|
+---------+---------+
|   536365|   85123A|
|   536365|    71053|
|   536365|   84406B|
|   536365|   84029G|
|   536365|   84029E|
|   536365|    22752|
|   536365|    21730|
|   536366|    22633|
|   536366|    22632|
|   536367|    84879|
+---------+---------+
only showing top 10 rows



In [14]:
#sort
df_from_file.sort("InvoiceNo").show(10)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1.12.2010 08:26|     4,25|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1.12.2010 08:26|     7,65|     17850|United Kingdom|
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:

In [15]:
df_from_file.sort("Quantity").explain()
#spark in neler yapdigihi explain ediyor 

== Physical Plan ==
*(2) Sort [Quantity#107 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Quantity#107 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [InvoiceNo#104,StockCode#105,Description#106,Quantity#107,InvoiceDate#108,UnitPrice#109,CustomerID#110,Country#111] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/resitkadir/Desktop/pyspark_intro/Final/datasets/OnlineRetail.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:int,InvoiceDate:string,UnitP...


In [16]:
# Dinamik conf ayarı ve shuffle partition sayısını değiştirme
spark.conf.set("spark.sql.shuffle.partitions","5")

In [17]:
# Yeni conf ile sort
df_from_file.select("Description","Quantity").sort("Quantity").explain()

== Physical Plan ==
*(2) Sort [Quantity#107 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Quantity#107 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [Description#106,Quantity#107] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/resitkadir/Desktop/pyspark_intro/Final/datasets/OnlineRetail.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Description:string,Quantity:int>


In [18]:
# Dinamik conf ayarı ve shuffle partition sayısını değiştirme
spark.conf.set("spark.sql.shuffle.partitions","5")
# Yeni conf ile sort
df_from_file.select("Description","Quantity").sort("Quantity").explain()

== Physical Plan ==
*(2) Sort [Quantity#107 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Quantity#107 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [Description#106,Quantity#107] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/resitkadir/Desktop/pyspark_intro/Final/datasets/OnlineRetail.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Description:string,Quantity:int>


In [19]:
sc.stop()

# Word Count <a class="anchor" id="3"></a>

In [20]:
import findspark
findspark.init()
# Create SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Aşağıdaki ayarları bilgisayarınızın belleğine göre değiştirebilirsiniz
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("Dataframe-WordCount") \
        .config("spark.executor.memory","6g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()

# sparkContext'i kısaltmada tut
sc = spark.sparkContext

veri_dosyasi = "datasets/omer_seyfettin_forsa_hikaye.txt"
hikaye_df = spark.read.text(veri_dosyasi)
#text , textfile degil

hikaye_df.show(3,truncate=False)

+--------------------------------------------------------------------------------------+
|value                                                                                 |
+--------------------------------------------------------------------------------------+
|Ömer Seyfettin -         Forsa                                                        |
|                                                                                      |
|Akdeniz’in, kahramanlık yuvası sonsuz ufuklarına bakan küçük tepe, minimini bir çiçek |
+--------------------------------------------------------------------------------------+
only showing top 3 rows



**RDD'de flatMap ile boşluklardan kelimeleri ayırıp aşağı atıyorduk.** 

**Yapısal API'de benzer işi sql explode fonksiyonu ile yapıyoruz.**

In [21]:
from pyspark.sql.functions import explode, split, col
# Her bir kelimeyi boşluklarla ayıralım ve başka bir rdd'de tutalım
kelimeler = hikaye_df.select(explode(split(col("value"), " ")).alias("value"))
#explode ile secim yapdik,icine col ile value yazdik,split ilede split fonk ile yazdik,boslukla ayir
#alias(as demek numpy as gibi) select in icinde olmasi lazim, ve isim ver
kelimeler.show(5)

+---------+
|    value|
+---------+
|     Ömer|
|Seyfettin|
|        -|
|         |
|         |
+---------+
only showing top 5 rows



In [22]:
#grouoby ile sayalim
from pyspark.sql.functions import desc
kelimeler.groupBy("value").count().orderBy(desc("count")).show(10)

+-----+-----+
|value|count|
+-----+-----+
|     |   85|
|  bir|   32|
|    –|   31|
|  yıl|    8|
| diye|    6|
|dedi.|    5|
| Türk|    5|
| Kırk|    5|
| onun|    5|
|doğru|    5|
+-----+-----+
only showing top 10 rows



In [23]:
#ikinci yontem
kelimeler.groupBy("value").agg({"value": "count"}).orderBy(desc("count(value)")).show(10)

+-----+------------+
|value|count(value)|
+-----+------------+
|     |          85|
|  bir|          32|
|    –|          31|
|  yıl|           8|
| diye|           6|
| Kırk|           5|
| Türk|           5|
|dedi.|           5|
| onun|           5|
|doğru|           5|
+-----+------------+
only showing top 10 rows



In [24]:
sc.stop()

## CsvDosyasinaSQLAtmak <a class="anchor" id="4"></a>

In [25]:
#session olustur
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("Sql-Olusturmak") \
        .config("spark.executor.memory","4g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()


#datayi oku,dataframe olusturduk
retailDF =  spark.read \
            .option("header","True") \
            .option("inferSchema","True") \
            .option("sep",";") \
            .csv("datasets/OnlineRetail.csv")

#sanal bir tablo olusturduk
retailDF.createOrReplaceTempView("tablo")


In [26]:
retailDF.limit(5).toPandas().head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,1.12.2010 08:26,255,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,1.12.2010 08:26,339,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,1.12.2010 08:26,275,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,1.12.2010 08:26,339,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,1.12.2010 08:26,339,17850,United Kingdom


In [27]:
#sql yeri actik
spark.sql("""

SELECT*FROM tablo

""").show(3)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [28]:
#groupby yapalim
spark.sql("""

SELECT Country ,SUM(UnitPrice) as UnitPrice
FROM tablo
GROUP BY Country
ORDER BY UnitPrice DESC

""").show(3)

+--------------+---------+
|       Country|UnitPrice|
+--------------+---------+
|United Kingdom|  94911.0|
|          EIRE|   9423.0|
|       Germany|   7930.0|
+--------------+---------+
only showing top 3 rows



In [29]:
#eger bir datasetini sik kullaniyorsak cacheleyerek daha hizli islem yapabiliriz, yontemi ise
retailDF.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: string, CustomerID: int, Country: string]

In [30]:
sc.stop()

# DataFrame String Operations <a class="anchor" id="5"></a>

In [2]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[4]") \
        .appName("StringOps") \
        .config("spark.executor.memory","4g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()

simple_df = spark.read \
            .option("header","True") \
            .option("inferSchema","True") \
            .option("sep",",") \
            .csv("datasets/simple_dirty_data.csv")

simple_df.show(3)

+------+------+---+--------+--------+-----------+-----------+---------------+
|sirano|  isim|yas|cinsiyet|  meslek|      sehir|aylik_gelir|       mal_mulk|
+------+------+---+--------+--------+-----------+-----------+---------------+
|     1| Cemal| 35|       E|    Isci|     Ankara|     3500.0|          araba|
|     2|ceyda | 42|       K|   Memur|    Kayseri|     4200.0|       araba|ev|
|     3| Timur| 30|    null|Müzüsyen|Istanbul   |     9000.0|araba|ev|yazlık|
+------+------+---+--------+--------+-----------+-----------+---------------+
only showing top 3 rows



In [5]:
# Bu ders tamamen sql string ops ile ilgili o yüzden hepsini indirelim
#from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col

### Concat<a class="anchor" id="6"></a>

In [6]:
#Concat iki stringi birlestiren fonksiyondur

# Meslek ve şehir birleştirme. Araya birşey eklemek için lit() fonksiyonu kullanırız.
#yeni dataframe olustircaz
#meslek ile sehri concat edelim ve yeni sutun yapalaim 
df_concat = simple_df.withColumn("meslek_sehir", F.concat(col("meslek"), F.lit(" - "), col("sehir")))
df_concat.show(n=5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |meslek_sehir            |
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Isci - Ankara           |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Memur - Kayseri         |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|Müzüsyen - Istanbul     |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |Pazarlamacı -     Ankara|
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |Pazarlamaci - Bursa     |
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
only showing top 5 rows



## Number Formats <a class="anchor" id="7"></a>

In [8]:
# numara formatlama,virgulden sonra iki basamak yapalim
#yeni sutun ismi aylik gelir format olsun
df_number_format = simple_df \
                    .withColumn("aylik_gelir_format", F.format_number(col("aylik_gelir"), 2))

df_number_format.show(n=10, truncate=False)

+------+--------+---+--------+-----------+-----------+-----------+----------------------+------------------+
|sirano|isim    |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk              |aylik_gelir_format|
+------+--------+---+--------+-----------+-----------+-----------+----------------------+------------------+
|1     |Cemal   |35 |E       |Isci       |Ankara     |3500.0     |araba                 |3,500.00          |
|2     |ceyda   |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev              |4,200.00          |
|3     |Timur   |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık       |9,000.00          |
|4     |Burcu   |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba                 |4,200.00          |
|5     |Yasemin |23 |K       |Pazarlamaci|Bursa      |4800.0     |araba                 |4,800.00          |
|6     | Ali    |33 |E       |Memur      |Ankara     |4250.0     |ev                    |4,250.00          |
|7     |Dilek   |29

### 3. lower, initcap, length <a class="anchor" id="8"></a>

In [9]:
import pyspark.sql.functions as F

In [10]:
# meslek küçük harf, isimlerin ilk harfi büyük, sehirlerin kelime uzunlukları

df_lower = simple_df \
        .withColumn("meslek_lower", F.lower(col("meslek"))) \
        .withColumn("isim_initcap", F.initcap(col("isim"))) \
        .withColumn("sehir_length", F.length(col("sehir")))

# meslekler kucuk harfli olsun,isimlerinde(ilk harfi buyuk yaz) , ve sheir kelimesini uzunlugu nedir
df_lower.show(n=5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+------------+------------+------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |meslek_lower|isim_initcap|sehir_length|
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------+------------+------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |isci        |Cemal       |6           |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |memur       |Ceyda       |7           |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|müzüsyen    |Timur       |11          |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |pazarlamacı |Burcu       |10          |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |pazarlamaci |Yasemin     |5           |
+------+-------+---+--------+---

# 4- Trim <a class="anchor" id="9"></a>

In [11]:
#Trim bosluk olan yerleri bak ankara ornegi ustteki gibi(10)

#rt-rightrim
df_trim = simple_df \
            .withColumn("sehir_rtrim", F.rtrim(col("sehir"))) \
            .withColumn("sehir_ltrim", F.ltrim(col("sehir"))) \
            .withColumn("sehir_trim", F.trim(col("sehir")))

df_trim.show(n=5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-----------+----------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |sehir_rtrim|sehir_ltrim|sehir_trim|
+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-----------+----------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Ankara     |Ankara     |Ankara    |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Kayseri    |Kayseri    |Kayseri   |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|Istanbul   |Istanbul   |Istanbul  |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |    Ankara |Ankara     |Ankara    |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |Bursa      |Bursa      |Bursa     |
+------+-------+---+--------+-----------+-----------+-----------

## 5-Replace and Split <a class="anchor" id="10"></a>

In [12]:
df_replace = simple_df \
            .withColumn("sehir_ist", F.regexp_replace(col("sehir"), "Ist", "İST")) \
            .withColumn("mal_mulk_split", F.split(col("mal_mulk"), "\\|")) \
            .withColumn("mal_mulk_ilk_eleman", col("mal_mulk_split")[0])
            
    #sehir_ist diye bir column ac ve Ist yazilan yeri IST ile degistir
    #F split ile split ettik
    # ve ilk elemanlari al
df_replace.show(n=5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-------------------+-------------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |sehir_ist  |mal_mulk_split     |mal_mulk_ilk_eleman|
+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-------------------+-------------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Ankara     |[araba]            |araba              |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Kayseri    |[araba, ev]        |araba              |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|İSTanbul   |[araba, ev, yazlık]|araba              |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |    Ankara |[araba]            |araba              |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0   

In [13]:
df_replace.printSchema()

root
 |-- sirano: integer (nullable = true)
 |-- isim: string (nullable = true)
 |-- yas: integer (nullable = true)
 |-- cinsiyet: string (nullable = true)
 |-- meslek: string (nullable = true)
 |-- sehir: string (nullable = true)
 |-- aylik_gelir: double (nullable = true)
 |-- mal_mulk: string (nullable = true)
 |-- sehir_ist: string (nullable = true)
 |-- mal_mulk_split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mal_mulk_ilk_eleman: string (nullable = true)



In [14]:
spark.stop()

# Pyspark DF to Disk ("Write")  <a class="anchor" id="11"></a>

In [15]:
#ETL (Extract-Transform-Load) --> Cek ,donustur,yukle***
#kirli veri(Kaynak)--->ETL --->Temiz veri (Hedef)***

In [16]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("DFToDisk") \
        .config("spark.executor.memory","4g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()
#datayi session icine alliyoruz
df = spark.read \
        .option("header","True") \
        .option("inferSchema","True") \
        .option("sep",",") \
        .csv("datasets/simple_dirty_data.csv")

df.show(5)

+------+-------+---+--------+-----------+-----------+-----------+---------------+
|sirano|   isim|yas|cinsiyet|     meslek|      sehir|aylik_gelir|       mal_mulk|
+------+-------+---+--------+-----------+-----------+-----------+---------------+
|     1|  Cemal| 35|       E|       Isci|     Ankara|     3500.0|          araba|
|     2| ceyda | 42|       K|      Memur|    Kayseri|     4200.0|       araba|ev|
|     3|  Timur| 30|    null|   Müzüsyen|Istanbul   |     9000.0|araba|ev|yazlık|
|     4| Burcu | 29|       K|Pazarlamacı|     Ankara|     4200.0|          araba|
|     5|Yasemin| 23|       K|Pazarlamaci|      Bursa|     4800.0|          araba|
+------+-------+---+--------+-----------+-----------+-----------+---------------+
only showing top 5 rows



In [17]:
#Data cleaning,Session icine aldikdan sonra temizliyoruz
from pyspark.sql import functions as F
df2 = df \
    .withColumn("isim", F.trim(F.initcap(df.isim))) \
    .withColumn("cinsiyet", F.when(df['cinsiyet'].isNull(), "U").otherwise(df['cinsiyet'])) \
    .withColumn("sehir", F.when(df['sehir'].isNull(), "BİLİNMİYOR").otherwise(F.trim(F.upper(df['sehir']))))

#isimlerin basharflerini buyk yapip trim yapalim,
#cinsiyet yoksa U yaz, gelmezse kendisi kalsin
#sehirde nulla bilinmiyor yazbiliniyorsa trim ve upper uygula 
df2.show(5)

+------+-------+---+--------+-----------+--------+-----------+---------------+
|sirano|   isim|yas|cinsiyet|     meslek|   sehir|aylik_gelir|       mal_mulk|
+------+-------+---+--------+-----------+--------+-----------+---------------+
|     1|  Cemal| 35|       E|       Isci|  ANKARA|     3500.0|          araba|
|     2|  Ceyda| 42|       K|      Memur| KAYSERI|     4200.0|       araba|ev|
|     3|  Timur| 30|       U|   Müzüsyen|ISTANBUL|     9000.0|araba|ev|yazlık|
|     4|  Burcu| 29|       K|Pazarlamacı|  ANKARA|     4200.0|          araba|
|     5|Yasemin| 23|       K|Pazarlamaci|   BURSA|     4800.0|          araba|
+------+-------+---+--------+-----------+--------+-----------+---------------+
only showing top 5 rows



In [18]:
#temizlmis datayi diske yazma
df2.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("sep",",") \
    .option("header","True") \
    .csv("datasets/simple_dirty_data2.csv")

#parca parca yazar spark
#yaz,modu overwrite olsun,yazarken boslukla ayirsin,baslik var,ve bu path a yazsin

In [19]:
#diske yazdigimiz file okuyalim
df3 = spark.read \
.option("header","True") \
.option("inferSchema","True") \
.option("sep",",") \
.csv("/Users/resitkadir/ApacheSpark/data/simple_dirty_data2.csv")

df3.show(5)

+------+-------+---+--------+-----------+--------+-----------+---------------+
|sirano|   isim|yas|cinsiyet|     meslek|   sehir|aylik_gelir|       mal_mulk|
+------+-------+---+--------+-----------+--------+-----------+---------------+
|     1|  Cemal| 35|       E|       Isci|  ANKARA|     3500.0|          araba|
|     2|  Ceyda| 42|       K|      Memur| KAYSERI|     4200.0|       araba|ev|
|     3|  Timur| 30|       U|   Müzüsyen|ISTANBUL|     9000.0|araba|ev|yazlık|
|     4|  Burcu| 29|       K|Pazarlamacı|  ANKARA|     4200.0|          araba|
|     5|Yasemin| 23|       K|Pazarlamaci|   BURSA|     4800.0|          araba|
+------+-------+---+--------+-----------+--------+-----------+---------------+
only showing top 5 rows



In [20]:
spark.stop()

# MAnual Schema <a class="anchor" id="12"></a>
**RDD ile dataframe arasindaki en buyuk fark schema dir**

**Scheman bir cok katki saglar bunlardan biride optimizasyondur,kataliz optimizer adinda bir optimizor var,schema ile ve high level API ile daha iyi performans gosteriyor**

In [21]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[4]") \
        .appName("Csv-Üzeri-SQL") \
        .config("spark.executor.memory","6g") \
        .config("spark.driver.memory","2g") \
        .getOrCreate()


df = spark.read \
    .option("header","True") \
    .option("inferSchema","True") \
    .option("sep",";") \
    .csv("datasets/OnlineRetail.csv")

df.show(3)



+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [22]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



    Örneğin yukarıda UnitPrice'ın  Float olmasına ihtiyaç var

    Spark'ın çıkarımı bize her zaman yetmez ellerimizle kendi şemamızı yapalım .Önce gerekli kütüphaneleri içeri alalım

In [23]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType

In [24]:
manual_schema = StructType(
[
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
]
)

In [25]:
# Veriye elle hazırlanan şema ile tekrar okuma
#manuel schemani okuyalim
df2 = spark.read \
    .option("header","True") \
    .schema(manual_schema) \
    .option("sep",";") \
    .csv("datasets/OnlineRetail.csv")
df2.show(3)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|     null|     null|       null|    null|       null|     null|      null|   null|
|     null|     null|       null|    null|       null|     null|      null|   null|
|     null|     null|       null|    null|       null|     null|      null|   null|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
only showing top 3 rows



    UnitPrice'da sıkıntı var. "." yerine "," olduğu için schma string'den float'a dönüştüremiyor. Çözüm bulmak gerekli.
   
### ÇÖZÜM
Çözüm olarak veriyi okurken "," ile "." yı değiştirerek okuyalım ve dataframe'i tekrar , ve . değişmiş

şekilde diske yazalım. Diskten okurken düzeltilmiş csv den okuyalım ve elle hazırlanmış şemamızı kullanalım.

In [26]:
from pyspark.sql import functions as F

df = spark.read \
.option("header","True") \
.option("inferSchema","True") \
.option("sep",";") \
.csv("/Users/resitkadir/ApacheSpark/data/OnlineRetail.csv") \
.withColumn("UnitPrice",F.regexp_replace(F.col("UnitPrice"), ",","."))

#unitprice virgul gordugun yere nokta yaz dedik
df.show(3)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2.75|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [27]:
#Diske yazalim
df \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("sep",";") \
    .option("header","true") \
    .csv("datasets\OnlineRetail3.csv")


In [28]:
#yazdigimizi tekrar alalim

df_temiz = spark.read \
    .option("header","True") \
    .schema(manual_schema) \
    .option("sep",";") \
    .csv("datasets/OnlineRetail3.csv") \

In [29]:
df_temiz.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [30]:
df_temiz.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [31]:
spark.stop()

# Date Time Operations<a class="anchor" id="13"></a>

In [32]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Csv-Üzeri-SQL") \
    .config("spark.executor.memory","4g") \
    .config("spark.driver.memory","2g") \
    .getOrCreate()

df = spark.read \
    .option("header","True") \
    .option("inferSchema","True") \
    .option("sep",";") \
    .csv("datasets/OnlineRetail.csv") \
    .select("InvoiceDate").distinct()
#ayni tarihler gelmesinn distinc ile 

df.show(3)

+---------------+
|    InvoiceDate|
+---------------+
|3.12.2010 16:50|
|7.12.2010 12:28|
|8.12.2010 15:02|
+---------------+
only showing top 3 rows



In [33]:
#Formatı anlamak için beş satırdan gün ve ay belli olmuyor daha fazla satır görelim.
df.show(5)

+----------------+
|     InvoiceDate|
+----------------+
| 3.12.2010 16:50|
| 7.12.2010 12:28|
| 8.12.2010 15:02|
|10.12.2010 09:53|
|12.12.2010 13:32|
+----------------+
only showing top 5 rows



Evet şimdi anlaşıldı. Format gün.ay.yıl saat:dakika

**yani dd.MM.yyyy HH:mm**

Datetime da ise varsayılan format yyyy-MM-dd HH:mm:ss

In [34]:
mevcut_format = 'dd.MM.yyyy HH:mm'
#day, month,year,hour,minute

from pyspark.sql import functions as F

df2 = df.withColumn("InvoiceDate", F.trim(F.col("InvoiceDate"))) \
    .withColumn("normal_tarih", F.to_date(F.col("InvoiceDate"), mevcut_format)) \
    .withColumn("standart_ts", F.to_timestamp(F.col("InvoiceDate"), mevcut_format)) \

#Trimledik
#yil ay gun olsun,mevcut format ile nasil format yapmak istedgimizi yazdik
# tarih saaat dilim yapalim
df2.show(4)

+----------------+------------+-------------------+
|     InvoiceDate|normal_tarih|        standart_ts|
+----------------+------------+-------------------+
| 3.12.2010 16:50|  2010-12-03|2010-12-03 16:50:00|
| 7.12.2010 12:28|  2010-12-07|2010-12-07 12:28:00|
| 8.12.2010 15:02|  2010-12-08|2010-12-08 15:02:00|
|10.12.2010 09:53|  2010-12-10|2010-12-10 09:53:00|
+----------------+------------+-------------------+
only showing top 4 rows



In [35]:
#tarih format degistirme
format_tr = "dd/MM/yyyy HH:mm:ss"
format_eng = "MM-dd-yyyy HH:mm:ss"

df3 = df2 \
.withColumn("TSTR", F.date_format(F.col("standart_ts"), format_tr)) \
.withColumn("TSENG", F.date_format(F.col("standart_ts"), format_eng)) \
.withColumn("unix_time", F.unix_timestamp(F.col("standart_ts"))) \
#ustte formatlayip altta degistirebiliyoruz / - degiskligi
df3.show(4)

+----------------+------------+-------------------+-------------------+-------------------+----------+
|     InvoiceDate|normal_tarih|        standart_ts|               TSTR|              TSENG| unix_time|
+----------------+------------+-------------------+-------------------+-------------------+----------+
| 3.12.2010 16:50|  2010-12-03|2010-12-03 16:50:00|03/12/2010 16:50:00|12-03-2010 16:50:00|1291391400|
| 7.12.2010 12:28|  2010-12-07|2010-12-07 12:28:00|07/12/2010 12:28:00|12-07-2010 12:28:00|1291721280|
| 8.12.2010 15:02|  2010-12-08|2010-12-08 15:02:00|08/12/2010 15:02:00|12-08-2010 15:02:00|1291816920|
|10.12.2010 09:53|  2010-12-10|2010-12-10 09:53:00|10/12/2010 09:53:00|12-10-2010 09:53:00|1291971180|
+----------------+------------+-------------------+-------------------+-------------------+----------+
only showing top 4 rows



In [36]:
# Tarih ekleme, tarih farkı, timestamp içinden yılı alma
df4 = df2 \
    .withColumn("bir_yil", F.date_add(F.col("standart_ts"), 365)) \
    .withColumn("yil", F.year(F.col("standart_ts"))) \
    .withColumn("fark", F.datediff(F.col("bir_yil"), F.col("standart_ts")))

#365 gun eklenecek standart ts e 
#yil i alalim sadece 
#farki alalim biryil-standart_ts den
df4.show(4)

+----------------+------------+-------------------+----------+----+----+
|     InvoiceDate|normal_tarih|        standart_ts|   bir_yil| yil|fark|
+----------------+------------+-------------------+----------+----+----+
| 3.12.2010 16:50|  2010-12-03|2010-12-03 16:50:00|2011-12-03|2010| 365|
| 7.12.2010 12:28|  2010-12-07|2010-12-07 12:28:00|2011-12-07|2010| 365|
| 8.12.2010 15:02|  2010-12-08|2010-12-08 15:02:00|2011-12-08|2010| 365|
|10.12.2010 09:53|  2010-12-10|2010-12-10 09:53:00|2011-12-10|2010| 365|
+----------------+------------+-------------------+----------+----+----+
only showing top 4 rows



In [37]:
spark.stop()

# Pivot table<a class="anchor" id="14"></a>

In [38]:
import findspark
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")
# Create SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Aşağıdaki ayarları bilgisayarınızın belleğine göre değiştirebilirsiniz
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Dataset-Olusturmak") \
    .config("spark.executor.memory","4g") \
    .config("spark.driver.memory","2g") \
    .getOrCreate()

# sparkContext'i kısaltmada tut
sc = spark.sparkContext


my_tuple = ((1,0),(1,1),(0,0),(0,1),(1,0),(1,1),(0,0),(0,1),(1,1),(1,1),(0,0))
                    
matris_df = spark.createDataFrame(my_tuple). \
selectExpr("_1 as label","_2 as prediction")
matris_df.show()

+-----+----------+
|label|prediction|
+-----+----------+
|    1|         0|
|    1|         1|
|    0|         0|
|    0|         1|
|    1|         0|
|    1|         1|
|    0|         0|
|    0|         1|
|    1|         1|
|    1|         1|
|    0|         0|
+-----+----------+



In [39]:
#Pivota geçmeden önce gruplanmış haline bakalım
matris_df.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|         0|    2|
|    1|         1|    4|
|    0|         1|    2|
|    0|         0|    3|
+-----+----------+-----+



In [40]:
# 0 yukarıda matris
matris_df.groupBy("label"). \
        pivot("prediction", [0,1]). \
        count(). \
        orderBy("label"). \
        show()

+-----+---+---+
|label|  0|  1|
+-----+---+---+
|    0|  3|  2|
|    1|  2|  4|
+-----+---+---+



In [41]:
# 1 yukarıda matris
from pyspark.sql.functions import desc
matris_df.groupBy("label"). \
pivot("prediction", [1,0]). \
count(). \
orderBy(matris_df.label.desc()). \
show()

+-----+---+---+
|label|  1|  0|
+-----+---+---+
|    1|  4|  2|
|    0|  2|  3|
+-----+---+---+



In [42]:
spark.stop()

# To write KAFKA <a class="anchor" id="15"></a>

In [43]:
import findspark
findspark.init("/Users/resitkadir/spark/spark-2.4.6/")

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'


from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder \
        .master("local[4]") \
        .appName("WriteToKafka") \
        .getOrCreate()

df = spark.read.format("csv") \
        .option("header", True) \
        .load("datasets/Advertising.csv")

df.show(2)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
+---+-----+-----+---------+-----+
only showing top 2 rows



In [44]:
df2 = df.withColumn("key", col('_c0')).drop('_c0')
df2.show(2)

+-----+-----+---------+-----+---+
|   TV|Radio|Newspaper|Sales|key|
+-----+-----+---------+-----+---+
|230.1| 37.8|     69.2| 22.1|  1|
| 44.5| 39.3|     45.1| 10.4|  2|
+-----+-----+---------+-----+---+
only showing top 2 rows



In [45]:
df3 = df2.select('key',
                 concat(
                 col('TV'), lit(","),
                 col('Radio'), lit(","),
                col('Newspaper'), lit(","),
                col('Sales')
                 ).alias("value")

)

df3.show(2)

+---+--------------------+
|key|               value|
+---+--------------------+
|  1|230.1,37.8,69.2,22.1|
|  2| 44.5,39.3,45.1,10.4|
+---+--------------------+
only showing top 2 rows



In [47]:
df3 \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test") \
    .save()

AnalysisException: 'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

In [48]:
#https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html

In [None]:
#sonuclar icin kafka consumer i calistirmamiz lazim
#sonuc calismadi ---(92)


In [None]:
spark.stop()