## THY 1.THY_DATA.txt veri setini kullanarak aşağıdaki görevleri yapınız.

# GÖREV 1: Veriyi Hazırlama

### Adım 1: Başlangıç

- Veriyi HDFS üzerinde datasets klasörü altına taşıyınız.
- Gerekli Kütüphaneleri yükleyiniz.
- Spark session oluşturunuz. Apache Spark'ı YARN modunda kullanınız.

**Not:** Veriyi `inferSchema` **True** olacak sekilde okutunuz, ilgili csv dosyasına göre `delimiter` seçiniz.

## 1-) Veri setini Hadoop HDFS'e yükleyiniz.

! hdfs dfs -put ~/datasets/thy_data.txt /user/train/datasets

In [1]:
! hdfs dfs -ls /user/train/datasets

Found 6 items
-rw-r--r--   1 train supergroup       4556 2020-09-23 20:56 /user/train/datasets/Advertising.csv
drwxr-xr-x   - train supergroup          0 2020-11-19 21:02 /user/train/datasets/churn-telecom
-rw-r--r--   1 train supergroup    2609524 2022-09-09 20:52 /user/train/datasets/dirty_store_transactions.csv
-rw-r--r--   1 train supergroup    5099520 2022-08-16 00:25 /user/train/datasets/flo100k_data.csv
drwxr-xr-x   - train supergroup          0 2020-11-21 11:16 /user/train/datasets/retail_db
-rw-r--r--   1 train supergroup   32106333 2022-09-09 22:43 /user/train/datasets/thy_data.txt


# GÖREV 2: İlk Bakış
Veri setinin

- İlk 5 gözlemini 
- Toplam gözlem sayısını
- Toplam değişken sayısını
- Değişken tiplerini inceleyiniz.

In [2]:
import findspark
findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F

In [3]:
spark = (
    SparkSession.builder
    .appName("THY Analysis")
    .master("yarn")
    .enableHiveSupport()
    .getOrCreate())

In [5]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ",") \
.load("/user/train/datasets/thy_data.txt")

In [6]:
df.limit(5).toPandas()

Unnamed: 0,SEASON,ORIGIN,DESTINATION,PSGR_COUNT
0,SUMMER,227,YYZ,6
1,SUMMER,224,222,3
2,SUMMER,226,JF8,1
3,SUMMER,227,3RG,11
4,SUMMER,227,6RV,245


In [7]:
# Toplam gözlem sayısı
df.count()

1719202

In [8]:
# Toplam değişken sayısı
len(df.columns)

4

In [9]:
df.printSchema()

root
 |-- SEASON: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DESTINATION: string (nullable = true)
 |-- PSGR_COUNT: integer (nullable = true)



# GÖREV 3: Veri Analizi
- Unique Değişken Kontrolü
- Tanımlayıcı İstatistikler 
- Kalkış noktasına göre yolcu sayısı
- Yolcu Sayısının Mevsimlere Göre İncelenmesi

## Adım1: Origin değişkenindeki eşsiz sınıf sayısını bulunuz.

In [10]:
df.select("ORIGIN").distinct().count()

4210

## Adım 2: Mevsimlere göre tanımlayıcı istatistiklerini inceleyiniz.

In [11]:
from pyspark.sql.functions import sum, col, desc, mean, count
df.groupBy("SEASON") \
  .agg(sum("PSGR_COUNT").alias('TOTAL_PSGR_COUNT'), \
       mean("PSGR_COUNT").alias("AVG_PSGR_COUNT"), \
       count("PSGR_COUNT").alias("COUNT_OF_FLIGHTS")) \
  .sort(desc("TOTAL_PSGR_COUNT")).show()

+------+----------------+-----------------+----------------+
|SEASON|TOTAL_PSGR_COUNT|   AVG_PSGR_COUNT|COUNT_OF_FLIGHTS|
+------+----------------+-----------------+----------------+
|SUMMER|       521002675|567.8806552066541|          917451|
|WINTER|       336882094|420.1829420855103|          801751|
+------+----------------+-----------------+----------------+



## Adım 3: Yaz sezonunda en çok yolcunun taşındığı ilk 5 kalkış noktasını gözlemleyiniz. 

In [12]:
df.filter("SEASON IN ('SUMMER')").orderBy(F.desc("PSGR_COUNT")).limit(5).toPandas()

Unnamed: 0,SEASON,ORIGIN,DESTINATION,PSGR_COUNT
0,SUMMER,6PE,H8G,980759
1,SUMMER,H8G,6PE,978968
2,SUMMER,DEL,4OM,908383
3,SUMMER,MEL,3YD,907382
4,SUMMER,3YD,MEL,904641


## Adım 4: Veri setini okuyarak mevsim ve kalkış noktasına göre yolcu sayısını azalan olarak sıralayınız.

In [14]:
ordered_df_thy = df.groupBy("SEASON","ORIGIN").agg(F.sum("PSGR_COUNT").alias("PSGR_COUNT")) \
.orderBy(F.desc("PSGR_COUNT"))

In [15]:
ordered_df_thy.limit(5).toPandas()

Unnamed: 0,SEASON,ORIGIN,PSGR_COUNT
0,SUMMER,IC7,11177363
1,SUMMER,LHR,9696224
2,SUMMER,H8G,8432456
3,WINTER,IC7,7803139
4,SUMMER,DEL,7705173


# GÖREV 4: Veri Kaynaklarına Yazdırma

#### Hive
- Elde ettiğiniz veri setini hive veri tabanına orc formatında yazınız.

#### Veri Okuma

- Hive'a gönderdiğiniz veri setini okuyunuz.

#### Postgresql
- Elde ettiğiniz veri setini Postgresql'e traindb.public.ordered_df_thy tablosuna yazsın.

#### Veri Okuma

- Shell üzerinden Postgresql'e gönderdiğiniz veriyi okuyunuz.



## Adım1: Elde ettiğiniz veri setini hive veri tabanına orc formatında yazınız.

In [16]:
ordered_df_thy.write.format("orc").mode("overwrite").saveAsTable("test1.thy_market_grpby_season_org")

## Adım 2: Hive'a gönderdiğiniz veri setini okuyunuz.

In [17]:
spark.sql("select * from test1.thy_market_grpby_season_org;").limit(5).toPandas()

Unnamed: 0,SEASON,ORIGIN,PSGR_COUNT
0,WINTER,3VI,1
1,WINTER,4YD,1
2,WINTER,34R,1
3,SUMMER,47W,1
4,SUMMER,GO3,1


## Adım 3: Elde ettiğiniz veri setini Postgresql'e traindb.public.ordered_df_thy tablosuna yazsın.

In [19]:
jdbcUrl = "jdbc:postgresql://localhost/traindb?user=train&password=Ankara06"

In [20]:
ordered_df_thy.write \
.jdbc(url=jdbcUrl,
              table="ordered_df_thy", 
              mode="overwrite", 
              properties={"driver": 'org.postgresql.Driver'})

## Adım 4: Shell üzerinden Postgresql'e gönderdiğiniz veriyi okuyunuz.