<img src="assets/img/huawei_logo.png"  style="width:160px;"/>
<div style="background-color: white; padding: 10px; border-bottom: 6px solid #C2172D;">
    <h2 style="color: black" id="introduction">Batch Data Processing with Apache Spark</h2>
    <p>Tolgahan Cepel - Mert Akat</p>
    <p></p>
</div>


## [Contents](#contents)
1. [Introduction](#introduction)
2. [Importing the libraries](#library)
3. [Reading the data](#read_data)
4. [Spark SQL Practices](#spark_sql_practices)
   * [Selecting columns](#selecting_columns)
   * [Data manipulation](#data_manipulation)
   * [Filtering rows](#filtering_rows)
   * [Aggregating data](#aggregating_data)
   * [Joining](#joining)
5. [Case Studies](#case_studies)
   * [Assignment 1: Jacket sales per region](#assignment_1)
   * [Assignment 2: Maximum turnover of the retailer regions](#assignment_2)

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="introduction">
        <h3 style="color: #C2172D">1. Introduction</h3>
    </a>  
</div>

<img src="assets/img/data_model.svg"  style="width:1000px; padding: 20px"/>

#### SQL Tables Description
- **FactSale:** Sales transactions fact table
- **FactPurchase:** Purchases fact table
- **DimRetailer:** Retailer details dimension table
- **DimCustomer:** Customer details dimension table
- **DimProduct:** Product details dimension table
- **DimRegion:** Region details dimension table
- **DimDate:** Date dimension table
- **DimSupplier:** Supplier details dimension table

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this page for the latest download link: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark


<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="library">
        <h3 style="color: #C2172D">2. Importing the libraries</h3>
    </a>
</div>

In [None]:
from pyspark.sql import SparkSession, functions as F

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="read_data">
        <h3 style="color: #C2172D">3. Reading the data</h3>
    </a>
</div>

In [None]:
# Getting the SparkSession (getOrCreate reuses the session created above if it is still active)
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Reading parquet data and assigning to DataFrame variables
df_pur = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/purchase")
df_sal = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/sale")
df_cus = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/customer")
df_ret = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/retailer")
df_pro = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/product")
df_sup = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/supplier")
df_reg = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/region")
df_date = spark.read.parquet("/content/drive/MyDrive/DEM DATA/data/date")

# Creating temporary view tables for Spark SQL queries
df_cus.createOrReplaceTempView("DimCustomer")
df_pur.createOrReplaceTempView("FactPurchase")
df_sal.createOrReplaceTempView("FactSale")
df_ret.createOrReplaceTempView("DimRetailer")
df_pro.createOrReplaceTempView("DimProduct")
df_sup.createOrReplaceTempView("DimSupplier")
df_reg.createOrReplaceTempView("DimRegion")
df_date.createOrReplaceTempView("DimDate")

<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="spark_sql_practices">
        <h3 style="color: #C2172D">4. Spark SQL Practices</h3>
    </a>
</div>

**<a id="selecting_columns">Selecting columns</a>**

In [None]:
spark.sql("SELECT customer_id, name, surname, birth_date FROM DimCustomer LIMIT 5").show()

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+



In [None]:
df_cus.select("customer_id", "name", "surname", "birth_date").show(5)

+-----------+-------+--------+----------+
|customer_id|   name| surname|birth_date|
+-----------+-------+--------+----------+
|          1| Jazmin|  Burril|1958-09-22|
|          2| Dalila|   Faers|2000-11-08|
|          3|Wayland|Walework|1976-03-08|
|          4|Amberly|  Haquin|1948-10-08|
|          5|Garrett|   Frear|1957-09-25|
+-----------+-------+--------+----------+
only showing top 5 rows



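**<a id="data_manipulation">Data manipulation: </a>** Calculating each customer's age from the birth date. The year difference used here is an approximation: it does not check whether the birthday has already passed in the current year.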

In [None]:
spark.sql("""
SELECT
    customer_id
    ,name
    ,surname
    ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
FROM DimCustomer
LIMIT 5
""").show()

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 66|
|          2| Dalila|   Faers| 24|
|          3|Wayland|Walework| 48|
|          4|Amberly|  Haquin| 76|
|          5|Garrett|   Frear| 67|
+-----------+-------+--------+---+



In [None]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("customer_id", "name", "surname", "age")
    .show(5)
)

+-----------+-------+--------+---+
|customer_id|   name| surname|age|
+-----------+-------+--------+---+
|          1| Jazmin|  Burril| 66|
|          2| Dalila|   Faers| 24|
|          3|Wayland|Walework| 48|
|          4|Amberly|  Haquin| 76|
|          5|Garrett|   Frear| 67|
+-----------+-------+--------+---+
only showing top 5 rows



**<a id="filtering_rows">Filtering rows</a>**

In [None]:
spark.sql("""
SELECT
    name
    ,surname
    ,age
FROM
(
    SELECT
        customer_id
        ,name
        ,surname
        ,YEAR(CURRENT_DATE()) - YEAR(birth_date) AS age
    FROM DimCustomer
)
WHERE age >= 30
LIMIT 5
""").show()

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 66|
|Wayland|Walework| 48|
|Amberly|  Haquin| 76|
|Garrett|   Frear| 67|
|  Horst|   Isted| 49|
+-------+--------+---+



In [None]:
(
    df_cus.withColumn("age", F.year(F.current_date()) - F.year("birth_date"))
    .select("name", "surname", "age")
    .filter(F.col("age") >= 30)
    .show(5)
)

+-------+--------+---+
|   name| surname|age|
+-------+--------+---+
| Jazmin|  Burril| 66|
|Wayland|Walework| 48|
|Amberly|  Haquin| 76|
|Garrett|   Frear| 67|
|  Horst|   Isted| 49|
+-------+--------+---+
only showing top 5 rows



**<a id="aggregating_data">Aggregating data</a>**

In [None]:
spark.sql("""
SELECT
    order_id
    ,SUM(quantity) AS total_quantity
    ,SUM(total_amt) AS total_amount
FROM FactSale
GROUP BY order_id
ORDER BY total_quantity DESC
LIMIT 10
""").show()

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    1382|            11|         452|
|    1752|            11|         298|
+--------+--------------+------------+



In [None]:
(
    df_sal.groupBy("order_id").agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount")
    ).orderBy("total_quantity", ascending=False)
    .show(10)
)

+--------+--------------+------------+
|order_id|total_quantity|total_amount|
+--------+--------------+------------+
|    3647|            13|         521|
|    2574|            13|         488|
|    3515|            13|         402|
|     101|            12|         359|
|     440|            12|         426|
|    3763|            12|         323|
|    1585|            12|         488|
|    3289|            12|         327|
|    2337|            11|         357|
|    3743|            11|         359|
+--------+--------------+------------+
only showing top 10 rows



**<a id="joining">Joining</a>**

In [None]:
spark.sql("""
SELECT
    region_name
    ,AVG(YEAR(CURRENT_DATE()) - YEAR(birth_date)) AS age
FROM DimCustomer cus
INNER JOIN DimRegion reg
ON cus.city_id = reg.city_id
GROUP BY region_name
ORDER BY age DESC
""").show()

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 50.81521739130435|
|     Dogu Anadolu| 50.13095238095238|
|Guneydogu Anadolu| 48.58119658119658|
|          Marmara|48.189542483660134|
|       Ic Anadolu| 48.07772020725388|
|        Karadeniz| 47.75121951219512|
|              Ege|46.888888888888886|
+-----------------+------------------+



In [None]:
(
    df_cus
    .join(df_reg, df_cus.city_id == df_reg.city_id)
    .groupBy("region_name").agg(
        F.avg(F.year(F.current_date()) - F.year("birth_date")).alias("age")
    )
    .orderBy("age", ascending=False)
    .show()
)

+-----------------+------------------+
|      region_name|               age|
+-----------------+------------------+
|          Akdeniz| 50.81521739130435|
|     Dogu Anadolu| 50.13095238095238|
|Guneydogu Anadolu| 48.58119658119658|
|          Marmara|48.189542483660134|
|       Ic Anadolu| 48.07772020725388|
|        Karadeniz| 47.75121951219512|
|              Ege|46.888888888888886|
+-----------------+------------------+



<div style="background-color: white; padding: 10px; border-bottom: 4px solid #C2172D;">
    <a id="case_studies">
        <h3 style="color: #C2172D">5. Case Studies</h3>
    </a>  
</div>

<div style="background-color: white; padding: 10px;">
    <a id="assignment_1">
        <h4 style="color: #0D9276">Assignment 1: Jacket sales per region</h4>
    </a>
</div>
<br>
<h4>
    Write Spark SQL and Python API scripts that return the total quantity and total amount of jacket sales per region between June and August 2023.
</h4>
<p>The expected output is as follows: </p>

| region_name       | product_type | total_quantity | total_amount |
|-------------------|--------------|----------------|--------------|
| Marmara           | Jacket       | 213            | 8358         |
| Dogu Anadolu      | Jacket       | 284            | 11547        |
| Guneydogu Anadolu | Jacket       | 176            | 6981         |
| Ic Anadolu        | Jacket       | 260            | 10496        |
| Akdeniz           | Jacket       | 162            | 6637         |
| Karadeniz         | Jacket       | 310            | 12582        |
| Ege               | Jacket       | 101            | 3953         |


### External links
- https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html
- https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html

In [118]:
query = """
SELECT
    reg.region_name,
    p.product_type,
    SUM(s.quantity) AS total_quantity,
    SUM(s.total_amt) AS total_amount
FROM
    FactSale s
INNER JOIN DimProduct p ON s.product_id = p.product_id
INNER JOIN DimDate d ON s.date = d.date
JOIN DimCustomer cus ON s.customer_id = cus.customer_id
JOIN DimRegion reg ON cus.city_id = reg.city_id
WHERE
    p.product_type = 'Jacket'
    AND d.date BETWEEN '2023-06-01' AND '2023-08-31'
GROUP BY
    reg.region_name, p.product_type
"""
result_df = spark.sql(query)
result_df.show()



+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



In [119]:
from pyspark.sql import functions as F

# Join the tables, then keep only 'Jacket' products sold within the date range
filtered_sales = df_sal \
    .join(df_pro, df_sal.product_id == df_pro.product_id) \
    .join(df_date, df_sal.date == df_date.date) \
    .join(df_cus, df_sal.customer_id == df_cus.customer_id) \
    .join(df_reg, df_cus.city_id == df_reg.city_id) \
    .filter((df_pro.product_type == "Jacket") & (df_date.date >= F.lit("2023-06-01")) & (df_date.date <= F.lit("2023-08-31")))

# Group by region and product type to compute the total quantity and total amount
result_df = filtered_sales.groupBy("region_name", "product_type") \
    .agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amt").alias("total_amount")
    )

# Show the results
result_df.show()


+-----------------+------------+--------------+------------+
|      region_name|product_type|total_quantity|total_amount|
+-----------------+------------+--------------+------------+
|          Marmara|      Jacket|           213|        8358|
|     Dogu Anadolu|      Jacket|           284|       11547|
|Guneydogu Anadolu|      Jacket|           176|        6981|
|       Ic Anadolu|      Jacket|           260|       10496|
|          Akdeniz|      Jacket|           162|        6637|
|        Karadeniz|      Jacket|           310|       12582|
|              Ege|      Jacket|           101|        3953|
+-----------------+------------+--------------+------------+



<div style="background-color: white; padding: 10px;">
    <a id="assignment_2">
        <h4 style="color: #0D9276">Assignment 2: Maximum turnover of the retailer regions</h4>
    </a>
</div>
<br>
<h4>
    Find the region with the highest turnover for each retailer, and report the total amount for that retailer and region.
</h4>
<p>The expected output is as follows: </p>

| retailer_id | retailer_name | region_name | total_amount |
|-------------|---------------|-------------|--------------|
| 1           | Hepsiburada   | Karadeniz   | 42642        |
| 2           | Trendyol      | Ic Anadolu  | 71689        |
| 3           | n11           | Ic Anadolu  | 11995        |
| 4           | Gittigidiyor  | Karadeniz   | 16081        |

<br>

In [103]:
query = """
WITH RegionSales AS (
    SELECT
        ret.retailer_id,
        ret.retailer_name,
        reg.region_name,
        SUM(s.total_amt) AS total_sales_amount
    FROM
        FactSale s
    JOIN DimRetailer ret ON s.retailer_id = ret.retailer_id
    JOIN DimCustomer cus ON s.customer_id = cus.customer_id
    JOIN DimRegion reg ON cus.city_id = reg.city_id
    GROUP BY
        ret.retailer_id, ret.retailer_name, reg.region_name
),
MaxSales AS (
    SELECT
        retailer_id,
        MAX(total_sales_amount) AS max_sales
    FROM
        RegionSales
    GROUP BY
        retailer_id
)
SELECT
    m.retailer_id,
    rs.retailer_name,
    rs.region_name,
    m.max_sales AS total_amount
FROM
    MaxSales m
JOIN
    RegionSales rs ON m.retailer_id = rs.retailer_id AND m.max_sales = rs.total_sales_amount
ORDER BY
    m.retailer_id


"""
result_df = spark.sql(query)
result_df.show()


+-----------+-------------+-----------+------------+
|retailer_id|retailer_name|region_name|total_amount|
+-----------+-------------+-----------+------------+
|          1|  Hepsiburada|  Karadeniz|       42642|
|          2|     Trendyol| Ic Anadolu|       71689|
|          3|          n11| Ic Anadolu|       11995|
|          4| Gittigidiyor|  Karadeniz|       16081|
+-----------+-------------+-----------+------------+



In [124]:
from pyspark.sql import functions as F

# Join and aggregate to compute the total sales amount per retailer and region
region_sales = df_sal.join(df_ret, "retailer_id") \
    .join(df_cus, "customer_id") \
    .join(df_reg, df_cus["city_id"] == df_reg["city_id"]) \
    .groupBy("retailer_id", "retailer_name", "region_name") \
    .agg(F.sum("total_amt").alias("total_sales_amount"))

# Find the maximum sales amount for each retailer
max_sales = region_sales.groupBy("retailer_id") \
    .agg(F.max("total_sales_amount").alias("max_sales"))

# Join back on the maximum sales amount to recover the matching region for each retailer
result = max_sales.join(region_sales, (max_sales["retailer_id"] == region_sales["retailer_id"]) & \
                        (max_sales["max_sales"] == region_sales["total_sales_amount"])) \
    .select(max_sales["retailer_id"], "retailer_name", "region_name", "max_sales") \
    .orderBy("retailer_id")

# Show the result
result.show()


+-----------+-------------+-----------+---------+
|retailer_id|retailer_name|region_name|max_sales|
+-----------+-------------+-----------+---------+
|          1|  Hepsiburada|  Karadeniz|    42642|
|          2|     Trendyol| Ic Anadolu|    71689|
|          3|          n11| Ic Anadolu|    11995|
|          4| Gittigidiyor|  Karadeniz|    16081|
+-----------+-------------+-----------+---------+

