# Advanced Data Processing Module Project

### A hands-on assignment completed for the Yapay Zeka Uzmanlık Programı (Artificial Intelligence Specialization Program), organized under the Milli Teknoloji Hamlesi (National Technology Initiative).

**Prepared by: Eda Nur ARSLAN**

### Contents:
🔹 **Assessment 1:**  
Jacket sales for June-August 2023 are analyzed by region, and the total sales quantity and revenue are computed for each region.

🔹 **Assessment 2:**  
Each retailer's jacket sales are analyzed by customer region to identify the region where the retailer earns the highest revenue.

## Data Tables Used

| Table Name     | Description |
|----------------|-------------|
| `FactSale`     | All completed sales transactions |
| `DimProduct`   | Product information |
| `DimCustomer`  | Customer information (including the customer's city) |
| `DimRetailer`  | Retailer information |
| `DimRegion`    | Maps cities to regions |
| `FactPurchase` | Product purchase transactions |
| `DimSupplier`  | Supplier information |
| `DimDate`      | Date dimension table (day/month/year) |

## 1) Importing the Required Libraries

In [1]:
from pyspark.sql import SparkSession, functions as F

## 2) Reading the Parquet Datasets and Creating Spark SQL Views

---

In [2]:
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

df_pur = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/purchase")
df_sal = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/sale")
df_cus = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/customer")
df_ret = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/retailer")
df_pro = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/product")
df_sup = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/supplier")
df_reg = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/region")
df_date = spark.read.parquet("/content/drive/MyDrive/yz-ileri veri işleme/data/date")

df_cus.createOrReplaceTempView("DimCustomer")
df_pur.createOrReplaceTempView("FactPurchase")
df_sal.createOrReplaceTempView("FactSale")
df_ret.createOrReplaceTempView("DimRetailer")
df_pro.createOrReplaceTempView("DimProduct")
df_sup.createOrReplaceTempView("DimSupplier")
df_reg.createOrReplaceTempView("DimRegion")
df_date.createOrReplaceTempView("DimDate")

## 3) Sample SQL Queries

In [3]:
spark.sql("SELECT customer_id, name, surname, birth_date FROM DimCustomer LIMIT 5").show()

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+



*   The goal is to sample the customer data and take a quick look at its structure.

---

In [4]:
df_cus.select("customer_id", "name", "surname", "birth_date").show(5)

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+
only showing top 5 rows



*   The goal is to take a quick look at the customer information in the DataFrame.

---

In [5]:
spark.sql("""
SELECT
    customer_id
    ,name
    ,surname
    ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
FROM DimCustomer
LIMIT 5
""").show()

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 67|
|          2| Dalila|   Faers| 25|
|          3|Wayland|Walework| 49|
|          4|Amberly|  Haquin| 77|
|          5|Garrett|   Frear| 68|
+-----------+-------+--------+---+



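*   The goal is to compute customer ages dynamically and sample them. The expression subtracts calendar years only, so the result is an approximate age that ignores whether the birthday has already passed this year.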
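
The year-difference formula used in the query can be off by one for customers whose birthday has not yet occurred in the current year. The plain-Python sketch below (no Spark session needed) contrasts the two calculations. The function names and the run date are assumptions chosen for illustration; only the birth date comes from the sample output. In Spark SQL, something like `months_between` could serve the same purpose, but that variant was not run here.

```python
from datetime import date


def year_diff_age(birth: date, today: date) -> int:
    """Age as a plain difference of calendar years (what the query computes)."""
    return today.year - birth.year


def exact_age(birth: date, today: date) -> int:
    """Age in completed years: subtract one if the birthday is still ahead."""
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    return today.year - birth.year - (0 if had_birthday else 1)


birth = date(2000, 11, 8)  # birth_date of customer 2 in the sample output
today = date(2025, 6, 1)   # hypothetical run date, before the birthday

print(year_diff_age(birth, today))  # 25
print(exact_age(birth, today))      # 24
```

For reporting an average age per region the difference is small, but for an age filter such as `age >= 30` it can move customers across the boundary.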

---

In [6]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("customer_id", "name", "surname", "age")
    .show(5)
)

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 67|
|          2| Dalila|   Faers| 25|
|          3|Wayland|Walework| 49|
|          4|Amberly|  Haquin| 77|
|          5|Garrett|   Frear| 68|
+-----------+-------+--------+---+
only showing top 5 rows



*    Adds a new `age` column to the `df_cus` DataFrame. The goal is to compute ages dynamically on the customer DataFrame and inspect sample rows.

---

In [7]:
spark.sql("""
SELECT
    name
    ,surname
    ,age
FROM
(
    SELECT
        customer_id
        ,name
        ,surname
        ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
    FROM DimCustomer
)
WHERE age >= 30
LIMIT 5
""").show()

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 67|
|Wayland|Walework| 49|
|Amberly|  Haquin| 77|
|Garrett|   Frear| 68|
|  Horst|   Isted| 50|
+-------+--------+---+





*   The goal is to list customers in a given age group by applying an age filter.

---

In [8]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("name", "surname", "age")
    .filter(F.col("age") >= 30)
    .show(5)
)

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 67|
|Wayland|Walework| 49|
|Amberly|  Haquin| 77|
|Garrett|   Frear| 68|
|  Horst|   Isted| 50|
+-------+--------+---+
only showing top 5 rows



* Adds a dynamically computed `age` column to the `df_cus` DataFrame. The goal is to analyze customers in a given age group.

---

JOIN OPERATIONS: A join combines two or more tables (or DataFrames) on a shared column so that related data can be analyzed as a single table.

Basic JOIN types:

| JOIN Type     | Description |
|---------------|-------------|
| **INNER JOIN** | Returns only the rows that match in both tables. |
| **LEFT JOIN** (or `LEFT OUTER JOIN`) | Returns all rows from the left table; columns from the right table are `NULL` where there is no match. |
| **RIGHT JOIN** (or `RIGHT OUTER JOIN`) | Returns all rows from the right table; columns from the left table are `NULL` where there is no match. |
| **FULL JOIN** (or `FULL OUTER JOIN`) | Returns all rows from both tables; the unmatched side is `NULL`. |
| **CROSS JOIN** | Pairs every row of one table with every row of the other (Cartesian product); without a filter, all combinations are returned. Can produce very large results. |
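
To make the differences between the join types concrete, here is a minimal plain-Python sketch that mimics each one on two tiny lists. The data and the helper functions are made up for illustration and are not part of the notebook or of Spark; `None` plays the role of `NULL`.

```python
from itertools import product

# Hypothetical rows: order 2 references a product that does not exist,
# and product 1 has never been sold.
sales = [(1, 241), (2, 999)]                 # (order_id, product_id)
products = [(241, "Jacket"), (1, "Tshirt")]  # (product_id, product_type)


def inner_join(left, right):
    # Keep only pairs whose product_id matches.
    return [(l, r) for l in left for r in right if l[1] == r[0]]


def left_join(left, right):
    # Keep every left row; pad with None when nothing matches.
    rows = []
    for l in left:
        matches = [r for r in right if l[1] == r[0]]
        if matches:
            rows.extend((l, r) for r in matches)
        else:
            rows.append((l, None))
    return rows


def right_join(left, right):
    # Keep every right row; pad with None when nothing matches.
    rows = []
    for r in right:
        matches = [l for l in left if l[1] == r[0]]
        if matches:
            rows.extend((l, r) for l in matches)
        else:
            rows.append((None, r))
    return rows


def full_join(left, right):
    # Left join plus the right rows that never matched.
    rows = left_join(left, right)
    matched_right = {r for _, r in rows if r is not None}
    rows.extend((None, r) for r in right if r not in matched_right)
    return rows


def cross_join(left, right):
    # Every combination, with no join condition at all.
    return list(product(left, right))


for name, fn in [("inner", inner_join), ("left", left_join),
                 ("right", right_join), ("full", full_join),
                 ("cross", cross_join)]:
    print(name, len(fn(sales, products)))
```

The row counts (1, 2, 2, 3, 4) show how each type treats unmatched rows. Note that cell In [13] attaches an `ON` condition to `CROSS JOIN`; the rows it prints match the `INNER JOIN` output of In [9], so it does not demonstrate an unfiltered Cartesian product.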

In [9]:
spark.sql("""
SELECT *
FROM FactSale as fact
INNER JOIN DimProduct as dim
ON fact.product_id = dim.product_id
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|product_id| product_code|product_type|colour|  size| material|unit_price|
+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|       241| GJKT001BLD-S|      Jacket| Black| Small|    Denim|        32|
|       1|       139|        551|          2|       1|       23|2023-08-30|       139|    GS002WR-L|       Shirt| White| Large|    Rayon|        23|
|       1|        36|        551|          2|       1|       20|2023-08-30|        36|  GTS001BC-XL|      Tshirt|  Blue|XLarge|   Cotton|        20|
|       1|       319|        551|          2|       1|       41|2023-08-30|       319| BTRS001WCR-L|    Tr

---

In [10]:
spark.sql("""
SELECT *
FROM FactSale as fact
LEFT JOIN DimProduct as dim
ON fact.product_id = dim.product_id
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|product_id| product_code|product_type|colour|  size| material|unit_price|
+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|       241| GJKT001BLD-S|      Jacket| Black| Small|    Denim|        32|
|       1|       139|        551|          2|       1|       23|2023-08-30|       139|    GS002WR-L|       Shirt| White| Large|    Rayon|        23|
|       1|        36|        551|          2|       1|       20|2023-08-30|        36|  GTS001BC-XL|      Tshirt|  Blue|XLarge|   Cotton|        20|
|       1|       319|        551|          2|       1|       41|2023-08-30|       319| BTRS001WCR-L|    Tr

---

In [11]:
spark.sql("""
SELECT *
FROM FactSale as fact
RIGHT JOIN DimProduct as dim
ON fact.product_id = dim.product_id
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----------+------------+------------+------+-----+--------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|product_id|product_code|product_type|colour| size|material|unit_price|
+--------+----------+-----------+-----------+--------+---------+----------+----------+------------+------------+------+-----+--------+----------+
|    4858|         1|        548|          1|       3|       75|2023-10-20|         1| BTS001BLC-S|      Tshirt| Black|Small|  Cotton|        25|
|    4541|         1|        931|          2|       1|       25|2023-01-26|         1| BTS001BLC-S|      Tshirt| Black|Small|  Cotton|        25|
|    4520|         1|        524|          2|       2|       50|2023-06-17|         1| BTS001BLC-S|      Tshirt| Black|Small|  Cotton|        25|
|    4423|         1|        435|          2|       1|       25|2023-06-05|         1| BTS001BLC-S|      Tshirt| Black|Small

---

In [12]:
spark.sql("""
SELECT *
FROM FactSale as fact
LEFT OUTER JOIN DimProduct as dim
ON fact.product_id = dim.product_id
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|product_id| product_code|product_type|colour|  size| material|unit_price|
+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|       241| GJKT001BLD-S|      Jacket| Black| Small|    Denim|        32|
|       1|       139|        551|          2|       1|       23|2023-08-30|       139|    GS002WR-L|       Shirt| White| Large|    Rayon|        23|
|       1|        36|        551|          2|       1|       20|2023-08-30|        36|  GTS001BC-XL|      Tshirt|  Blue|XLarge|   Cotton|        20|
|       1|       319|        551|          2|       1|       41|2023-08-30|       319| BTRS001WCR-L|    Tr

---

In [13]:
spark.sql("""
SELECT *
FROM FactSale as fact
CROSS JOIN DimProduct as dim
ON fact.product_id = dim.product_id
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|product_id| product_code|product_type|colour|  size| material|unit_price|
+--------+----------+-----------+-----------+--------+---------+----------+----------+-------------+------------+------+------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|       241| GJKT001BLD-S|      Jacket| Black| Small|    Denim|        32|
|       1|       139|        551|          2|       1|       23|2023-08-30|       139|    GS002WR-L|       Shirt| White| Large|    Rayon|        23|
|       1|        36|        551|          2|       1|       20|2023-08-30|        36|  GTS001BC-XL|      Tshirt|  Blue|XLarge|   Cotton|        20|
|       1|       319|        551|          2|       1|       41|2023-08-30|       319| BTRS001WCR-L|    Tr

---

In [14]:
spark.sql("""
SELECT
    region_name
    ,AVG(YEAR(CURRENT_DATE()) - YEAR(birth_date)) AS age
FROM DimCustomer cus
INNER JOIN DimRegion reg
ON cus.city_id = reg.city_id
GROUP BY region_name
ORDER BY age DESC
""").show()

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 51.81521739130435|
|     Dogu Anadolu| 51.13095238095238|
|Guneydogu Anadolu| 49.58119658119658|
|          Marmara|49.189542483660134|
|       Ic Anadolu| 49.07772020725388|
|        Karadeniz| 48.75121951219512|
|              Ege|47.888888888888886|
+-----------------+------------------+



---

In [15]:
spark.sql("""
SELECT *,
       ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY total_amt DESC) as rank
FROM FactSale as fact
""").show()

+--------+----------+-----------+-----------+--------+---------+----------+----+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|rank|
+--------+----------+-----------+-----------+--------+---------+----------+----+
|     325|         1|        849|          2|       3|       75|2023-12-16|   1|
|     872|         1|        410|          4|       3|       75|2023-09-08|   2|
|    2207|         1|        711|          1|       3|       75|2023-11-30|   3|
|    3310|         1|        658|          2|       3|       75|2023-05-06|   4|
|    4250|         1|        111|          2|       3|       75|2023-12-29|   5|
|    4858|         1|        548|          1|       3|       75|2023-10-20|   6|
|     615|         1|        298|          1|       2|       50|2023-02-07|   7|
|     704|         1|        535|          4|       2|       50|2023-01-23|   8|
|    1364|         1|        351|          2|       2|       50|2023-01-19|   9|
|    1400|         1|       

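* The goal is to number each product's sales rows from the highest to the lowest `total_amt`. Rows with equal `total_amt` still receive distinct, consecutive numbers, in no guaranteed order.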
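
Because several rows share the same `total_amt`, the choice of ranking function matters. The sketch below reproduces, in plain Python, how `ROW_NUMBER`, `RANK` and `DENSE_RANK` would number one partition. The list of amounts is illustrative and the variable names are assumptions.

```python
# total_amt values of one product, already sorted in descending order
amounts = [75, 75, 75, 50, 50, 25]

# ROW_NUMBER: strictly increasing, ties broken arbitrarily
row_number = list(range(1, len(amounts) + 1))

# RANK: ties share the position of their first occurrence, gaps follow
rank = [amounts.index(a) + 1 for a in amounts]

# DENSE_RANK: ties share a number, no gaps
distinct = sorted(set(amounts), reverse=True)
dense_rank = [distinct.index(a) + 1 for a in amounts]

print(row_number)  # [1, 2, 3, 4, 5, 6]
print(rank)        # [1, 1, 1, 4, 4, 6]
print(dense_rank)  # [1, 1, 1, 2, 2, 3]
```

If tied sales should count as equally ranked, `RANK()` or `DENSE_RANK()` is the closer fit; `ROW_NUMBER()` suits cases where exactly one row per position is needed.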

---

In [16]:
(
    df_sal.groupBy("order_id").agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount")
    ).orderBy("total_quantity", ascending=False)
    .show(10)
)

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    2337|            11|         357|
|    3743|            11|         359|
+--------+--------------+------------+
only showing top 10 rows



* The goal is to identify the largest orders by total quantity as a view of sales performance.

---

In [17]:
(
    df_cus
    .join(df_reg, df_cus.city_id == df_reg.city_id)
    .groupBy("region_name").agg(
        F.avg(F.year(F.current_date()) - F.year("birth_date")).alias("age")
    )
    .orderBy("age", ascending=False)
    .show()
)

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 51.81521739130435|
|     Dogu Anadolu| 51.13095238095238|
|Guneydogu Anadolu| 49.58119658119658|
|          Marmara|49.189542483660134|
|       Ic Anadolu| 49.07772020725388|
|        Karadeniz| 48.75121951219512|
|              Ege|47.888888888888886|
+-----------------+------------------+



* The goal is to compare the average customer age across regions.

---

## What Other Queries Could Be Written?

In [18]:
df_sal.join(df_pro, "product_id") \
    .groupBy("product_type") \
    .agg(F.sum("total_amt").alias("total_revenue")) \
    .orderBy("total_revenue", ascending=False) \
    .show()

+------------+-------------+
|product_type|total_revenue|
+------------+-------------+
|      Jacket|       237113|
|    Trousers|       228603|
|       Shirt|       139614|
|      Tshirt|       103765|
+------------+-------------+



* Goal: Which product type (Jacket, Tshirt, etc.) generated the most revenue?

---

In [19]:
df_sal.withColumn("month", F.month("date")) \
    .groupBy("month") \
    .agg(F.sum("total_amt").alias("monthly_revenue")) \
    .orderBy("month") \
    .show()

+-----+---------------+
|month|monthly_revenue|
+-----+---------------+
|    1|          62906|
|    2|          53313|
|    3|          62365|
|    4|          65533|
|    5|          61612|
|    6|          53981|
|    7|          62887|
|    8|          54599|
|    9|          57765|
|   10|          60040|
|   11|          58401|
|   12|          55693|
+-----+---------------+



* Goal: In which months of the year are sales higher?

---

In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, datediff

window_spec = Window.partitionBy("customer_id").orderBy("date")

df_sal.withColumn("prev_date", lag("date").over(window_spec)) \
    .withColumn("days_between_orders", datediff("date", "prev_date")) \
    .select("customer_id", "order_id", "date", "days_between_orders") \
    .show()

+-----------+--------+----------+-------------------+
|customer_id|order_id|      date|days_between_orders|
+-----------+--------+----------+-------------------+
|          1|    3161|2023-03-19|               NULL|
|          1|    3161|2023-03-19|                  0|
|          1|    3161|2023-03-19|                  0|
|          1|    3161|2023-03-19|                  0|
|          1|    3161|2023-03-19|                  0|
|          1|    4146|2023-06-28|                101|
|          1|    4146|2023-06-28|                  0|
|          1|    4146|2023-06-28|                  0|
|          1|    4146|2023-06-28|                  0|
|          1|    2756|2023-07-23|                 25|
|          1|    2756|2023-07-23|                  0|
|          1|    2756|2023-07-23|                  0|
|          1|    3796|2023-08-11|                 19|
|          1|    3796|2023-08-11|                  0|
|          1|    3796|2023-08-11|                  0|
|          1|    3796|2023-0

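* Goal: How often do customers shop? Because `FactSale` holds one row per order line, consecutive lines of the same order produce a gap of 0 days in this output.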
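
One way to get gaps between orders rather than between order lines is to reduce the data to one row per order before looking at the previous date. The plain-Python sketch below shows the idea on the dates of customer 1 taken from the output above; the number of lines per order is shortened and the variable names are assumptions. In PySpark, a comparable step would be selecting `customer_id`, `order_id`, `date` and calling `.distinct()` before applying the window, though that variant was not run here.

```python
from datetime import date

# (customer_id, order_id, date), one tuple per order line
lines = [
    (1, 3161, date(2023, 3, 19)), (1, 3161, date(2023, 3, 19)),
    (1, 4146, date(2023, 6, 28)), (1, 4146, date(2023, 6, 28)),
    (1, 2756, date(2023, 7, 23)),
    (1, 3796, date(2023, 8, 11)),
]

# Collapse to one row per order, then sort by customer and date
orders = sorted(set(lines), key=lambda row: (row[0], row[2]))

gaps = []
previous_date = {}  # last order date seen per customer
for customer_id, order_id, day in orders:
    last = previous_date.get(customer_id)
    gaps.append((order_id, None if last is None else (day - last).days))
    previous_date[customer_id] = day

print(gaps)  # [(3161, None), (4146, 101), (2756, 25), (3796, 19)]
```

The resulting gaps (101, 25 and 19 days) are the non-zero values from the original output, now without the rows of zeros in between.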

---

In [21]:
spark.sql("""
SELECT
    r.region_name,
    COUNT(DISTINCT fs.order_id) AS total_orders
FROM FactSale fs
JOIN DimCustomer c ON fs.customer_id = c.customer_id
JOIN DimRegion r ON c.city_id = r.city_id
GROUP BY r.region_name
ORDER BY total_orders DESC
LIMIT 5
""").show()

+-----------------+------------+
|      region_name|total_orders|
+-----------------+------------+
|        Karadeniz|        1029|
|       Ic Anadolu|         956|
|     Dogu Anadolu|         859|
|          Marmara|         760|
|Guneydogu Anadolu|         569|
+-----------------+------------+



* Goal: Identify the regions with the most orders to see where sales are concentrated.

---

In [22]:
spark.sql("""
SELECT
    s.sup_name,
    ROUND(SUM(fp.total_cost) / SUM(fp.pur_quan), 2) AS avg_unit_cost
FROM FactPurchase fp
JOIN DimSupplier s ON fp.supplier_id = s.supplier_id
GROUP BY s.sup_name
ORDER BY avg_unit_cost DESC
""").show()

+------------+-------------+
|    sup_name|avg_unit_cost|
+------------+-------------+
|Side Tekstil|        31.66|
| Ege Ruzgari|         30.9|
|  Moda Kumas|        14.05|
|Kumas Sanati|         6.66|
+------------+-------------+



* Goal: Compare suppliers by the average unit cost of the products purchased from them.

---

In [23]:
spark.sql("""SELECT COUNT(*) AS customer_count
FROM (
    SELECT customer_id, product_id, COUNT(*) AS times
    FROM FactSale
    GROUP BY customer_id, product_id
    HAVING COUNT(*) > 1
) AS sub""").show()

+--------------+
|customer_count|
+--------------+
|           357|
+--------------+



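* Goal: Measure product loyalty through repeat purchases. The query counts customer-product pairs that appear in more than one sale row, so `customer_count` is the number of such pairs rather than the number of distinct customers.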
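
The difference between counting repeat pairs and counting repeat customers can be seen on a small made-up example. The sketch below is plain Python with hypothetical data, not the notebook's dataset.

```python
from collections import Counter

# Hypothetical sale rows as (customer_id, product_id)
sales = [(1, 10), (1, 10), (1, 20), (1, 20), (2, 10), (3, 30), (3, 30)]

# Number of sale rows per (customer, product) pair
pair_counts = Counter(sales)

# Pairs bought more than once: what GROUP BY ... HAVING COUNT(*) > 1 keeps
repeat_pairs = [pair for pair, n in pair_counts.items() if n > 1]

# Distinct customers behind those pairs
repeat_customers = {customer_id for customer_id, _ in repeat_pairs}

print(len(repeat_pairs))      # 3
print(len(repeat_customers))  # 2
```

Customer 1 repeats two different products and is therefore counted twice among the pairs. To count customers instead, the outer query would need `COUNT(DISTINCT customer_id)`.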

---

In [24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

top_product_df = df_sal \
    .join(df_cus, "customer_id") \
    .join(df_reg, df_cus.city_id == df_reg.city_id) \
    .join(df_pro, "product_id") \
    .groupBy("region_name", "product_type") \
    .agg(F.sum("quantity").alias("total_quantity"))

window_spec = Window.partitionBy("region_name").orderBy(F.desc("total_quantity"))

top_product_df \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter("rank == 1") \
    .select("region_name", "product_type", "total_quantity") \
    .show()

+-----------------+------------+--------------+
|      region_name|product_type|total_quantity|
+-----------------+------------+--------------+
|          Akdeniz|    Trousers|           577|
|     Dogu Anadolu|      Jacket|          1068|
|              Ege|      Tshirt|           458|
|Guneydogu Anadolu|      Jacket|           723|
|       Ic Anadolu|      Tshirt|          1184|
|        Karadeniz|    Trousers|          1188|
|          Marmara|      Tshirt|           970|
+-----------------+------------+--------------+



* Goal: Determine which product type sells the most in each region, as an indicator of customer preferences.

---

## Assessment 1 – Regional Distribution of Jacket Sales

### Goal:
Obtain the **distribution by region** of jacket sales between June and August 2023, in terms of total quantity and revenue.

### Filters Applied:
- `product_type = 'Jacket'`
- `date` ∈ [2023-06-01, 2023-08-31]

### Output Columns:
- `region_name`
- `product_type`
- `total_quantity`
- `total_amount`

Note: The tables were inspected first to see all of their columns and decide which ones to use.

In [25]:
spark.sql("""
SELECT
    *
FROM DimRegion
""").show(5)

+-------+--------------+-----------------+
|city_id|     city_name|      region_name|
+-------+--------------+-----------------+
|      1|         Adana|       Ic Anadolu|
|      2|      Adiyaman|Guneydogu Anadolu|
|      3|Afyonkarahisar|       Ic Anadolu|
|      4|          Agri|     Dogu Anadolu|
|      5|        Amasya|        Karadeniz|
+-------+--------------+-----------------+
only showing top 5 rows



---

In [26]:
spark.sql("""
SELECT
    *
FROM DimProduct
""").show(5)

+----------+------------+------------+------+------+--------+----------+
|product_id|product_code|product_type|colour|  size|material|unit_price|
+----------+------------+------------+------+------+--------+----------+
|         1| BTS001BLC-S|      Tshirt| Black| Small|  Cotton|        25|
|         2| BTS001BLC-M|      Tshirt| Black|Medium|  Cotton|        25|
|         3| BTS001BLC-L|      Tshirt| Black| Large|  Cotton|        25|
|         4|BTS001BLC-XL|      Tshirt| Black|XLarge|  Cotton|        25|
|         5|  BTS001WC-S|      Tshirt| White| Small|  Cotton|        25|
+----------+------------+------------+------+------+--------+----------+
only showing top 5 rows



---

In [27]:
spark.sql("""
SELECT
    *
FROM FactSale
""").show(5)

+--------+----------+-----------+-----------+--------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|
+--------+----------+-----------+-----------+--------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|
|       1|       139|        551|          2|       1|       23|2023-08-30|
|       1|        36|        551|          2|       1|       20|2023-08-30|
|       1|       319|        551|          2|       1|       41|2023-08-30|
|       1|         5|        551|          2|       1|       25|2023-08-30|
+--------+----------+-----------+-----------+--------+---------+----------+
only showing top 5 rows



---

In [28]:
spark.sql("""
SELECT
    *
FROM DimCustomer
""").show(5)

+-----------+-------+-------+--------+------+----------+-------------+--------------------+
|customer_id|city_id|   name| surname|gender|birth_date|        phone|               email|
+-----------+-------+-------+--------+------+----------+-------------+--------------------+
|          1|     30| Jazmin|  Burril|Female|1958-09-22|(493) 8889636|jburril0@soundclo...|
|          2|     25| Dalila|   Faers|Female|2000-11-08|(404) 6357120|dfaers1@sitemeter...|
|          3|     15|Wayland|Walework|  Male|1976-03-08|(277) 1691679|wwalework2@quantc...|
|          4|     42|Amberly|  Haquin|Female|1948-10-08|(460) 2147509|ahaquin3@telegrap...|
|          5|     41|Garrett|   Frear|  Male|1957-09-25|(858) 3767105|     gfrear4@tiny.cc|
+-----------+-------+-------+--------+------+----------+-------------+--------------------+
only showing top 5 rows



---

In [29]:
spark.sql("""
SELECT
    *
FROM FactPurchase
""").show(5)

+-----------+-----------+----------+--------+---------+----------+----------+
|purchase_id|supplier_id|product_id|pur_quan|pur_price|total_cost|      date|
+-----------+-----------+----------+--------+---------+----------+----------+
|          1|          2|         1|    1000|       10|     10000|2023-01-01|
|          2|          2|         2|    1500|       10|     10000|2023-01-01|
|          3|          2|         3|     500|       10|      5000|2023-01-01|
|          4|          2|         4|    1000|       10|      5000|2023-01-01|
|          5|          2|         5|    1000|       10|     15000|2023-01-01|
+-----------+-----------+----------+--------+---------+----------+----------+
only showing top 5 rows



---

In [30]:
spark.sql("""
SELECT
    *
FROM DimRetailer
""").show()

+-----------+-------+-------------+-------------+
|retailer_id|city_id|retailer_type|retailer_name|
+-----------+-------+-------------+-------------+
|          1|     34|     Internet|            A|
|          2|     34|     Internet|            B|
|          3|     34|     Internet|            C|
|          4|     35|     Internet|            D|
+-----------+-------+-------------+-------------+



---

In [31]:
spark.sql("""
SELECT
    *
FROM DimSupplier
""").show()

+-----------+-------+------------+--------+
|supplier_id|city_id|    sup_name|sup_type|
+-----------+-------+------------+--------+
|          1|      6|  Moda Kumas|   Shirt|
|          2|     34|Kumas Sanati|  Tshirt|
|          3|      7|Side Tekstil|Trousers|
|          4|     35| Ege Ruzgari|  Jacket|
+-----------+-------+------------+--------+



---

In [32]:
spark.sql("""
SELECT
    *
FROM DimDate
""").show(5)

+----------+---------+----+-----+-------+----+
|      date|      day|week|month|quarter|year|
+----------+---------+----+-----+-------+----+
|2023-01-01|   Sunday|   1|    1|     Q1|2023|
|2023-01-02|   Monday|   1|    1|     Q1|2023|
|2023-01-03|  Tuesday|   1|    1|     Q1|2023|
|2023-01-04|Wednesday|   1|    1|     Q1|2023|
|2023-01-05| Thursday|   1|    1|     Q1|2023|
+----------+---------+----+-----+-------+----+
only showing top 5 rows



---

## Solution with Spark SQL:

### Note: A CASE expression was used to match the requested output order exactly. Output without the CASE expression:


In [33]:
spark.sql("""
SELECT
    r.region_name,
    'Jacket' AS product_type,
    SUM(fs.quantity) AS total_quantity,
    SUM(fs.total_amt) AS total_amount
FROM FactSale fs
JOIN DimProduct p ON fs.product_id = p.product_id
JOIN DimCustomer c ON fs.customer_id = c.customer_id
JOIN DimRegion r ON c.city_id = r.city_id
WHERE p.product_type = 'Jacket'
  AND CAST(fs.date AS DATE) BETWEEN DATE('2023-06-01') AND DATE('2023-08-31')
GROUP BY r.region_name
ORDER BY total_quantity DESC, region_name ASC
""").show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|        Karadeniz|      Jacket|           310|       12582|
|     Dogu Anadolu|      Jacket|           284|       11547|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Marmara|      Jacket|           213|        8358|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|          Akdeniz|      Jacket|           162|        6637|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



---

### Output with the CASE expression:

In [34]:
spark.sql("""
SELECT
    r.region_name,
    'Jacket' AS product_type,
    SUM(fs.quantity) AS total_quantity,
    SUM(fs.total_amt) AS total_amount
FROM FactSale fs
JOIN DimProduct p ON fs.product_id = p.product_id
JOIN DimCustomer c ON fs.customer_id = c.customer_id
JOIN DimRegion r ON c.city_id = r.city_id
WHERE p.product_type = 'Jacket'
  AND CAST(fs.date AS DATE) BETWEEN DATE('2023-06-01') AND DATE('2023-08-31')
GROUP BY r.region_name
ORDER BY CASE r.region_name
    WHEN 'Marmara' THEN 1
    WHEN 'Dogu Anadolu' THEN 2
    WHEN 'Guneydogu Anadolu' THEN 3
    WHEN 'Ic Anadolu' THEN 4
    WHEN 'Akdeniz' THEN 5
    WHEN 'Karadeniz' THEN 6
    WHEN 'Ege' THEN 7
    ELSE 8
END
""").show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+
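
The `CASE` expression in the `ORDER BY` clause assigns each region a fixed position and sorts by that position instead of by a column value. A plain-Python sketch of the same idea follows; the region names and quantities come from the output above, while the variable names are assumptions.

```python
# (region_name, total_quantity) in the order returned without CASE
rows = [
    ("Karadeniz", 310), ("Dogu Anadolu", 284), ("Ic Anadolu", 260),
    ("Marmara", 213), ("Guneydogu Anadolu", 176), ("Akdeniz", 162),
    ("Ege", 101),
]

# Requested display order, equivalent to the WHEN ... THEN n branches
region_order = ["Marmara", "Dogu Anadolu", "Guneydogu Anadolu",
                "Ic Anadolu", "Akdeniz", "Karadeniz", "Ege"]
position = {name: i for i, name in enumerate(region_order, start=1)}

# Unknown regions sort last, like the ELSE 8 branch
ordered = sorted(rows, key=lambda row: position.get(row[0], len(region_order) + 1))

print([name for name, _ in ordered])
```

Keeping the order in a single list makes it easier to change than a chain of `WHEN` branches, at the cost of living outside the SQL text.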



---

## Solution with the PySpark API:

### Loading the Spark tables as DataFrames:

In [35]:
FactSale = spark.table("FactSale")
DimProduct = spark.table("DimProduct")
DimCustomer = spark.table("DimCustomer")
DimRegion = spark.table("DimRegion")

---

### Building the DataFrame:

In [36]:
from pyspark.sql.functions import col, sum as _sum, lit, to_date, when

result_df = FactSale \
    .join(DimProduct, "product_id") \
    .join(DimCustomer, "customer_id") \
    .join(DimRegion, DimCustomer["city_id"] == DimRegion["city_id"]) \
    .filter(
        (col("product_type") == "Jacket") &
        (to_date("date") >= lit("2023-06-01")) &
        (to_date("date") <= lit("2023-08-31"))
    ) \
    .groupBy("region_name") \
    .agg(
        lit("Jacket").alias("product_type"),
        _sum("quantity").alias("total_quantity"),
        _sum("total_amt").alias("total_amount")
    )

---

### Sorting:

In [37]:
ordered_df = result_df.withColumn(
    "sort_order",
    when(col("region_name") == "Marmara", 1)
    .when(col("region_name") == "Dogu Anadolu", 2)
    .when(col("region_name") == "Guneydogu Anadolu", 3)
    .when(col("region_name") == "Ic Anadolu", 4)
    .when(col("region_name") == "Akdeniz", 5)
    .when(col("region_name") == "Karadeniz", 6)
    .when(col("region_name") == "Ege", 7)
    .otherwise(8)
)

---

In [38]:
ordered_df.orderBy("sort_order").select(
    "region_name", "product_type", "total_quantity", "total_amount"
).show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



---

## Assessment 2 – Regional Revenue Analysis of Retailers

### Goal:
**Identify the region** where each retailer earns the most revenue from jacket sales.

### Steps:
1. Sales were linked to regions through the customer's location (customer → city → region).
2. For each `retailer_id`, the total `total_amt` was computed per `region_name`. The queries below contain no `product_type` filter, so the totals shown cover all product types.
3. The `ROW_NUMBER() OVER (...)` window function was used to pick the region with the highest revenue.
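
The steps above can be sketched in plain Python as a group-and-sum followed by picking the maximum per retailer. The rows below are hypothetical and the variable names are assumptions; the sketch only illustrates the logic that the window function implements.

```python
from collections import defaultdict

# Hypothetical sale rows as (retailer_name, region_name, total_amt)
sales = [
    ("A", "Karadeniz", 100), ("A", "Ege", 40), ("A", "Karadeniz", 30),
    ("B", "Ic Anadolu", 90), ("B", "Marmara", 95),
]

# Step 2: total revenue per (retailer, region)
totals = defaultdict(int)
for retailer, region, amount in sales:
    totals[(retailer, region)] += amount

# Step 3: keep the region with the highest total for each retailer
best = {}
for (retailer, region), total in totals.items():
    if retailer not in best or total > best[retailer][1]:
        best[retailer] = (region, total)

print(best)  # {'A': ('Karadeniz', 130), 'B': ('Marmara', 95)}
```

As with `ROW_NUMBER()`, a tie between two regions would be resolved by whichever happens to come first, so a tie-breaking rule is worth adding if equal totals are possible.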

## Solution with Spark SQL:


In [39]:
spark.sql("""
SELECT
    retailer_id,
    retailer_name,
    region_name,
    total_amount
FROM (
    SELECT
        fs.retailer_id,
        dr.retailer_name,
        r.region_name,
        SUM(fs.total_amt) AS total_amount,
        ROW_NUMBER() OVER (
            PARTITION BY fs.retailer_id
            ORDER BY SUM(fs.total_amt) DESC
        ) AS rank
    FROM FactSale fs
    JOIN DimRetailer dr ON fs.retailer_id = dr.retailer_id
    JOIN DimCustomer c ON fs.customer_id = c.customer_id
    JOIN DimRegion r ON c.city_id = r.city_id
    GROUP BY fs.retailer_id, dr.retailer_name, r.region_name
) temp
WHERE rank = 1
ORDER BY retailer_id
""").show()

+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|            A|  Karadeniz|       42642|
|          2|            B| Ic Anadolu|       71689|
|          3|            C| Ic Anadolu|       11995|
|          4|            D|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+



---

## Solution with the PySpark API:

In [40]:
FactSale = spark.table("FactSale")
DimCustomer = spark.table("DimCustomer")
DimRetailer = spark.table("DimRetailer")
DimRegion = spark.table("DimRegion")

---

In [41]:
from pyspark.sql.functions import sum as _sum

result_df = FactSale \
    .join(DimRetailer, "retailer_id") \
    .join(DimCustomer, "customer_id") \
    .join(DimRegion, DimCustomer["city_id"] == DimRegion["city_id"]) \
    .groupBy("retailer_id", "retailer_name", "region_name") \
    .agg(_sum("total_amt").alias("total_amount"))

---

In [42]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Window definition: per retailer, ordered by revenue from highest to lowest
window_spec = Window.partitionBy("retailer_id").orderBy(result_df["total_amount"].desc())

top_region_df = result_df \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") == 1) \
    .select("retailer_id", "retailer_name", "region_name", "total_amount")

---

In [43]:
top_region_df.select("retailer_id", "retailer_name", "region_name", "total_amount").show()

+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|            A|  Karadeniz|       42642|
|          2|            B| Ic Anadolu|       71689|
|          3|            C| Ic Anadolu|       11995|
|          4|            D|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+



---

# Project Link: https://github.com/edanurarslan/Pyspark-Sales-Analysis

# Prepared by: Eda Nur ARSLAN