In [82]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [83]:
# Local Spark session on every available core. getOrCreate() hands back the
# already-running session when this cell is re-executed in the same kernel.
spark = (
    SparkSession.builder
    .appName("Local PySpark ETL")
    .master("local[*]")
    .getOrCreate()
)

spark

### Extract data from the landing zone

In [84]:
def _read_landing_csv(path):
    """Load one landing-zone CSV (header row, UTF-8) into a Spark DataFrame.

    No schema inference is requested, so every column arrives as a string;
    the raw-zone cells cast the columns they need.

    Args:
        path: CSV file path, relative to the notebook's working directory.

    Returns:
        The loaded Spark DataFrame.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("encoding", "UTF-8")
        .load(path)
    )


# Root of the landing zone; all extraction paths below hang off it.
LANDING_ZONE = "data/landing_zone"

df_orders = _read_landing_csv(f"{LANDING_ZONE}/orders/19980505/orders_initial.csv")
df_categories = _read_landing_csv(f"{LANDING_ZONE}/categories/categories.csv")
df_customers = _read_landing_csv(f"{LANDING_ZONE}/customers/customers.csv")
df_orders_details = _read_landing_csv(f"{LANDING_ZONE}/orders_details/orders_details.csv")
df_products = _read_landing_csv(f"{LANDING_ZONE}/products/products.csv")
df_suppliers = _read_landing_csv(f"{LANDING_ZONE}/suppliers/suppliers.csv")

df_products.show(5)
df_products.printSchema()

+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|productid|         productname|supplierid|categoryid|    quantityperunit|unitprice|unitsinstock|unitsonorder|reorderlevel|discontinued|
+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|        1|                Chai|         8|         1| 10 boxes x 30 bags|       18|          39|           0|          10|           1|
|        2|               Chang|         1|         1| 24 - 12 oz bottles|       19|          17|          40|          25|           1|
|        3|       Aniseed Syrup|         1|         2|12 - 550 ml bottles|       10|          13|          70|          25|           0|
|        4|Chef Anton's Caju...|         2|         2|     48 - 6 oz jars|       22|          53|           0|           0|           0|
|        5|Chef Anton's Gumb...|         

### RAW ZONE

In [85]:
# Normalise the join-key column names to snake_case before joining.
# NOTE(review): only these keys are renamed. The orders frame still carries
# "customerid" and the order-details frame still carries "productid", so any
# later join to the customers/products frames needs an explicit join condition.
df_orders_raw, df_orders_details_raw = (
    frame.withColumnRenamed("orderid", "order_id")
    for frame in (df_orders, df_orders_details)
)
df_products_raw = df_products.withColumnRenamed("productid", "product_id")
df_categories_raw = df_categories.withColumnRenamed("categoryid", "category_id")
df_customers_raw = df_customers.withColumnRenamed("customerid", "customer_id")

In [87]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, DoubleType

# Raw zone: give the (all-string) landing-zone columns proper types.
df_orders_raw = (
    df_orders_raw
    .withColumn("order_id", col("order_id").cast(IntegerType()))
    .withColumn("employeeid", col("employeeid").cast(IntegerType()))
    # Dates are parsed with an explicit yyyy-MM-dd pattern.
    .withColumn("orderdate", to_date("orderdate", "yyyy-MM-dd"))
    .withColumn("requireddate", to_date("requireddate", "yyyy-MM-dd"))
    .withColumn("shippeddate", to_date("shippeddate", "yyyy-MM-dd"))
)

df_orders_details_raw = (
    df_orders_details_raw
    # The join key must have the same type on both sides. Orders cast order_id
    # to int above; leaving it a string here made the join rely on implicit
    # string/int coercion.
    .withColumn("order_id", col("order_id").cast(IntegerType()))
    .withColumn("productid", col("productid").cast(IntegerType()))
    .withColumn("unitprice", col("unitprice").cast(DoubleType()))
    .withColumn("quantity", col("quantity").cast(IntegerType()))
    .withColumn("discount", col("discount").cast(DoubleType()))
)


In [88]:

# Raw-zone output folders. They are relative to the working directory, the same
# convention as the data/landing_zone/... paths used for extraction, instead of
# a machine-specific absolute path.
RAW_ZONE_DIR = os.path.join("data", "raw_zone")
raw_orders_path = os.path.join(RAW_ZONE_DIR, "orders_raw")
raw_order_details_path = os.path.join(RAW_ZONE_DIR, "orders_details_raw")

os.makedirs(raw_orders_path, exist_ok=True)
os.makedirs(raw_order_details_path, exist_ok=True)

# Spark -> pandas: toPandas() collects every row onto the driver, so this only
# works while the tables fit in driver memory.
orders_pdf = df_orders_raw.toPandas()
order_details_pdf = df_orders_details_raw.toPandas()

# utf-8-sig writes a BOM at the start of each file (presumably for spreadsheet tools).
orders_pdf.to_csv(
    os.path.join(raw_orders_path, "orders_raw.csv"),
    index=False,
    encoding="utf-8-sig"
)
order_details_pdf.to_csv(
    os.path.join(raw_order_details_path, "orders_details_raw.csv"),
    index=False,
    encoding="utf-8-sig"
)


### SERVING ZONE

In [89]:
# Build the order-line fact table. Every order header is repeated once per
# matching detail row; the inner join drops orders without details and vice versa.
df_fact_orders_items = df_orders_raw.join(df_orders_details_raw, "order_id", "inner")
df_fact_orders_items.show()

+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+
|   10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      NULL|         51100|     France|       72|     34.8|       5|     0.0|
|   10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|  

In [90]:
from pyspark.sql.functions import year, month, dayofmonth

# Calendar parts of the order date. year_be is the Gregorian year + 543
# (presumably the Thai Buddhist Era year).
_date_part_columns = [
    ("year", year("orderdate")),
    ("month", month("orderdate")),
    ("day", dayofmonth("orderdate")),
    ("year_be", year("orderdate") + 543),
]
for _part_name, _part_expr in _date_part_columns:
    df_fact_orders_items = df_fact_orders_items.withColumn(_part_name, _part_expr)

df_fact_orders_items.show()


+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+
|   10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      NULL|         51100|     France|       72|     34.8|       5|     0.0|1996|    7|  4|   2539|
|   10248|     VINET|         5|1996

In [91]:
# Inspect the joined fact table's column types. The dates, integer keys and
# double prices come from the raw-zone casts; columns such as shipvia and
# freight are still strings at this point and are cast in the serving cell.
df_fact_orders_items.printSchema()


root
 |-- order_id: integer (nullable = true)
 |-- customerid: string (nullable = true)
 |-- employeeid: integer (nullable = true)
 |-- orderdate: date (nullable = true)
 |-- requireddate: date (nullable = true)
 |-- shippeddate: date (nullable = true)
 |-- shipvia: string (nullable = true)
 |-- freight: string (nullable = true)
 |-- shipname: string (nullable = true)
 |-- shipaddress: string (nullable = true)
 |-- shipcity: string (nullable = true)
 |-- shipregion: string (nullable = true)
 |-- shippostalcode: string (nullable = true)
 |-- shipcountry: string (nullable = true)
 |-- productid: integer (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- year_be: integer (nullable = true)



In [92]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, DateType

# Target type for every numeric column of the serving table. Several of these
# were already cast in the raw zone; casting them again pins the serving schema
# regardless of what upstream produced. Casts are applied in this order.
_serving_column_types = {
    "order_id": IntegerType(),
    "employeeid": IntegerType(),
    "shipvia": IntegerType(),
    "freight": DoubleType(),
    "productid": IntegerType(),
    "unitprice": DoubleType(),
    "quantity": IntegerType(),
    "discount": DoubleType(),
}

df_serving = df_fact_orders_items
for _column_name, _target_type in _serving_column_types.items():
    df_serving = df_serving.withColumn(_column_name, col(_column_name).cast(_target_type))



In [93]:
# Line-level sales metrics:
#   gross_sales     = unit price x quantity
#   net_sales       = gross reduced by the discount fraction
#   discount_amount = portion of gross removed by that discount
_gross_expr = col("unitprice") * col("quantity")
df_serving = (
    df_serving
    .withColumn("gross_sales", _gross_expr)
    .withColumn("net_sales", _gross_expr * (1 - col("discount")))
    .withColumn("discount_amount", _gross_expr * col("discount"))
)
df_serving.show()


+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|       gross_sales|         net_sales|   discount_amount|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+
|   10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...| 

In [94]:
from pyspark.sql.functions import when

# "Y" when the line carries a positive discount; "N" otherwise (a NULL discount also lands on "N").
_discount_flag = when(col("discount") > 0, "Y").otherwise("N")
df_serving = df_serving.withColumn("has_discount", _discount_flag)
df_serving.show()


+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|       gross_sales|         net_sales|   discount_amount|has_discount|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+
|   10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16

In [95]:
# Shipping intelligence: bucket each line by its order's freight charge.
#   freight > 50       -> HIGH_COST
#   20 < freight <= 50 -> MEDIUM_COST
#   anything else      -> LOW_COST (including a NULL freight)
_freight_bucket = (
    when(col("freight") > 50, "HIGH_COST")
    .when(col("freight") > 20, "MEDIUM_COST")
    .otherwise("LOW_COST")
)
df_serving = df_serving.withColumn("shipping_type", _freight_bucket)
df_serving.show()


+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+-------------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|       gross_sales|         net_sales|   discount_amount|has_discount|shipping_type|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+-------------+
|   10248|     VINET|    

In [96]:
# Size bucket by line quantity: < 10 SMALL, 10-29 MEDIUM, otherwise LARGE.
# NOTE(review): a NULL quantity falls through to "LARGE" — confirm that is intended.
_quantity_bucket = (
    when(col("quantity") < 10, "SMALL")
    .when(col("quantity") < 30, "MEDIUM")
    .otherwise("LARGE")
)
df_serving = df_serving.withColumn("order_size", _quantity_bucket)
df_serving.show()

+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+-------------+----------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|       gross_sales|         net_sales|   discount_amount|has_discount|shipping_type|order_size|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+-------------+----

In [97]:
from pyspark.sql.functions import col, date_format, to_date

# order_date: orderdate as a DATE (orderdate is already a date after the raw-zone
# cast, so this is effectively a renamed copy).
# month_name: abbreviated month text of that date (pattern "MMM", e.g. "Jul").
df_serving = df_serving.withColumn("order_date", to_date(col("orderdate")))
df_serving = df_serving.withColumn("month_name", date_format(col("order_date"), "MMM"))
df_serving.show()

+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------------------+------------+-------------+----------+----------+----------+
|order_id|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|productid|unitprice|quantity|discount|year|month|day|year_be|       gross_sales|         net_sales|   discount_amount|has_discount|shipping_type|order_size|order_date|month_name|
+--------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+---------+---------+--------+--------+----+-----+---+-------+------------------+------------------+------

### Export the serving-zone file

In [98]:

# order_size was already derived by the earlier order_size cell with the same
# thresholds, so it is not recomputed here.

# Serving-zone output folder. It is relative to the working directory, the same
# convention as the data/landing_zone/... extraction paths, instead of a
# machine-specific absolute path.
serving_folder = os.path.join("data", "serving_zone")
os.makedirs(serving_folder, exist_ok=True)

# Spark -> pandas: toPandas() collects every row onto the driver, so this only
# works while the serving table fits in driver memory.
pdf = df_serving.toPandas()

# utf-8-sig writes a BOM at the start of the file (presumably for spreadsheet tools).
pdf.to_csv(
    os.path.join(serving_folder, "fact_sale_orders_items.csv"),
    index=False,
    encoding="utf-8-sig"
)
