# 1. Setup

In [None]:
import glob
import os
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

In [None]:
# Obtain the Spark session used by every cell below (reused if one already
# exists in this kernel, per getOrCreate).
spark = (
    SparkSession.builder
    .appName("etl_sales")
    .getOrCreate()
)

# 2. Funções, Variáveis e Constantes

In [None]:
# Landing tables whose gender code is rewritten during normalization.
list_tables = [
    "customer",
    "employee",
]

In [None]:
# Root of the local dataset store; each ETL stage uses its own sub-folder.
path_store = "/home/jovyan/work/dataset/store/"

path_landing = path_store + "landing/"              # incoming CSV files
path_normalization = path_store + "normalization/"  # Parquet tables built from landing
path_consolidation = path_store + "consolidation/"  # fact/dimension Parquet outputs

# Glob pattern for the landing CSVs. It must be built after path_landing is
# assigned; referencing it earlier raises NameError.
path_csv = path_landing + "*.csv"

In [None]:
def createTempView(path, table):
    """Load the Parquet data at *path* and expose it to SQL as view *table*.

    An existing temporary view with the same name is replaced. Returns None.
    """
    spark.read.parquet(path).createOrReplaceTempView(table)
    
def q(query, n=30):
    """Execute the Spark SQL string *query* and return the resulting DataFrame.

    NOTE(review): *n* is accepted but not used. The result is neither limited
    nor displayed here.
    """
    result = spark.sql(query)
    return result

def readParquet(path):
    """Return a DataFrame loaded from the Parquet data at *path*."""
    reader = spark.read.format("parquet")
    return reader.load(path)

def saveParquet(df, path):
    """Write *df* as Parquet to *path*, using Spark's "overwrite" save mode."""
    writer = df.write.mode("overwrite").format("parquet")
    writer.save(path)
    
def readCSV(path, delimiter=";"):
    """Read delimited text at *path* into a DataFrame.

    The first line is treated as the header, and column types are inferred.
    Schema inference makes Spark do an extra pass over the data.

    Args:
        path: file, directory or glob pattern accepted by Spark's CSV reader.
        delimiter: field separator. Defaults to ";", the value previously
            hard-coded here, so existing callers behave the same.

    Returns:
        The loaded DataFrame.
    """
    return (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .option("delimiter", delimiter)
        .load(path)
    )

# 3. De Landing para Normalization

In [None]:
# Preview the normalized customer table.
# This cell runs before the landing -> normalization loop that writes this
# path. In a fresh environment the folder does not exist yet, so skip the
# preview instead of failing the notebook run.
# Assumes the store is on the local filesystem, as the glob.glob() call on the
# landing folder already does.
customer_norm_path = path_normalization + "customer"
if os.path.exists(customer_norm_path):
    df = readParquet(customer_norm_path)
    df.show(5)
else:
    print(f"{customer_norm_path} not found yet - run the normalization step first.")

In [None]:
# Convert every landing CSV into a Parquet table under path_normalization.
# Section 4 reads sale, sale_item, product, branch, etc. from
# path_normalization, so every table must be written, not only the ones in
# list_tables.
# Sorted for a deterministic processing order.
for file in sorted(glob.glob(path_csv)):
    # splitext strips only the trailing extension. str.replace(".csv", "")
    # would also remove ".csv" from the middle of a name.
    file_name = os.path.splitext(os.path.basename(file))[0]

    df_csv = readCSV(file)

    if file_name in list_tables:
        # Expand the one-letter gender code to a Portuguese label:
        # "F" = Feminino, "M" = Masculino (the previous mapping was inverted).
        # Any other value, including null, is kept as-is rather than relabelled.
        df_csv = df_csv.withColumn(
            "gender",
            when(col("gender") == "F", "Feminino")
            .when(col("gender") == "M", "Masculino")
            .otherwise(col("gender")),
        )

    saveParquet(df_csv, path_normalization + file_name)


# 4. De Normalization para Consolidation

- Tabela Fato **Sales**

In [None]:
# Register the normalized tables needed by the fact_sales query as SQL views.
for view_name in ("sale", "sale_item", "product"):
    createTempView(path_normalization + view_name, view_name)

