<img src="assets/img/huawei_logo.png"  style="width:160px;"/>
<div style="background-color: white; padding: 10px; border-bottom: 6px solid #C2172D;">
    <h2 style="color: black" id="introduction">Batch Data Processing with Apache Spark</h2>
    <p>Tolgahan Cepel - Mert Akat</p>
    <p></p>
</div>


## [Contents](#contents)
1. [Introduction](#introduction)
2. [Importing the libraries](#library)
3. [Reading the data](#read_data)
4. [Spark SQL Practices](#spark_sql_practices)
   * [Selecting columns](#selecting_columns)
   * [Data manipulation](#data_manipulation)
   * [Filtering rows](#filtering_rows)
   * [Aggregating data](#aggregating_data)
   * [Joining](#joining)
5. [Case Studies](#case_studies)
   * [Assignment 1: Jacket sales per region](#assignment_1)
   * [Assignment 2: Maximum turnover of the retailer regions](#assignment_2)

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="introduction">
        <h3 style="color: #C2172D">1. Introduction</h3>
    </a>  
</div>

<img src="assets/img/data_model.svg"  style="width:1000px; padding: 20px"/>

#### SQL Tables Description
- **FactSale:** Sales transactions fact table
- **FactPurchase:** Purchases fact table
- **DimRetailer:** Retailer details dimension table
- **DimCustomer:** Customer details dimension table
- **DimProduct:** Product details dimension table
- **DimRegion:** Region details dimension table
- **DimDate:** Date dimension table
- **DimSupplier:** Supplier details dimension table

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="library">
        <h3 style="color: #C2172D">2. Importing the libraries</h3>
    </a>
</div>

In [4]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession, functions as F

In [5]:
# Mount Google Drive so the parquet files are reachable
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="read_data">
        <h3 style="color: #C2172D">3. Reading the data</h3>
    </a>
</div>

In [6]:
# Creating new SparkSession instance
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Reading parquet data and assigning to DataFrame variables
data_dir = "/content/drive/MyDrive/W3_Spark_Modul_Projesi/W3_Spark_Modul_Projesi/data"
df_pur = spark.read.parquet(f"{data_dir}/purchase")
df_sal = spark.read.parquet(f"{data_dir}/sale")
df_cus = spark.read.parquet(f"{data_dir}/customer")
df_ret = spark.read.parquet(f"{data_dir}/retailer")
df_pro = spark.read.parquet(f"{data_dir}/product")
df_sup = spark.read.parquet(f"{data_dir}/supplier")
df_reg = spark.read.parquet(f"{data_dir}/region")
df_date = spark.read.parquet(f"{data_dir}/date")

# Creating temporary view tables for Spark SQL queries
df_cus.createOrReplaceTempView("DimCustomer")
df_pur.createOrReplaceTempView("FactPurchase")
df_sal.createOrReplaceTempView("FactSale")
df_ret.createOrReplaceTempView("DimRetailer")
df_pro.createOrReplaceTempView("DimProduct")
df_sup.createOrReplaceTempView("DimSupplier")
df_reg.createOrReplaceTempView("DimRegion")
df_date.createOrReplaceTempView("DimDate")

In [7]:
spark.sql("SELECT * FROM FactPurchase LIMIT 5").show()

+-----------+-----------+----------+--------+---------+----------+----------+
|purchase_id|supplier_id|product_id|pur_quan|pur_price|total_cost|      date|
+-----------+-----------+----------+--------+---------+----------+----------+
|          1|          2|         1|    1000|       10|     10000|2023-01-01|
|          2|          2|         2|    1500|       10|     10000|2023-01-01|
|          3|          2|         3|     500|       10|      5000|2023-01-01|
|          4|          2|         4|    1000|       10|      5000|2023-01-01|
|          5|          2|         5|    1000|       10|     15000|2023-01-01|
+-----------+-----------+----------+--------+---------+----------+----------+



In [8]:
spark.sql("SELECT * FROM FactSale LIMIT 5").show()

+--------+----------+-----------+-----------+--------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|
+--------+----------+-----------+-----------+--------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|
|       1|       139|        551|          2|       1|       23|2023-08-30|
|       1|        36|        551|          2|       1|       20|2023-08-30|
|       1|       319|        551|          2|       1|       41|2023-08-30|
|       1|         5|        551|          2|       1|       25|2023-08-30|
+--------+----------+-----------+-----------+--------+---------+----------+



In [9]:
spark.sql("SELECT * FROM DimRetailer LIMIT 5").show()

+-----------+-------+-------------+-------------+
|retailer_id|city_id|retailer_type|retailer_name|
+-----------+-------+-------------+-------------+
|          1|     34|     Internet|  Hepsiburada|
|          2|     34|     Internet|     Trendyol|
|          3|     34|     Internet|          n11|
|          4|     35|     Internet| Gittigidiyor|
+-----------+-------+-------------+-------------+



In [10]:
spark.sql("SELECT * FROM DimProduct LIMIT 5").show()

+----------+------------+------------+------+------+--------+----------+
|product_id|product_code|product_type|colour|  size|material|unit_price|
+----------+------------+------------+------+------+--------+----------+
|         1| BTS001BLC-S|      Tshirt| Black| Small|  Cotton|        25|
|         2| BTS001BLC-M|      Tshirt| Black|Medium|  Cotton|        25|
|         3| BTS001BLC-L|      Tshirt| Black| Large|  Cotton|        25|
|         4|BTS001BLC-XL|      Tshirt| Black|XLarge|  Cotton|        25|
|         5|  BTS001WC-S|      Tshirt| White| Small|  Cotton|        25|
+----------+------------+------------+------+------+--------+----------+



In [11]:
spark.sql("SELECT * FROM DimSupplier LIMIT 5").show()

+-----------+-------+------------+--------+
|supplier_id|city_id|    sup_name|sup_type|
+-----------+-------+------------+--------+
|          1|      6|  Moda Kumas|   Shirt|
|          2|     34|Kumas Sanati|  Tshirt|
|          3|      7|Side Tekstil|Trousers|
|          4|     35| Ege Ruzgari|  Jacket|
+-----------+-------+------------+--------+



In [12]:
spark.sql("SELECT * FROM DimDate LIMIT 5").show()

+----------+---------+----+-----+-------+----+
|      date|      day|week|month|quarter|year|
+----------+---------+----+-----+-------+----+
|2023-01-01|   Sunday|   1|    1|     Q1|2023|
|2023-01-02|   Monday|   1|    1|     Q1|2023|
|2023-01-03|  Tuesday|   1|    1|     Q1|2023|
|2023-01-04|Wednesday|   1|    1|     Q1|2023|
|2023-01-05| Thursday|   1|    1|     Q1|2023|
+----------+---------+----+-----+-------+----+



In [13]:
spark.sql("SELECT * FROM DimCustomer LIMIT 5").show()

+-----------+-------+-------+--------+------+----------+-------------+--------------------+
|customer_id|city_id|   name| surname|gender|birth_date|        phone|               email|
+-----------+-------+-------+--------+------+----------+-------------+--------------------+
|          1|     30| Jazmin|  Burril|Female|1958-09-22|(493) 8889636|jburril0@soundclo...|
|          2|     25| Dalila|   Faers|Female|2000-11-08|(404) 6357120|dfaers1@sitemeter...|
|          3|     15|Wayland|Walework|  Male|1976-03-08|(277) 1691679|wwalework2@quantc...|
|          4|     42|Amberly|  Haquin|Female|1948-10-08|(460) 2147509|ahaquin3@telegrap...|
|          5|     41|Garrett|   Frear|  Male|1957-09-25|(858) 3767105|     gfrear4@tiny.cc|
+-----------+-------+-------+--------+------+----------+-------------+--------------------+



In [14]:
spark.sql("SELECT * FROM DimRegion LIMIT 8").show()

+-------+--------------+-----------------+
|city_id|     city_name|      region_name|
+-------+--------------+-----------------+
|      1|         Adana|       Ic Anadolu|
|      2|      Adiyaman|Guneydogu Anadolu|
|      3|Afyonkarahisar|       Ic Anadolu|
|      4|          Agri|     Dogu Anadolu|
|      5|        Amasya|        Karadeniz|
|      6|        Ankara|       Ic Anadolu|
|      7|       Antalya|          Akdeniz|
|      8|        Artvin|        Karadeniz|
+-------+--------------+-----------------+



In [15]:
spark.sql("""
SELECT
    *
FROM DimRegion reg
LEFT OUTER JOIN DimCustomer cus
ON cus.city_id = reg.city_id """).show()

+-------+---------+-----------------+-----------+-------+---------+-----------+------+----------+-------------+--------------------+
|city_id|city_name|      region_name|customer_id|city_id|     name|    surname|gender|birth_date|        phone|               email|
+-------+---------+-----------------+-----------+-------+---------+-----------+------+----------+-------------+--------------------+
|      1|    Adana|       Ic Anadolu|        971|      1|     Dodi|      Meins|Female|1947-02-01|(221) 9463628|dmeinsqy@canalblo...|
|      1|    Adana|       Ic Anadolu|        925|      1|  Harmony|     Saffin|Female|1984-07-30|(650) 4415044| hsaffinpo@webmd.com|
|      1|    Adana|       Ic Anadolu|        903|      1|    Diena|   Kidstoun|Female|1998-03-28|(911) 6731826| dkidstounp2@bbb.org|
|      1|    Adana|       Ic Anadolu|        838|      1| Kathleen|        Wyd|Female|1952-06-09|(731) 4902859|   kwydn9@smh.com.au|
|      1|    Adana|       Ic Anadolu|        729|      1|   Giffie|  

In [16]:
spark.sql("""
SELECT
    *
FROM DimRegion reg
INNER JOIN DimCustomer cus
ON cus.city_id = reg.city_id """).show()

+-------+-------------+-----------------+-----------+-------+---------+-------------+------+----------+-------------+--------------------+
|city_id|    city_name|      region_name|customer_id|city_id|     name|      surname|gender|birth_date|        phone|               email|
+-------+-------------+-----------------+-----------+-------+---------+-------------+------+----------+-------------+--------------------+
|     30|      Hakkari|     Dogu Anadolu|          1|     30|   Jazmin|       Burril|Female|1958-09-22|(493) 8889636|jburril0@soundclo...|
|     25|      Erzurum|     Dogu Anadolu|          2|     25|   Dalila|        Faers|Female|2000-11-08|(404) 6357120|dfaers1@sitemeter...|
|     15|       Burdur|          Akdeniz|          3|     15|  Wayland|     Walework|  Male|1976-03-08|(277) 1691679|wwalework2@quantc...|
|     42|        Konya|       Ic Anadolu|          4|     42|  Amberly|       Haquin|Female|1948-10-08|(460) 2147509|ahaquin3@telegrap...|
|     41|      Kocaeli|    
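
The two cells above differ only in the join type. As a rough illustration of what that changes, the sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL (an assumption, chosen so that it runs without a Spark session) and a few made-up rows rather than the course data. A left outer join keeps every row of the left table and fills the right side with NULL when there is no match, while an inner join drops the unmatched rows.

```python
import sqlite3

# In-memory SQLite database, used here only as a stand-in for Spark SQL.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE DimRegion (city_id INTEGER, city_name TEXT);
    CREATE TABLE DimCustomer (customer_id INTEGER, city_id INTEGER, name TEXT);
    -- Toy rows for illustration, not taken from the course data set.
    INSERT INTO DimRegion VALUES (1, 'Adana'), (6, 'Ankara');
    INSERT INTO DimCustomer VALUES (10, 1, 'Dodi');
""")

query = """
    SELECT reg.city_name, cus.name
    FROM DimRegion reg
    {join_type} JOIN DimCustomer cus ON cus.city_id = reg.city_id
    ORDER BY reg.city_id
"""

left_rows = conn.execute(query.format(join_type="LEFT OUTER")).fetchall()
inner_rows = conn.execute(query.format(join_type="INNER")).fetchall()

print(left_rows)   # [('Adana', 'Dodi'), ('Ankara', None)]
print(inner_rows)  # [('Adana', 'Dodi')]
```

In this toy data Ankara has no customer, so it survives only in the left outer join, with `None` standing in for SQL NULL.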

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="spark_sql_practices">
        <h3 style="color: #C2172D">4. Spark SQL Practices</h3>
    </a>
</div>

**<a id="selecting_columns">Selecting columns</a>**

In [17]:
spark.sql("SELECT customer_id, name, surname, birth_date FROM DimCustomer LIMIT 5").show()

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+



In [18]:
df_cus.select("customer_id", "name", "surname", "birth_date").show(5)

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+
only showing top 5 rows



**<a id="data_manipulation">Data manipulation: </a>** Calculating customer ages from the `birth_date` column.

In [19]:
spark.sql("""
SELECT
    customer_id
    ,name
    ,surname
    ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
FROM DimCustomer
LIMIT 5
""").show()

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 66|
|          2| Dalila|   Faers| 24|
|          3|Wayland|Walework| 48|
|          4|Amberly|  Haquin| 76|
|          5|Garrett|   Frear| 67|
+-----------+-------+--------+---+



In [20]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("customer_id", "name", "surname", "age")
    .show(5)
)

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 66|
|          2| Dalila|   Faers| 24|
|          3|Wayland|Walework| 48|
|          4|Amberly|  Haquin| 76|
|          5|Garrett|   Frear| 67|
+-----------+-------+--------+---+
only showing top 5 rows
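
The expression `YEAR(CURRENT_DATE()) - YEAR(birth_date)` used above subtracts calendar years only, so it counts a birthday that has not yet occurred in the current year. The following is a minimal plain-Python sketch (not Spark) of the difference; the function names and the reference date are assumptions made for illustration.

```python
from datetime import date


def year_difference(birth: date, today: date) -> int:
    """Same arithmetic as YEAR(CURRENT_DATE()) - YEAR(birth_date)."""
    return today.year - birth.year


def completed_years(birth: date, today: date) -> int:
    """Age in completed years: subtract one if the birthday is still ahead."""
    birthday_pending = (today.month, today.day) < (birth.month, birth.day)
    return today.year - birth.year - int(birthday_pending)


# Birth date of customer 2 in the sample output; the reference date is assumed.
birth = date(2000, 11, 8)
today = date(2024, 6, 1)
print(year_difference(birth, today))  # 24
print(completed_years(birth, today))  # 23
```

In Spark, a comparable result can probably be obtained with `F.floor(F.months_between(F.current_date(), F.col("birth_date")) / 12)`. This is a suggestion rather than part of the original notebook, and it is worth checking how it treats end-of-month birth dates before relying on it.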



**<a id="filtering_rows">Filtering rows</a>**

In [21]:
spark.sql("""
SELECT
    name
    ,surname
    ,age
FROM
(
    SELECT
        customer_id
        ,name
        ,surname
        ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
    FROM DimCustomer
)
WHERE age >= 30
LIMIT 5
""").show()

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 66|
|Wayland|Walework| 48|
|Amberly|  Haquin| 76|
|Garrett|   Frear| 67|
|  Horst|   Isted| 49|
+-------+--------+---+



In [22]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("name", "surname", "age")
    .filter(F.col("age") >= 30)
    .show(5)
)

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 66|
|Wayland|Walework| 48|
|Amberly|  Haquin| 76|
|Garrett|   Frear| 67|
|  Horst|   Isted| 49|
+-------+--------+---+
only showing top 5 rows



**<a id="aggregating_data">Aggregating data</a>**

In [23]:
spark.sql("""
SELECT
    order_id
    ,SUM(quantity) AS total_quantity
    ,SUM(total_amt) AS total_amount
FROM FactSale
GROUP BY order_id
ORDER BY total_quantity DESC
LIMIT 10
""").show()

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    1382|            11|         452|
|    1752|            11|         298|
+--------+--------------+------------+



In [24]:
(
    df_sal.groupBy("order_id").agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount")
    ).orderBy("total_quantity", ascending=False)
    .show(10)
)

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    2337|            11|         357|
|    3743|            11|         359|
+--------+--------------+------------+
only showing top 10 rows
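
The last two rows of the SQL output and the DataFrame output above differ (orders 1382 and 1752 versus 2337 and 3743). Both results are valid: several orders share a total quantity of 11, and sorting on `total_quantity` alone leaves the order among ties unspecified. A minimal sketch in plain Python (not Spark), using a handful of rows copied from the two outputs, shows how a secondary sort key makes a top-N result repeatable.

```python
# (order_id, total_quantity) pairs copied from the two outputs above.
orders = [(3743, 11), (3289, 12), (1752, 11), (2337, 11), (1382, 11)]


def top_n(rows, n):
    """Sort by quantity descending, then order_id ascending as a tie-breaker."""
    return sorted(rows, key=lambda r: (-r[1], r[0]))[:n]


print(top_n(orders, 3))  # [(3289, 12), (1382, 11), (1752, 11)]
```

In the notebook the same idea would be `ORDER BY total_quantity DESC, order_id` in SQL, or `.orderBy(F.desc("total_quantity"), "order_id")` with the DataFrame API.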



**<a id="joining">Joining</a>**

In [25]:
spark.sql("""
SELECT
    region_name
    ,AVG(YEAR(CURRENT_DATE()) - YEAR(birth_date)) AS age
FROM DimCustomer cus
INNER JOIN DimRegion reg
ON cus.city_id = reg.city_id
GROUP BY region_name
ORDER BY age DESC
""").show()

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 50.81521739130435|
|     Dogu Anadolu| 50.13095238095238|
|Guneydogu Anadolu| 48.58119658119658|
|          Marmara|48.189542483660134|
|       Ic Anadolu| 48.07772020725388|
|        Karadeniz| 47.75121951219512|
|              Ege|46.888888888888886|
+-----------------+------------------+



In [26]:
(
    df_cus
    .join(df_reg, df_cus.city_id == df_reg.city_id)
    .groupBy("region_name").agg(
        F.avg(F.year(F.current_date()) - F.year("birth_date")).alias("age")
    )
    .orderBy("age", ascending=False)
    .show()
)

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 50.81521739130435|
|     Dogu Anadolu| 50.13095238095238|
|Guneydogu Anadolu| 48.58119658119658|
|          Marmara|48.189542483660134|
|       Ic Anadolu| 48.07772020725388|
|        Karadeniz| 47.75121951219512|
|              Ege|46.888888888888886|
+-----------------+------------------+



<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="case_studies">
        <h3 style="color: #C2172D">5. Case Studies</h3>
    </a>  
</div>

<div style="background-color: white; padding: 10px;">
    <a id="assignment_1">
        <h4 style="color: #0D9276">Assignment 1: Jacket sales per region</h4>
    </a>
</div>
<br>
<h4>
    Write Spark SQL and Python API scripts that return the total quantity and total amount of jacket sales per region between June and August 2023.
</h4>
<p>The expected output is as follows: </p>

| region_name       | product_type | total_quantity | total_amount |
|-------------------|--------------|----------------|--------------|
| Marmara           | Jacket       | 213            | 8358         |
| Dogu Anadolu      | Jacket       | 284            | 11547        |
| Guneydogu Anadolu | Jacket       | 176            | 6981         |
| Ic Anadolu        | Jacket       | 260            | 10496        |
| Akdeniz           | Jacket       | 162            | 6637         |
| Karadeniz         | Jacket       | 310            | 12582        |
| Ege               | Jacket       | 101            | 3953         |


### External links
- https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html
- https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html

In [27]:
# Your Spark SQL Solution:
spark.sql("""
SELECT
    reg.region_name
    ,pro.product_type
    ,SUM(sal.quantity) AS total_quantity
    ,SUM(sal.total_amt) AS total_amount
FROM DimRegion reg
INNER JOIN DimCustomer cus ON reg.city_id = cus.city_id
INNER JOIN FactSale sal ON sal.customer_id = cus.customer_id
INNER JOIN DimProduct pro ON sal.product_id = pro.product_id
INNER JOIN DimDate dat ON sal.date = dat.date
WHERE pro.product_type = 'Jacket'
AND dat.year = 2023  -- the task is limited to 2023
AND dat.month BETWEEN 6 AND 8
GROUP BY reg.region_name, pro.product_type
""").show()

+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



In [28]:
# Your PySpark Solution:
(
    df_sal
    .join(df_cus, "customer_id")
    .join(df_reg, "city_id")
    .join(df_pro, "product_id")
    .join(df_date, "date")
    .where(
        (F.col("product_type") == "Jacket")
        & (F.col("year") == 2023)  # the task is limited to 2023
        & F.col("month").between(6, 8)
    )
    .groupBy("region_name", "product_type")
    .agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount"),
    )
    .show()
)


+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



<div style="background-color: white; padding: 10px;">
    <a id="assignment_2">
        <h4 style="color: #0D9276">Assignment 2: Maximum turnover of the retailer regions</h4>
    </a>
</div>
<br>
<h4>
    For each retailer, find the region with the highest turnover and report the total amount for that retailer and region.
</h4>
<p>The expected output is as follows: </p>

| retailer_id | retailer_name | region_name | total_amount |
|-------------|---------------|-------------|--------------|
| 1           | Hepsiburada   | Karadeniz   | 42642        |
| 2           | Trendyol      | Ic Anadolu  | 71689        |
| 3           | n11           | Ic Anadolu  | 11995        |
| 4           | Gittigidiyor  | Karadeniz   | 16081        |

<br>

In [29]:
# Your Spark SQL Solution:
spark.sql("""SELECT
    retailer_name,
    region_name,
    total_amount
FROM (
    SELECT
        ret.retailer_id,
        ret.retailer_name,
        reg.region_name,
        SUM(sal.total_amt) AS total_amount,
        MAX(SUM(sal.total_amt)) OVER (PARTITION BY ret.retailer_id) AS max_total_amount
    FROM
        DimRegion reg
    INNER JOIN DimCustomer cus ON reg.city_id = cus.city_id
    INNER JOIN FactSale sal ON sal.customer_id = cus.customer_id
    INNER JOIN DimRetailer ret ON ret.retailer_id = sal.retailer_id
    GROUP BY
        ret.retailer_id,
        ret.retailer_name,
        reg.region_name
) subquery
WHERE
    total_amount = max_total_amount
""").show()


+-------------+-----------+------------+
|retailer_name|region_name|total_amount|
+-------------+-----------+------------+
|  Hepsiburada|  Karadeniz|       42642|
|     Trendyol| Ic Anadolu|       71689|
|          n11| Ic Anadolu|       11995|
| Gittigidiyor|  Karadeniz|       16081|
+-------------+-----------+------------+



In [31]:
from pyspark.sql import Window
# Your PySpark Solution:
result_df = (df_sal.join(df_cus, "customer_id")
                    .join(df_reg, "city_id")
                    .join(df_ret, "retailer_id")
                    .groupBy(df_ret.retailer_id, df_ret.retailer_name, df_reg.region_name)
                    .agg(F.sum("total_amt").alias("total_amount"))
                    .withColumn("max_total_amount", F.max("total_amount").over(Window.partitionBy("retailer_id")))
                    .filter(F.col("total_amount") == F.col("max_total_amount"))
                    .select(df_ret.retailer_name, df_reg.region_name, "total_amount")
            )

result_df.show()

+-------------+-----------+------------+
|retailer_name|region_name|total_amount|
+-------------+-----------+------------+
|  Hepsiburada|  Karadeniz|       42642|
|     Trendyol| Ic Anadolu|       71689|
|          n11| Ic Anadolu|       11995|
| Gittigidiyor|  Karadeniz|       16081|
+-------------+-----------+------------+
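
Both solutions keep every row whose `total_amount` equals the per-retailer maximum, so a retailer with two regions tied for first place would appear twice. If exactly one row per retailer is wanted, ranking with an explicit tie-breaker is one option. The plain-Python sketch below (not Spark) contrasts the two behaviors on hypothetical rows that contain a deliberate tie; the function names are made up for illustration.

```python
from itertools import groupby

# Hypothetical (retailer, region, total_amount) rows with a deliberate tie.
rows = [
    ("A", "Ege", 100),
    ("A", "Marmara", 100),
    ("A", "Akdeniz", 40),
    ("B", "Karadeniz", 70),
    ("B", "Ege", 30),
]


def max_filter(rows):
    """Mimic the total_amount = max_total_amount filter: all ties survive."""
    best = {}
    for retailer, _, amount in rows:
        best[retailer] = max(best.get(retailer, amount), amount)
    return [r for r in rows if r[2] == best[r[0]]]


def top_region_per_retailer(rows):
    """Keep one row per retailer: highest amount first, then region name."""
    ordered = sorted(rows, key=lambda r: (r[0], -r[2], r[1]))
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[0])]


print(max_filter(rows))
print(top_region_per_retailer(rows))
```

In PySpark the second behavior would correspond to `F.row_number().over(Window.partitionBy("retailer_id").orderBy(F.desc("total_amount"), "region_name"))` followed by a filter on rank 1. With the course data the expected output contains no ties, so both approaches should return the same four rows.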

