In [0]:
# Run the test suite under ./test/ before executing the pipeline cells.
# --assert=plain turns off pytest's assertion rewriting; --cache-clear wipes
# the pytest cache at the start of the run.
!pytest --assert=plain --cache-clear ./test/

In [0]:
import importlib

import etl.constants
import etl.data_extractor
import etl.data_standardizer
import etl.data_transformer

# Reload the project modules FIRST so that edits made to the etl package
# since the kernel started are picked up. etl.constants goes first, ahead of
# the modules reloaded after it.
importlib.reload(etl.constants)
importlib.reload(etl.data_extractor)
importlib.reload(etl.data_standardizer)
importlib.reload(etl.data_transformer)

# Bind names only AFTER reloading: importlib.reload() does not rebind names
# that were previously copied out of a module with `from ... import ...`, so
# importing before the reload would leave this notebook holding the stale
# classes and constants.
# NOTE(review): the star import is kept because cells may rely on constants
# beyond the ones visible here; prefer explicit names once the set is known.
from etl.constants import *
from etl.data_extractor import DataExtractor
from etl.data_standardizer import DataStandardizer
from etl.data_transformer import DataTransformer

In [0]:
# Extract stage: build a DataExtractor around `spark` and run its extract().
# `spark` is not defined in the visible cells -- presumably the session that
# the notebook runtime injects; confirm.
# The result is passed to DataStandardizer in the next stage.
extractor = DataExtractor(spark)
raw_dataframes = extractor.extract()


In [0]:
# Standardize stage: feed the extracted data to DataStandardizer and run
# process(). The result is later indexed by PRODUCTS_KEY, CUSTOMERS_KEY and
# ORDERS_KEY, and is also the input to DataTransformer.
standardizer = DataStandardizer(raw_dataframes)
cleaned_dataframes = standardizer.process()


In [0]:
# Pull the three standardized DataFrames out of the standardizer result,
# keeping the same notebook-level names.
products_cleaned_df, customers_cleaned_df, orders_cleaned_df = (
    cleaned_dataframes[_key] for _key in (PRODUCTS_KEY, CUSTOMERS_KEY, ORDERS_KEY)
)

# Make sure the silver schema exists before writing into it.
spark.sql("CREATE DATABASE IF NOT EXISTS ecom_sales_spark.silver")

# Persist each DataFrame as a table, replacing any previous contents
# (mode "overwrite"), in the order products -> customers -> orders.
for _silver_df, _silver_table in (
    (products_cleaned_df, "ecom_sales_spark.silver.products"),
    (customers_cleaned_df, "ecom_sales_spark.silver.customers"),
    (orders_cleaned_df, "ecom_sales_spark.silver.orders"),
):
    _silver_df.write.mode("overwrite").saveAsTable(_silver_table)

In [0]:
# Transform stage: hand the standardized data to DataTransformer and run
# process(). The result is later indexed by ENRICHED_ORDERS_KEY and
# AGGREGATED_PROFIT_KEY when the gold tables are written.
transformer = DataTransformer(cleaned_dataframes)
transformed_dataframes = transformer.process()

In [0]:
# Make sure the gold schema exists before writing into it.
spark.sql("CREATE DATABASE IF NOT EXISTS ecom_sales_spark.gold")

# Persist each transformed DataFrame under a table named after its key,
# replacing previous contents (mode "overwrite"). Enriched orders are written
# first, then the aggregated profit table.
for _gold_key in (ENRICHED_ORDERS_KEY, AGGREGATED_PROFIT_KEY):
    _gold_table = f"ecom_sales_spark.gold.{_gold_key}"
    transformed_dataframes[_gold_key].write.mode("overwrite").saveAsTable(_gold_table)


In [0]:
%sql
-- Total profit per order year, rounded to 2 decimals, oldest year first.
-- The table is referenced with its full catalog.schema.table name so the
-- query reads the table this notebook writes, regardless of the session's
-- current catalog.
-- NOTE(review): assumes AGGREGATED_PROFIT_KEY == 'aggregated_profit'; confirm.
SELECT
  order_year,
  ROUND(SUM(total_profit), 2) AS profit_by_year
FROM ecom_sales_spark.gold.aggregated_profit
GROUP BY order_year
ORDER BY order_year;

In [0]:
%sql
-- Total profit per (order year, category), rounded to 2 decimals, sorted by
-- year and then category.
-- The table is referenced with its full catalog.schema.table name so the
-- query reads the table this notebook writes, regardless of the session's
-- current catalog.
-- NOTE(review): assumes AGGREGATED_PROFIT_KEY == 'aggregated_profit'; confirm.
SELECT
  order_year,
  category,
  ROUND(SUM(total_profit), 2) AS profit_by_year_category
FROM ecom_sales_spark.gold.aggregated_profit
GROUP BY order_year, category
ORDER BY order_year, category;

In [0]:
%sql
-- Total profit per customer name, rounded to 2 decimals, most profitable
-- customer first.
-- The table is referenced with its full catalog.schema.table name so the
-- query reads the table this notebook writes, regardless of the session's
-- current catalog.
-- NOTE(review): assumes AGGREGATED_PROFIT_KEY == 'aggregated_profit'; confirm.
SELECT
  customer_name,
  ROUND(SUM(total_profit), 2) AS profit_by_customer
FROM ecom_sales_spark.gold.aggregated_profit
GROUP BY customer_name
ORDER BY profit_by_customer DESC;

In [0]:
%sql
-- Total profit per (customer name, order year), rounded to 2 decimals,
-- sorted by customer and then year.
-- The table is referenced with its full catalog.schema.table name so the
-- query reads the table this notebook writes, regardless of the session's
-- current catalog.
-- NOTE(review): assumes AGGREGATED_PROFIT_KEY == 'aggregated_profit'; confirm.
SELECT
  customer_name,
  order_year,
  ROUND(SUM(total_profit), 2) AS profit_by_customer_year
FROM ecom_sales_spark.gold.aggregated_profit
GROUP BY customer_name, order_year
ORDER BY customer_name, order_year;

In [0]:
# Run pytest again after the pipeline, this time collecting from the current
# directory (./) rather than ./test/ as in the first cell.
# NOTE(review): confirm the broader collection root is intentional.
!pytest --assert=plain --cache-clear ./