#### Kullanılacak modüller import edilir.

In [1]:
from pyspark.sql import SparkSession, functions, Window
import pyspark

#### Spark ortamına erişim için session yaratılır.

In [2]:
spark = SparkSession.\
    builder.\
    master('local[*]').\
    appName('firstapp').\
    getOrCreate()

#### Yüklü PySpark versiyonu kontrol edilebilir.

In [3]:
pyspark.__version__

'3.1.2'

#### worldcities.csv dosyası proje klasöründen dataframe'e aktarılır.

In [4]:
df = spark.read.csv('worldcities.csv', header=True)

#### df 'te yer alan veri setine Spark içinde yer alan HIVE üzerinden sorgularmış gibi direkt sql sorgusu yapmamıza olanak sağlayan bir geçici view yaratılır. Bu view session bazlıdır.

In [5]:
df.createOrReplaceTempView("worldcities")

#### catalog.listTables() fonksiyonu ile hangi tablo ve viewların mevcut HIVE altında göründüğü öğrenilebilir. 

In [6]:
spark.catalog.listTables()

[Table(name='worldcities', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

#### worldcities veri setinin içeriği aşağıdaki şekildedir.

In [7]:
df.select("*").show(10)

+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|       city| city_ascii|     lat|     lng|     country|iso2|iso3|      admin_name|capital|population|        id|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|      Tokyo|      Tokyo| 35.6897|139.6922|       Japan|  JP| JPN|           Tōkyō|primary|  37977000|1392685764|
|    Jakarta|    Jakarta| -6.2146|106.8451|   Indonesia|  ID| IDN|         Jakarta|primary|  34540000|1360771077|
|      Delhi|      Delhi| 28.6600| 77.2300|       India|  IN| IND|           Delhi|  admin|  29617000|1356872604|
|     Mumbai|     Mumbai| 18.9667| 72.8333|       India|  IN| IND|     Mahārāshtra|  admin|  23355000|1356226629|
|     Manila|     Manila| 14.6000|120.9833| Philippines|  PH| PHL|          Manila|primary|  23088000|1608618140|
|   Shanghai|   Shanghai| 31.1667|121.4667|       China|  CN| CHN|        Shanghai|  adm

## Örnek Senaryo 1

#### Temp View üzerinde SQL kullanarak belirli bir paralel ile meridyen aralığındaki şehirlerin bilgisi aşağıdaki şekilde çekilir.

In [8]:
spark.sql("SELECT city, lat, lng, country \
           FROM worldcities \
           WHERE (lat BETWEEN 37.0000 AND 38.0000) \
               AND (lng BETWEEN 40.0000 AND 41.0000) \
           ORDER BY country,city \
           LIMIT 10").show()

+----------+-------+-------+-------+
|      city|    lat|    lng|country|
+----------+-------+-------+-------+
|    ‘Āmūdā|37.1042|40.9300|  Syria|
|     Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
|    Mardin|37.3167|40.7378| Turkey|
|     Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+



#### SQL ile yukarıda yapılan sorgunun Spark SQL fonksiyonlarıyla kullanımı aşağıdaki şekildedir.

In [9]:
df\
.select("city","lat","lng","country")\
.where(df.lat.between("37","38") & df.lng.between("40","41"))\
.orderBy("country","city")\
.limit(10)\
.show()

+----------+-------+-------+-------+
|      city|    lat|    lng|country|
+----------+-------+-------+-------+
|    ‘Āmūdā|37.1042|40.9300|  Syria|
|     Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
|    Mardin|37.3167|40.7378| Turkey|
|     Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+



#### Her ülkeye ait kaç şehir olduğu aşağıdaki şekilde sorgulanabilir.

In [11]:
(df.groupBy("country").count()).orderBy("count",ascending=False).show(10)

+--------------+-----+
|       country|count|
+--------------+-----+
| United States| 7824|
|        Brazil| 3371|
|       Germany| 2624|
|         Italy| 2124|
|        France| 2017|
|United Kingdom| 1814|
|   Philippines| 1533|
|         China| 1498|
|        Russia| 1487|
|         Spain| 1035|
+--------------+-----+
only showing top 10 rows



## Örnek Senaryo 2

#### Aggregation ve Windowing fonksiyon yeteneklerini sınamak için SQL ile yapılan aşağıdaki sorguda; ülkelerin 15M'den fazla nüfusa sahip şehirlerinin bazı türetilmiş verileri mevcut. 
    * RN kolonu ile ülkelerin kendi içinde nüfusu az olandan çok olana sıralanması sağlanır. (ROW NUMBER)
    * SM kolonu ile ülkelerin toplam nüfusu hesaplanır. Her bir birim 1 Milyon ölçeğindedir. (SUM)
    * CNT kolonu ile ülkelerin kayıtlı şehir sayısı hesaplanır. (COUNT)
    * DR kolonu ile toplam nüfusun şehir sayısına oranı hesaplanır ve şehirlerin ülke bazındaki yoğunlukları bulunur. (Density Ratio)

In [12]:
spark.sql("SELECT country,city,population, \
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY city ASC) as RN, \
    ROUND(SUM(population/1000000) OVER(PARTITION BY country),3) as SM, \
    COUNT(city) OVER(PARTITION BY country) as CNT, \
    ROUND((SUM(population/1000000) OVER(PARTITION BY country))/(COUNT(city) OVER(PARTITION BY country)),3) AS DR \
    FROM worldcities WHERE population > 15000000").show(30)

+-------------+------------+----------+---+------+---+------+
|      country|        city|population| RN|    SM|CNT|    DR|
+-------------+------------+----------+---+------+---+------+
|       Russia|      Moscow|  17125000|  1|17.125|  1|17.125|
|  Philippines|      Manila|  23088000|  1|23.088|  1|23.088|
|       Turkey|    Istanbul|  15154000|  1|15.154|  1|15.154|
|    Argentina|Buenos Aires|  16157000|  1|16.157|  1|16.157|
|        China|     Beijing|  19433000|  1|78.384|  4|19.596|
|        China|   Guangzhou|  20902000|  2|78.384|  4|19.596|
|        China|    Shanghai|  22120000|  3|78.384|  4|19.596|
|        China|    Shenzhen|  15929000|  4|78.384|  4|19.596|
|        India|       Delhi|  29617000|  1|70.532|  3|23.511|
|        India|     Kolkāta|  17560000|  2|70.532|  3|23.511|
|        India|      Mumbai|  23355000|  3|70.532|  3|23.511|
|United States|    New York|  18713220|  1|18.713|  1|18.713|
|      Nigeria|       Lagos|  15279000|  1|15.279|  1|15.279|
|   Bang

#### Yukarıda yapılan örnekteki RN kısmı Spark'taki windowing fonksiyonalitesi kullanılarak aşağıdaki şekilde gerçekleştirilir. Dikkat edilirse önce Where ifadesi kullanılarak veri küçültülüp ardından oluşan set üzerinde de işlem yapmak mümkündür ve ihtiyaç dahilinde daha performanslı bir kullanım olacaktır.

In [13]:
df\
.where(df.population > 15000000)\
.select("country","city","population", 
          functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"))\
.show(50)

+-------------+------------+----------+---+
|      country|        city|population| RN|
+-------------+------------+----------+---+
|       Russia|      Moscow|  17125000|  1|
|  Philippines|      Manila|  23088000|  1|
|       Turkey|    Istanbul|  15154000|  1|
|    Argentina|Buenos Aires|  16157000|  1|
|        China|     Beijing|  19433000|  1|
|        China|   Guangzhou|  20902000|  2|
|        China|    Shanghai|  22120000|  3|
|        China|    Shenzhen|  15929000|  4|
|        India|       Delhi|  29617000|  1|
|        India|     Kolkāta|  17560000|  2|
|        India|      Mumbai|  23355000|  3|
|United States|    New York|  18713220|  1|
|      Nigeria|       Lagos|  15279000|  1|
|   Bangladesh|       Dhaka|  15443000|  1|
|     Thailand|     Bangkok|  17066000|  1|
|       Mexico| Mexico City|  20996000|  1|
|    Indonesia|     Jakarta|  34540000|  1|
| Korea, South|       Seoul|  21794000|  1|
|       Brazil|   São Paulo|  22046000|  1|
|        Japan|       Tokyo|  37

#### SQL ile gerçekleştirdiğimiz Örnek Senaryo 2'nin Spark fonksiyonalitesi kullanılarak gerçekleştirilmiş hali aşağıdaki şekildedir. 

In [14]:
df\
.where(df.population > 15000000)\
.select("country","city","population",
          functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"),
          functions.round(functions.sum(df.population/1000000).over(Window.partitionBy("country")),3).alias("SM"),
          functions.count("city").over(Window.partitionBy("country")).alias("CNT"),
          functions.round(((functions.sum(df.population/1000000).over(Window.partitionBy("country")))/
              (functions.count("city").over(Window.partitionBy("country")))),3).alias("DR"))\
.show(50)
#.orderBy("population", ascending=False)\

+-------------+------------+----------+---+------+---+------+
|      country|        city|population| RN|    SM|CNT|    DR|
+-------------+------------+----------+---+------+---+------+
|       Russia|      Moscow|  17125000|  1|17.125|  1|17.125|
|  Philippines|      Manila|  23088000|  1|23.088|  1|23.088|
|       Turkey|    Istanbul|  15154000|  1|15.154|  1|15.154|
|    Argentina|Buenos Aires|  16157000|  1|16.157|  1|16.157|
|        China|     Beijing|  19433000|  1|78.384|  4|19.596|
|        China|   Guangzhou|  20902000|  2|78.384|  4|19.596|
|        China|    Shanghai|  22120000|  3|78.384|  4|19.596|
|        China|    Shenzhen|  15929000|  4|78.384|  4|19.596|
|        India|       Delhi|  29617000|  1|70.532|  3|23.511|
|        India|     Kolkāta|  17560000|  2|70.532|  3|23.511|
|        India|      Mumbai|  23355000|  3|70.532|  3|23.511|
|United States|    New York|  18713220|  1|18.713|  1|18.713|
|      Nigeria|       Lagos|  15279000|  1|15.279|  1|15.279|
|   Bang