In [None]:
# Fact table at sale-item grain: one row per (sale, product) line, with the
# line quantity, product prices, and quantity * price totals.
sql_fact_sales = """
        SELECT s.id sale_key
             , s.id_customer AS customer_key
             , s.id_branch   AS branch_key
             , s.id_employee AS employee_key
             , p.id          AS product_key
             , si.quantity 
             , p.cost_price 
             , p.sale_price 
             , (si.quantity * p.cost_price) AS amount_cost_price
             , (si.quantity * p.sale_price) as amount_sale_price
        FROM sale s INNER JOIN sale_item si ON s.id  = si.id_sale 
                    INNER JOIN product p    ON p.id  = si.id_product          
        ORDER BY s.id ;             
"""
df_fact_sales = spark.sql(sql_fact_sales)

saveParquet(df_fact_sales, path_consolidation + "fact-sales")

- Tabela Dimensão **Product**

In [None]:
# Register the normalized tables needed by the dim_product query as SQL views.
for view_name in ("product", "product_group", "supplier"):
    createTempView(path_normalization + view_name, view_name)

In [None]:
# Product dimension: each product with its group and supplier names.
# Inner joins keep only products whose group and supplier rows exist.
sql_dim_product = """
    SELECT p.id    as product_key
         , p.name  AS product_name
         , pg.name AS product_group_name
         , s.name  AS supplier_name
      FROM product p INNER JOIN product_group pg ON p.id_product_group = pg.id
                     INNER JOIN supplier s       ON p.id_supplier      = s.id
    ORDER BY p.id;   
"""
df_dim_product = spark.sql(sql_dim_product)

saveParquet(df_dim_product, path_consolidation + "dim-product")

- Tabela Dimensão **Customer**

In [None]:
# Register the normalized tables needed by the dim_customer query as SQL views.
for view_name in ("customer", "district", "city", "state", "zone", "marital_status"):
    createTempView(path_normalization + view_name, view_name)

In [None]:
# Customer dimension: customer attributes with district, zone, city, state and
# marital-status names resolved through their lookup tables.
# Inner joins keep only customers whose lookups all match.
sql_dim_customer = """
    SELECT c.id    AS  customer_key
         , c.name  AS  customer_name
         , c.income 
         , c.gender
         , d.name  AS district_name
         , m.name  as marital_status
         , z.name  AS zone_name
         , cit.name AS city_name
         , s.name  AS state_name
      FROM customer c INNER JOIN district d       ON d.id   = c.id_district               
                      INNER JOIN city cit         ON cit.id = d.id_city
                      INNER JOIN state s          ON s.id   = cit.id_state
                      INNER JOIN zone z           ON z.id   = d.id_zone
                      INNER JOIN marital_status m ON m.id   = c.id_marital_status
   ORDER BY c.id;        
        """
df_dim_customer = spark.sql(sql_dim_customer)

saveParquet(df_dim_customer, path_consolidation + "dim-customer")

- Tabela Dimensão **Branch**

In [None]:
# Register the normalized tables needed by the dim_branch query as SQL views.
for view_name in ("branch", "district", "city", "state", "zone"):
    createTempView(path_normalization + view_name, view_name)

In [None]:
# Branch dimension: each branch with its district, zone, city and state names.
# Inner joins keep only branches whose lookups all match.
sql_dim_branch = """
    SELECT bc.id    AS branch_key
         , bc.name  AS branch_name
         , dt.name  AS district_name
         , z.name   AS zone_name
         , cit.name AS city_name
         , s.name   AS state_name
      FROM branch bc  INNER JOIN district dt ON dt.id  = bc.id_district              
                      INNER JOIN city cit    ON cit.id = dt.id_city
                      INNER JOIN state s     ON s.id   = cit.id_state
                      INNER JOIN zone z      ON z.id   = dt.id_zone
      ORDER BY bc.id;    
"""
df_dim_branch = spark.sql(sql_dim_branch)

saveParquet(df_dim_branch, path_consolidation + "dim-branch")

- Tabela Dimensão **Employee**

In [None]:
# Register the normalized tables needed by the dim_employee query as SQL views.
for view_name in ("employee", "department"):
    createTempView(path_normalization + view_name, view_name)

In [None]:
# Employee dimension: each employee with their department name.
# The inner join keeps only employees whose department row exists.
sql_dim_employee = """
    SELECT e.id    AS  employee_key
         , e.name  AS  employee_name
         , d.name  AS  department_name       
      FROM employee e INNER JOIN department d ON d.id = e.id_department
     ORDER BY e.id;     
"""
df_dim_employee = spark.sql(sql_dim_employee)

saveParquet(df_dim_employee, path_consolidation + "dim-employee")

# Final

In [None]:
# Stop the Spark session created in the setup cell; run this last.
spark.stop()