<div style="background-color: white; padding: 10px; border-bottom: 6px solid #C2172D;">
    <h2 style="color: black" id="introduction">Batch Data Processing with Apache Spark</h2>
    <p></p>
</div>


In [None]:
from google.colab import drive
drive.mount('/content/drive')

## [Contents](#contents)
1. [Introduction](#introduction)
2. [Importing the libraries](#library)
3. [Reading the data](#read_data)
4. [Spark SQL Practices](#spark_sql_practices)
   * [Selecting columns](#selecting_columns)
   * [Data manipulation](#data_manipulation)
   * [Filtering rows](#filtering_rows)
   * [Aggregating data](#aggregating_data)
   * [Joining](#joining)
5. [Case Studies](#case_studies)
   * [Assignment 1: Jacket sales per region](#assignment_1)
   * [Assignment 2: Maximum turnover of the retailer regions](#assignment_2)

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="introduction">
        <h3 style="color: #C2172D">1. Introduction</h3>
    </a>  
</div>

<img src="assets/img/data_model.svg"  style="width:1000px; padding: 20px"/>

#### SQL Tables Description
- **FactSale:** Sales transactions fact table
- **FactPurchase:** Purchases fact table
- **DimRetailer:** Retailer details dimension table
- **DimCustomer:** Customer details dimension table
- **DimProduct:** Product details dimension table
- **DimRegion:** Region details dimension table
- **DimDate:** Date dimension table
- **DimSupplier:** Supplier details dimension table

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="library">
        <h3 style="color: #C2172D">2. Importing the libraries</h3>
    </a>
</div>

In [None]:
from pyspark.sql import SparkSession, functions as F

In [None]:
!pip install py4j



<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="read_data">
        <h3 style="color: #C2172D">3. Reading the data</h3>
    </a>
</div>

In [None]:
# Creating new SparkSession instance
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Reading parquet data and assigning to DataFrame variables
df_ret = spark.read.parquet("/content/retail.parquet")
df_cus = spark.read.parquet("/content/part-00000-0eb2cc7e-9512-4a03-89b7-838575beabff-c000.snappy.parquet")
df_pur = spark.read.parquet("/content/part-00000-29203130-0a81-43a4-9c6f-5c9f50912c86-c000.snappy.parquet")
df_sal = spark.read.parquet("/content/sale.parquet")


df_pro = spark.read.parquet("/content/product.parquet")
df_sup = spark.read.parquet("/content/supplier.parquet")
df_reg = spark.read.parquet("/content/region.parquet")
df_date = spark.read.parquet("/content/date.parquet")

# Creating temporary view tables for Spark SQL queries
df_cus.createOrReplaceTempView("DimCustomer")
df_pur.createOrReplaceTempView("FactPurchase")
df_sal.createOrReplaceTempView("FactSale")
df_ret.createOrReplaceTempView("DimRetailer")
df_pro.createOrReplaceTempView("DimProduct")
df_sup.createOrReplaceTempView("DimSupplier")
df_reg.createOrReplaceTempView("DimRegion")
df_date.createOrReplaceTempView("DimDate")


<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="spark_sql_practices">
        <h3 style="color: #C2172D">4. Spark SQL Practices</h3>
    </a>
</div>

**<a id="selecting_columns">Selecting columns</a>**

In [None]:
spark.sql("SELECT customer_id, name, surname, birth_date FROM DimCustomer LIMIT 5").show()

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+



In [None]:
df_cus.select("customer_id", "name", "surname", "birth_date").show(5)

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+
only showing top 5 rows



**<a id="data_manipulation">Data manipulation: </a>** Calculating each customer's age from the birth date.

In [None]:
spark.sql("""
SELECT
    customer_id
    ,name
    ,surname
    ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
FROM DimCustomer
LIMIT 5
""").show()

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 67|
|          2| Dalila|   Faers| 25|
|          3|Wayland|Walework| 49|
|          4|Amberly|  Haquin| 77|
|          5|Garrett|   Frear| 68|
+-----------+-------+--------+---+



In [None]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("customer_id", "name", "surname", "age")
    .show(5)
)

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 67|
|          2| Dalila|   Faers| 25|
|          3|Wayland|Walework| 49|
|          4|Amberly|  Haquin| 77|
|          5|Garrett|   Frear| 68|
+-----------+-------+--------+---+
only showing top 5 rows
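
The expression `YEAR(CURRENT_DATE()) - YEAR(birth_date)` only subtracts calendar years, so it counts a birthday that has not happened yet this year, and its result changes whenever the cell is re-run in a later year. The plain-Python sketch below (no Spark needed) contrasts that shortcut with an age in completed years. The function names and the pinned reference date are assumptions made for illustration; only the birth dates come from the sample output above.

```python
from datetime import date


def year_difference(birth: date, today: date) -> int:
    """Mirror of YEAR(CURRENT_DATE()) - YEAR(birth_date)."""
    return today.year - birth.year


def exact_age(birth: date, today: date) -> int:
    """Completed years: subtract one if the birthday has not occurred yet this year."""
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    return today.year - birth.year - (0 if had_birthday else 1)


# Hypothetical reference date, pinned so the result does not depend on when the cell runs.
today = date(2025, 6, 1)
birth = date(2000, 11, 8)  # birth_date of customer_id 2 in the sample output

print(year_difference(birth, today))  # 25
print(exact_age(birth, today))        # 24
```

In Spark, one possible equivalent is `F.floor(F.months_between(F.current_date(), F.col("birth_date")) / 12)`; treat that as an untested suggestion rather than part of the original notebook.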



**<a id="filtering_rows">Filtering rows</a>**

In [None]:
spark.sql("""
SELECT
    name
    ,surname
    ,age
FROM
(
    SELECT
        customer_id
        ,name
        ,surname
        ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
    FROM DimCustomer
)
WHERE age >= 30
LIMIT 5
""").show()

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 67|
|Wayland|Walework| 49|
|Amberly|  Haquin| 77|
|Garrett|   Frear| 68|
|  Horst|   Isted| 50|
+-------+--------+---+



In [None]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("name", "surname", "age")
    .filter(F.col("age") >= 30)
    .show(5)
)

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 67|
|Wayland|Walework| 49|
|Amberly|  Haquin| 77|
|Garrett|   Frear| 68|
|  Horst|   Isted| 50|
+-------+--------+---+
only showing top 5 rows



**<a id="aggregating_data">Aggregating data</a>**

In [None]:
(
    df_sal.groupBy("order_id").agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount")
    ).orderBy("total_quantity", ascending=False)
    .show(10)
)

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    2337|            11|         357|
|    3743|            11|         359|
+--------+--------------+------------+
only showing top 10 rows
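
The other sections pair each DataFrame call with its SQL form. For the aggregation above, the SQL form might look like the query in this sketch. To keep the example runnable without a Spark session, it uses Python's built-in `sqlite3` as a stand-in and a few made-up rows. In the notebook, the same query string would presumably be passed to `spark.sql(...)` against the `FactSale` view.

```python
import sqlite3

# Made-up rows standing in for FactSale: (order_id, quantity, total_amt).
rows = [
    (1, 2, 50),
    (1, 3, 70),
    (2, 4, 90),
    (3, 1, 20),
    (3, 1, 25),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FactSale (order_id INTEGER, quantity INTEGER, total_amt INTEGER)")
conn.executemany("INSERT INTO FactSale VALUES (?, ?, ?)", rows)

# Same grouping, aggregates, ordering and row limit as the DataFrame version above.
query = """
SELECT
    order_id
    ,SUM(quantity) AS total_quantity
    ,SUM(total_amt) AS total_amount
FROM FactSale
GROUP BY order_id
ORDER BY total_quantity DESC
LIMIT 10
"""

totals = conn.execute(query).fetchall()
for row in totals:
    print(row)
```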



**<a id="joining">Joining</a>**

In [None]:
spark.sql("""
SELECT
    region_name
    ,AVG(YEAR(CURRENT_DATE()) - YEAR(birth_date)) AS age
FROM DimCustomer cus
INNER JOIN DimRegion reg
ON cus.city_id = reg.city_id
GROUP BY region_name
ORDER BY age DESC
""").show()

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 51.81521739130435|
|     Dogu Anadolu| 51.13095238095238|
|Guneydogu Anadolu| 49.58119658119658|
|          Marmara|49.189542483660134|
|       Ic Anadolu| 49.07772020725388|
|        Karadeniz| 48.75121951219512|
|              Ege|47.888888888888886|
+-----------------+------------------+



In [None]:
(
    df_cus
    .join(df_reg, df_cus.city_id == df_reg.city_id)
    .groupBy("region_name").agg(
        F.avg(F.year(F.current_date()) - F.year("birth_date")).alias("age")
    )
    .orderBy("age", ascending=False)
    .show()
)

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 50.81521739130435|
|     Dogu Anadolu| 50.13095238095238|
|Guneydogu Anadolu| 48.58119658119658|
|          Marmara|48.189542483660134|
|       Ic Anadolu| 48.07772020725388|
|        Karadeniz| 47.75121951219512|
|              Ege|46.888888888888886|
+-----------------+------------------+



<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="case_studies">
        <h3 style="color: #C2172D">5. Case Studies</h3>
    </a>  
</div>

<div style="background-color: white; padding: 10px;">
    <a id="assignment_1">
        <h4 style="color: #0D9276">Assignment 1: Jacket sales per region</h4>
    </a>
</div>
<br>
<h4>
    Write Spark SQL and PySpark scripts that return the total quantity and total amount of jacket sales per region between June and August 2023.
</h4>
<p>The expected output is as follows: </p>

| region_name       | product_type | total_quantity | total_amount |
|-------------------|--------------|----------------|--------------|
| Marmara           | Jacket       | 213            | 8358         |
| Dogu Anadolu      | Jacket       | 284            | 11547        |
| Guneydogu Anadolu | Jacket       | 176            | 6981         |
| Ic Anadolu        | Jacket       | 260            | 10496        |
| Akdeniz           | Jacket       | 162            | 6637         |
| Karadeniz         | Jacket       | 310            | 12582        |
| Ege               | Jacket       | 101            | 3953         |

### External links
- https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html
- https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html

In [None]:
# My Spark SQL solution:
spark.sql("""
SELECT
    DimRegion.region_name,
    DimProduct.product_type,
    SUM(FactSale.quantity) AS total_quantity,
    SUM(FactSale.total_amt) AS total_amount
    FROM DimProduct
    JOIN FactSale ON DimProduct.product_id = FactSale.product_id
    JOIN DimCustomer ON FactSale.customer_id = DimCustomer.customer_id
    JOIN DimRegion ON DimCustomer.city_id = DimRegion.city_id
    JOIN DimDate ON FactSale.date = DimDate.date
    WHERE DimProduct.product_type = 'Jacket'
    AND DimDate.year = 2023
    AND DimDate.month BETWEEN 6 AND 8
    GROUP BY DimRegion.region_name, DimProduct.product_type

""").show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



In [None]:
# My PySpark solution:
from pyspark.sql.functions import col
# Aliased the tables for convenience
result=(
    df_pro.alias("DimProduct")
    .join(
        df_sal.alias("FactSale"),
        col("DimProduct.product_id") == col("FactSale.product_id"),
        "inner"
    )
    .join(
        df_cus.alias("DimCustomer"),
        col("FactSale.customer_id") == col("DimCustomer.customer_id"),
        "inner"
    )
    .join(
        df_reg.alias("DimRegion"),
        col("DimCustomer.city_id") == col("DimRegion.city_id"),
        "inner"
    )
    .join(
        df_date.alias("DimDate"),
        col("FactSale.date") == col("DimDate.date"),
        "inner"
    )
    .filter(
        (col("DimProduct.product_type") == "Jacket") &
        (col("DimDate.year") == 2023) &
        (col("DimDate.month").between(6, 8))
    )
    .groupBy("DimRegion.region_name", "DimProduct.product_type")
    .agg(
        # F.sum is Spark's aggregate; Python's built-in sum would fail on a column name
        F.sum("FactSale.quantity").alias("total_quantity"),
        F.sum("FactSale.total_amt").alias("total_amount")
    )
    .select(
        "DimRegion.region_name",
        "DimProduct.product_type",
        "total_quantity",
        "total_amount"
    )
)

result.show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



<div style="background-color: white; padding: 10px;">
    <a id="assignment_2">
        <h4 style="color: #0D9276">Assignment 2: Maximum turnover of the retailer regions</h4>
    </a>
</div>
<br>
<h4>
    Find each retailer's highest-turnover region and report the total amount for that retailer and region.
</h4>
<p>The expected output is as follows: </p>

| retailer_id | retailer_name | region_name | total_amount |
|-------------|---------------|-------------|--------------|
| 1           | A             | Karadeniz   | 42642        |
| 2           | B             | Ic Anadolu  | 71689        |
| 3           | C             | Ic Anadolu  | 11995        |
| 4           | D             | Karadeniz   | 16081        |

<br>

In [None]:
# My Spark SQL solution

query = """
WITH tablo AS (
    -- Temporary result set named "tablo"; the final SELECT picks its columns from here
    SELECT
        DimRetailer.retailer_id,
        DimRetailer.retailer_name,
        DimRegion.region_name,
        SUM(FactSale.total_amt) AS total_amount,
        -- Number each retailer's regions by total sales, highest first; retailerbest = 1 marks the top region
        ROW_NUMBER() OVER (PARTITION BY DimRetailer.retailer_id ORDER BY SUM(FactSale.total_amt) DESC) AS retailerbest
    FROM FactSale
    LEFT JOIN DimRetailer ON FactSale.retailer_id = DimRetailer.retailer_id
    LEFT JOIN DimCustomer ON FactSale.customer_id = DimCustomer.customer_id
    LEFT JOIN DimRegion ON DimCustomer.city_id = DimRegion.city_id
    GROUP BY DimRetailer.retailer_id, DimRetailer.retailer_name, DimRegion.region_name
)
SELECT
    retailer_id,
    retailer_name,
    region_name,
    total_amount
FROM tablo
WHERE retailerbest = 1
ORDER BY retailer_id;
"""
result = spark.sql(query)
result.show()


+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|            A|  Karadeniz|       42642|
|          2|            B| Ic Anadolu|       71689|
|          3|            C| Ic Anadolu|       11995|
|          4|            D|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+



In [None]:
# My PySpark solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window
sales_data = (
    df_sal
    .join(df_ret, "retailer_id")
    .join(df_cus, "customer_id")
    .join(df_reg, df_cus.city_id == df_reg.city_id)
    .groupby("retailer_id", "retailer_name", "region_name")
    .agg(F.sum("total_amt").alias("total_amount"))
)

# Order each retailer's regions by total sales so the best seller comes first
window_spec = Window.partitionBy("retailer_id").orderBy(F.desc("total_amount"))
# The top-selling region gets row number 1, so keep only those rows
result = sales_data.withColumn("rank", F.row_number().over(window_spec)).filter(F.col("rank") == 1)
result.select("retailer_id", "retailer_name", "region_name", "total_amount").show()

+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|            A|  Karadeniz|       42642|
|          2|            B| Ic Anadolu|       71689|
|          3|            C| Ic Anadolu|       11995|
|          4|            D|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+
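
The SQL solution uses `LEFT JOIN`, while the DataFrame solution relies on the default of `DataFrame.join`, which is an inner join. The two outputs match here, which suggests every sale finds a partner row in each dimension table, but they would diverge if some did not. The sketch below illustrates the difference with made-up rows. It uses Python's built-in `sqlite3` as a stand-in for Spark, and as a simplification it stores `region_name` directly on a cut-down `DimCustomer`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE FactSale (customer_id INTEGER, total_amt INTEGER);
CREATE TABLE DimCustomer (customer_id INTEGER, region_name TEXT);
INSERT INTO FactSale VALUES (1, 100), (2, 50), (99, 500);  -- customer 99 has no dimension row
INSERT INTO DimCustomer VALUES (1, 'Ege'), (2, 'Marmara');
""")

# Same aggregation, parameterised only by the join type.
template = """
SELECT region_name, SUM(total_amt) AS total_amount
FROM FactSale
{join_type} JOIN DimCustomer ON FactSale.customer_id = DimCustomer.customer_id
GROUP BY region_name
ORDER BY total_amount DESC
"""

inner_rows = conn.execute(template.format(join_type="INNER")).fetchall()
left_rows = conn.execute(template.format(join_type="LEFT")).fetchall()

print(inner_rows)  # the unmatched sale is dropped
print(left_rows)   # the unmatched sale forms a group whose region_name is NULL (None)
```

With a left join, the unmatched sales form their own group with a missing region, and that group could even rank first for a retailer. Picking one join type for both solutions keeps them comparable.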



In [None]:
query = """
WITH tablo AS (
    SELECT
        DimRetailer.retailer_id,
        DimRetailer.retailer_name,
        DimRegion.region_name,
        SUM(FactSale.total_amt) AS total_amount,
        RANK() OVER (PARTITION BY DimRetailer.retailer_id ORDER BY SUM(FactSale.total_amt) DESC) AS retailerbest
    FROM FactSale
    LEFT JOIN DimRetailer ON FactSale.retailer_id = DimRetailer.retailer_id
    LEFT JOIN DimCustomer ON FactSale.customer_id = DimCustomer.customer_id
    LEFT JOIN DimRegion ON DimCustomer.city_id = DimRegion.city_id
    GROUP BY DimRetailer.retailer_id, DimRetailer.retailer_name, DimRegion.region_name
)
SELECT
    retailer_id,
    retailer_name,
    region_name,
    total_amount
FROM tablo
WHERE retailerbest = 1
ORDER BY retailer_id;

"""
# Execute the query using spark.sql
result = spark.sql(query)
result.show()

+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|            A|  Karadeniz|       42642|
|          2|            B| Ic Anadolu|       71689|
|          3|            C| Ic Anadolu|       11995|
|          4|            D|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+
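
The last query swaps `ROW_NUMBER()` for `RANK()`. On this data both return the same four rows, but they behave differently when two regions tie for a retailer's top total: `ROW_NUMBER()` keeps one of the tied rows arbitrarily, while `RANK()` keeps all of them. The sketch below shows this with made-up totals. It uses Python's built-in `sqlite3` as a stand-in for Spark and assumes an SQLite build with window-function support (3.25 or later).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE totals (retailer_id INTEGER, region_name TEXT, total_amount INTEGER)")
conn.executemany(
    "INSERT INTO totals VALUES (?, ?, ?)",
    [
        (1, "Ege", 300),
        (1, "Marmara", 300),  # ties with Ege for first place
        (1, "Akdeniz", 100),
        (2, "Karadeniz", 200),
        (2, "Ege", 150),
    ],
)

# Same filter on retailerbest = 1, parameterised only by the window function.
template = """
SELECT retailer_id, region_name
FROM (
    SELECT
        retailer_id
        ,region_name
        ,{func}() OVER (PARTITION BY retailer_id ORDER BY total_amount DESC) AS retailerbest
    FROM totals
)
WHERE retailerbest = 1
ORDER BY retailer_id, region_name
"""

by_row_number = conn.execute(template.format(func="ROW_NUMBER")).fetchall()
by_rank = conn.execute(template.format(func="RANK")).fetchall()

print(len(by_row_number))  # one row per retailer; which tied region survives is not defined
print(by_rank)             # both tied regions are kept for retailer 1
```

If exactly one row per retailer is required, `ROW_NUMBER()` with an extra tie-breaking column in the `ORDER BY` is one way to make the choice deterministic.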



# ***ENİSE AHSEN KARADAĞ***