In [0]:
%sql
truncate table raw_customer_data;
truncate table raw_product_data;
truncate table raw_order_data;
truncate table enriched_customer_data;
truncate table enriched_product_data;
truncate table enriched_order_with_cust_and_product;
truncate table enriched_aggregated_data;

-- CREATE TABLE IF NOT EXISTS raw_customer_data (
--   Customer_ID STRING,
--   Customer_Name STRING,
--   email STRING,
--   phone STRING,
--   address STRING,
--   Segment STRING,
--   Country STRING,
--   City STRING,
--   State STRING,
--   Postal_Code STRING,
--   Region STRING
-- )
-- USING DELTA
-- LOCATION '/mnt/raw/customer_data';
--select * from raw_customer_data;
--show create table raw_customer_data;
--vacuum  raw_customer_data;


-- CREATE TABLE raw_product_data (
--   Product_ID STRING,
--   Category STRING,
--   Sub_Category STRING,
--   Product_Name STRING,
--   State STRING,
--   Price_per_product DECIMAL(27,2)
-- )
-- USING DELTA
-- LOCATION '/mnt/raw/product_data';
--select * from raw_product_data
--vacuum raw_product_data
--drop table raw_product_data



-- CREATE TABLE raw_order_data (
--   `Row_ID` BIGINT,
--   `Order_ID` STRING,
--   `Order_Date` STRING,
--   `Ship_Date` STRING,
--   `Ship_Mode` STRING,
--   `Customer_ID` STRING,
--   `Product_ID` STRING,
--    Quantity INTEGER,
--    Price DECIMAL(27,6),
--    Discount DECIMAL(27,6),
--    Profit DECIMAL(27,6)
-- )
-- USING DELTA
-- LOCATION '/mnt/raw/order_data_1';
--select * from raw_order_data
--vacuum raw_order_data
--drop table raw_order_data


-- CREATE TABLE IF NOT EXISTS enriched_customer_data (
--   Customer_ID STRING,
--   Customer_Name STRING,
--   email STRING,
--   phone STRING,
--   address STRING,
--   Segment STRING,
--   Country STRING,
--   City STRING,
--   State STRING,
--   Postal_Code STRING,
--   Region STRING
-- )
-- USING DELTA
-- LOCATION '/mnt/enriched/customer_data';
--select * from enriched_customer_data;


-- CREATE TABLE enriched_product_data (
--   Product_ID STRING,
--   Category STRING,
--   Sub_Category STRING,
--   Product_Name STRING,
--   State STRING,
--   Price_per_product DECIMAL(27,2)
-- )
-- USING DELTA
-- LOCATION '/mnt/enriched/product_data';
--select * from enriched_product_data;
--select Product_ID from enriched_product_data group by Product_ID having count(*)>1;

-- CREATE TABLE enriched_order_with_cust_and_product(
-- Order_ID STRING,
-- Order_Date DATE,
-- Ship_Date DATE,
-- Ship_Mode STRING,
-- Customer_ID STRING,
-- Product_ID STRING,
-- Customer_Name STRING,
-- Country STRING,
-- Category STRING,
-- Sub_Category STRING,
-- Quantity INTEGER,
-- Price DECIMAL(27,2),
-- Discount DECIMAL(27,2),
-- Profit DECIMAL(27,2))
-- USING DELTA
-- LOCATION '/mnt/enriched/enriched_order_with_cust_and_product';
-- drop table enriched_order_with_cust_and_product;
-- select * from enriched_order_with_cust_and_product;

-- CREATE TABLE enriched_aggregated_data(
--     `year` int,
--     product_category STRING,
--     product_sub_category STRING,
--     customer_id STRING,
--     customer_name STRING,
--     net_proft DECIMAL(27,2))
--     USING DELTA
--     LOCATION '/mnt/enriched/enriched_aggregated_data';

-- drop table enriched_aggregated_data;
--select * from enriched_aggregated_data;

In [0]:
# ------------------- Step 1: Extract From file and load into raw table ------------------- #

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, DoubleType, LongType, IntegerType
from decimal import Decimal
from pyspark.sql.functions import col

class RawExtractAndLoad():
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()
        self.spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
        self.customer_source_path = '/FileStore/tables/Customer.xlsx'
        self.product_source_path = '/FileStore/tables/Products.csv'
        self.order_source_path = '/FileStore/tables/Orders.json'

        self.customer_schema = StructType([
            StructField("Customer_ID", StringType(), True),
            StructField("Customer_Name", StringType(), True),
            StructField("email", StringType(), True),
            StructField("phone", StringType(), True),
            StructField("address", StringType(), True),
            StructField("Segment", StringType(), True),
            StructField("Country", StringType(), True),
            StructField("City", StringType(), True),
            StructField("State", StringType(), True),
            StructField("Postal_Code", StringType(), True),
            StructField("Region", StringType(), True)
        ])

        self.product_schema = StructType([
            StructField("Product_ID", StringType(), True),
            StructField("Category", StringType(), True),
            StructField("Sub_Category", StringType(), True),
            StructField("Product_Name", StringType(), True),
            StructField("State", StringType(), True),
            StructField("Price_per_product", DecimalType(27,2), True) 
        ])

    def customer_file_read(self, customer_schema=None, customer_source_path=None):
        if customer_schema is None:
            customer_schema = self.customer_schema
        if customer_source_path is None:
            customer_source_path = self.customer_source_path

        customer = spark.read.format("com.crealytics.spark.excel")\
            .schema(customer_schema)\
            .option("header", "true")\
            .load(customer_source_path)

        return customer

    def customer_raw_data_load(self):
        self.customer_file_read().write.format("delta").mode("append").saveAsTable("raw_customer_data")
    
    def product_file_read(self, product_schema=None, product_source_path=None):
        if product_schema is None:
            product_schema = self.product_schema
        if product_source_path is None:
            product_source_path = self.product_source_path

        product = spark.read.format("csv").schema(product_schema).option("header","true").option("quote", "\"").option("escape", "\"").load(product_source_path)

        return product

    def product_raw_data_load(self):
        self.product_file_read().write.format("delta").mode("append").saveAsTable("raw_product_data")


    def order_file_read(self, order_source_path=None):
        if order_source_path is None:
            order_source_path = self.order_source_path

        order = spark.read.format("json").option("multiline", "true").option("header","true").load(order_source_path)

        order_renamed = order.toDF(*[c.strip().replace(" ", "_") for c in order.columns])
        order_casted = order_renamed \
            .withColumn("Price", col("Price").cast(DecimalType(27, 6))) \
            .withColumn("Discount", col("Discount").cast(DecimalType(27, 6))) \
            .withColumn("Profit", col("Profit").cast(DecimalType(27, 6))) \
            .withColumn("Quantity", col("Quantity").cast("int")) \
            .withColumn("Row_ID", col("Row_ID").cast("long"))

        return order_casted

    def order_raw_data_load(self):
        self.order_file_read().write.format("delta").mode("append").saveAsTable("raw_order_data")







In [0]:
# ------------------- Step 2: Unit Test Cases for raw data sets ------------------- #
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, lit
from decimal import Decimal

spark = SparkSession.builder.getOrCreate()


def test_raw_data_check(spark):
    # Arrange
    raw_obj = RawExtractAndLoad()
    
    # Act
    df = raw_obj.customer_file_read(None,'/FileStore/tables/Customer_test.xlsx')
    
    # Assert: check that the DataFrame has expected columns for raw_customer table.
    expected_columns = [
        "Customer_ID", "Customer_Name", "email", "phone", "address",
        "Segment", "Country", "City", "State", "Postal_Code", "Region"
    ]
    assert df.columns == expected_columns
    # print(df.count())
    assert df.count() == 2
    assert dict(df.dtypes)["Postal_Code"] == "string"
    row = df.filter(col("Customer_ID") == 'PW-19240').first()
    assert row["Customer_Name"] == "Pierre Wener"

    # Assert: check that the DataFrame has expected columns for raw_product table.
    df = raw_obj.product_file_read(None,'/FileStore/tables/Products_test.csv')

    expected_columns = [
        "Product_ID", "Category", "Sub_Category",
        "Product_Name", "State", "Price_per_product"
    ]
    assert df.columns == expected_columns
    # print(df.count())
    assert df.count() == 2
    assert dict(df.dtypes)["Price_per_product"] == "decimal(27,2)"
    row = df.filter(col("Product_ID") == 'FUR-CH-10002961').first()
    assert row["Price_per_product"] == Decimal("81.88")

    print("All theTest passed for the raw_customet table ✅")
    

test_raw_data_check(spark)


All theTest passed for the raw_customet table ✅


In [0]:
# ------------------- Step 3: Loads Data in raw tables ------------------- #

raw_obj = RawExtractAndLoad()
# Loading data in raw_customer table
raw_obj.customer_raw_data_load()

# Loading data in raw_product table
raw_obj.product_raw_data_load()

# Loading data in raw_order table
raw_obj.order_raw_data_load()

In [0]:
# ------------------- Step 4: Extract data from raw tables and Do some standardization on raw date --------------------- Then Load To enriched tables-----------------------------------------------#

import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    regexp_replace, trim, coalesce, split, size, when, element_at, concat, col, lit, count, row_number, to_date, date_format, year, sum
)
from pyspark.sql.types import StringType, DecimalType
from pyspark.sql.functions import udf
from pyspark.sql import Window

class EnrichedTableLoad:
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()

    @staticmethod
    def normalize_phone_static(phone):
        if not phone or phone.startswith("#") or (phone.startswith("-") and len(phone.strip()) < 10):
            return None

        phone = str(phone).strip()
        pattern = re.compile(r"""^
            (?:001[-\.])?                   # Optional country code
            (?:\(?(\d{3})\)?[-\.]?)         # Area code
            (\d{3})[-\.]?(\d{4})            # Main number
            (?:x(\d{1,6}))?                 # Optional extension
            $""", re.VERBOSE)

        match = pattern.match(phone)
        if not match:
            return None

        area, mid, last, ext = match.groups()
        formatted = f"+1-{area}-{mid}-{last}"
        if ext:
            formatted += f" x{ext}"
        return formatted

    @staticmethod
    def is_valid_email_static(email):
        if email is None:
            return "Invalid email"
        pattern = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
        return email if pattern.match(email.strip()) else "Invalid email"

    def enriched_cutomer_data(self, df=None):
        if df is None:
            df = self.spark.table("raw_customer_data")

        # Clean and transform customer name
        df = df.withColumn(
            "Customer_Name",
            coalesce(
                regexp_replace(
                    regexp_replace(col("Customer_Name"), r"[^a-zA-Zö' ]|\.\s*\.\s*|[0-9]", "  "),
                    r" {2,}",
                    ""
                ),
                lit('NA')
            )
        )

        df = df.withColumn("word_count", size(split(col("Customer_Name"), " "))) \
               .withColumn("Customer_Name_final",
                   when(col("word_count") > 2,
                        concat(
                            element_at(split(col("Customer_Name"), " "), 1),
                            element_at(split(col("Customer_Name"), " "), 2),
                            lit(" "),
                            element_at(split(col("Customer_Name"), " "), 3)
                        )
                   ).otherwise(col("Customer_Name"))
               )

        # Register UDFs
        normalize_phone_udf = udf(self.normalize_phone_static, StringType())
        valid_email_udf = udf(self.is_valid_email_static, StringType())

        df = df.withColumn("normalized_phone", coalesce(normalize_phone_udf(col("phone")), lit("Invalid Phone number")))
        df = df.withColumn("valid_email", valid_email_udf(col("email")))

        # Final selection
        df = df.select(
            trim(col("Customer_ID")).alias("Customer_ID"),
            trim(col("Customer_Name_final")).alias("Customer_Name"),
            trim(col("valid_email")).alias("email"),
            trim(col("normalized_phone")).alias("phone"),
            trim(col("address")).alias("address"),
            trim(col("Segment")).alias("Segment"),
            trim(col("Country")).alias("Country"),
            trim(col("State")).alias("State"),
            trim(col("Postal_Code")).alias("Postal_Code"),
            trim(col("Region")).alias("Region")
        )
        return df

    def load_enriched_cutomer_data(self):
        self.enriched_cutomer_data().write.format("delta").mode("append").saveAsTable("enriched_customer_data")

    def enriched_product_data(self, df=None):
        if df is None:
            df = self.spark.table("raw_product_data")
        
        w = Window.partitionBy(col("Product_Id")).orderBy(col("Product_Id"))
        rnum = row_number().over(w)
        df = df\
            .withColumn("rnum", rnum)\
                .filter(col("rnum") == 1)\
                    .select(trim(col("Product_ID")).alias("Product_ID"),
                            trim(col("Category")).alias("Category"),
                            trim(col("Sub_Category")).alias("Sub_Category"),
                            trim(col("Product_Name")).alias("Product_Name"),
                            trim(col("State")).alias("State"),
                            coalesce(col("Price_per_product"),lit(0)).alias("Price_per_product"))
        
        
        return df

    def load_enriched_product_data(self):
        self.enriched_product_data().write.format("delta").mode("append").saveAsTable("enriched_product_data")


    def transformed_data_order_cust_product(self, order=None, cust=None, product=None):
        if order is None:
            order = self.spark.table("raw_order_data")
        if cust is None:
            cust = self.spark.table("enriched_customer_data")
        if product is None:
            product = self.spark.table("enriched_product_data")
        
        df = order.alias("o")\
            .join(cust.alias("c"), regexp_replace(trim(col("o.Customer_ID")),"-","") == regexp_replace(trim(("c.Customer_ID")),"-",""),"left")\
                .join(product.alias("p"), regexp_replace(trim(col("o.Product_ID")),'-','') == regexp_replace(trim(("p.Product_ID")),'-',''),"left")
        
        df = df\
            .select(\
                trim(col("o.Order_ID")).alias("Order_ID"),
                to_date(to_date(col("o.Order_Date"), "d/M/yyyy"), "yyyy-MM-dd").alias("Order_Date"),
                to_date(to_date(col("o.Ship_Date"), "d/M/yyyy"), "yyyy-MM-dd").alias("Ship_Date"),
                trim(col("o.Ship_Mode")).alias("Ship_Mode"),
                trim(col("o.Customer_ID")).alias("Customer_ID"),
                trim(col("o.Product_ID")).alias("Product_ID"),
                trim(col("c.Customer_Name")).alias("Customer_Name"),
                trim(col("c.Country")).alias("Country"),
                trim(col("p.Category")).alias("Category"),
                trim(col("p.Sub_Category")).alias("Sub_Category"),
                col("o.Quantity").cast("int").alias("Quantity"),
                col("o.Price").cast(DecimalType(27, 2)).alias("Price"),
                col("o.Discount").cast(DecimalType(27, 2)).alias("Discount"),
                col("o.Profit").cast(DecimalType(27, 2)).alias("Profit")
                )
        return df
    
    def load_transformed_data_order_cust_product(self):
        self.transformed_data_order_cust_product().write.format("delta").mode("append").saveAsTable("enriched_order_with_cust_and_product")

    def aggregated_data_derivation(self, df=None):
        if df is None:
            df = self.spark.table("enriched_order_with_cust_and_product")
        
        df = df\
            .select(\
                year(col("Order_Date")).alias("year"),
                col("Category").alias("product_category"),
                col("Sub_Category").alias("product_sub_category"),
                col("Customer_ID").alias("Customer_ID"),
                col("Customer_Name").alias("Customer_Name"),
                col("Profit").alias("Profit")
                ).groupBy(\
                    col("year"),
                    col("product_category"),
                    col("product_sub_category"),
                    col("Customer_ID"),
                    col("Customer_Name")
                    ).agg(sum(col("Profit")).cast(DecimalType(27,2)).alias("net_proft"))
        return df
    
    def load_aggregated_data(self):
        self.aggregated_data_derivation().write.format("delta").mode("append").saveAsTable("enriched_aggregated_data")

        
        



In [0]:
# ------------------- Step 5: Unit Test cases for enriched transformations and aggregations------------------- #

import pytest
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, FloatType
from pyspark.sql.functions import from_json
from decimal import Decimal


def spark():
    return SparkSession.builder.master("local[*]").appName("pytest").getOrCreate()


def sample_data():
    return [
        ("MV-18190", "_Mike Vitt 12313orini", "juliesimon499@gmail.com", "(359)650-7737x153", "Address", "Consumer", "United States", "New York City", "New York", "10035", "East"),
        ("AO-10810", "Anthony O'Donnell", "kimshaw310@gmail.com", "#ERROR!", "Unit 9987 Box 4002\nDPO AA 15069", "Corporate", "United States", "Los Angeles", "California", "90045", "West")
    ]


def customer_schema():
    return StructType([
        StructField("Customer_ID", StringType()),
        StructField("Customer_Name", StringType()),
        StructField("email", StringType()),
        StructField("phone", StringType()),
        StructField("address", StringType()),
        StructField("Segment", StringType()),
        StructField("Country", StringType()),
        StructField("City", StringType()),
        StructField("State", StringType()),
        StructField("Postal_Code", StringType()),
        StructField("Region", StringType()),
    ])

def test_enriched_customer_data_transformation(spark, sample_data, customer_schema):
    etl = EnrichedTableLoad()
    df = spark.createDataFrame(sample_data, schema=customer_schema)
    result_df = etl.enriched_cutomer_data(df)

    # display(result_df)
    result = result_df.collect()
    
    assert result[0]["email"] == "juliesimon499@gmail.com"
    assert result[1]["phone"] == "Invalid Phone number"
    assert result[0]["Customer_Name"].startswith("Mike")

def smaple_product_data():
    return [
    ("TEC-PH-10003811", "Office Supplies", "Binders", "VariCap6 Expandable Binder", "California", "13.84"),
    ("TEC-PH-10003811", "Office Supplies", "Binders", "Avery Binding System Hidden Tab Executive Style Index Sets", "Oklahoma", "5.77")
]

def product_schema():
    return StructType([
            StructField("Product_ID", StringType(), True),
            StructField("Category", StringType(), True),
            StructField("Sub_Category", StringType(), True),
            StructField("Product_Name", StringType(), True),
            StructField("State", StringType(), True),
            StructField("Price_per_product", StringType(), True) 
        ])

def test_enriched_product_data_transformation(spark, smaple_product_data, product_schema):
    etl = EnrichedTableLoad()
    df = spark.createDataFrame(smaple_product_data, schema=product_schema)
    result_df = etl.enriched_product_data(df)

    # display(result_df)  
    assert result_df.count() == 1


def test_transformed_data(order_df, cust, product, expected_df):
    etl = EnrichedTableLoad()
    result_df = etl.transformed_data_order_cust_product(order_df, cust, product)
    # print("result_df")
    # display(result_df)
    # print("expected_df")
    # display(expected_df)
    # print("diffs")
    display(result_df.exceptAll(expected_df))
 
    assert result_df.exceptAll(expected_df).count() == 0
    assert expected_df.exceptAll(result_df).count() == 0

def test_aggregated_data(input_df, expected_df):
    etl = EnrichedTableLoad()
    result_df = etl.aggregated_data_derivation(input_df)
    print("result_df")
    display(result_df)
    print("expected_df")
    display(expected_df)
    print("dffs")
    display(result_df.exceptAll(expected_df))
    assert result_df.exceptAll(expected_df).count() == 0
    assert expected_df.exceptAll(result_df).count() == 0




# Executing test cases for enriched customer data.
spark = spark()
sample_data = sample_data()
customer_schema = customer_schema()
test_enriched_customer_data_transformation(spark,sample_data,customer_schema)

# Executing test cases for enriched product data.
smaple_product_data = smaple_product_data()
product_schema = product_schema()
test_enriched_product_data_transformation(spark, smaple_product_data, product_schema)

# Executing test cases for enriched table which is loaded with order information along with product and customer.
sample_order_data = '''[{
    "Row_ID": 2050,
    "Order_ID": "CA-2016-127698",
    "Order_Date": "1/12/2016",
    "Ship_Date": "1/12/2016",
    "Ship_Mode": "Same Day",
    "Customer_ID": "MV-18190",
    "Product_ID": "TEC-PH-10003811",
    "Quantity": 9,
    "Price": 863.93,
    "Discount": 0.2,
    "Profit": 86.39
  },
  {
    "Row_ID": 3184,
    "Order_ID": "CA-2014-134103",
    "Order_Date": "30/1/2014",
    "Ship_Date": "4/2/2014",
    "Ship_Mode": "Standard Class",
    "Customer_ID": "MV-18190",
    "Product_ID": "OFF-ST-10000991",
    "Quantity": 2,
    "Price": 229.94,
    "Discount": 0.0,
    "Profit": 6.9
  }]'''

order_schema = StructType([
    StructField("Row_ID", IntegerType()),
    StructField("Order_ID", StringType()),
    StructField("Order_Date", StringType()),
    StructField("Ship_Date", StringType()),
    StructField("Ship_Mode", StringType()),
    StructField("Customer_ID", StringType()),
    StructField("Product_ID", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("Price", FloatType()),     # <-- changed
    StructField("Discount", FloatType()),  # <-- changed
    StructField("Profit", FloatType())     # <-- changed
])
parsed_data = json.loads(sample_order_data)
order_df = spark.createDataFrame(parsed_data, schema=order_schema)
# display(order_df)
etl = EnrichedTableLoad()
product = spark.createDataFrame(smaple_product_data, schema=product_schema)
product = etl.enriched_product_data(product)
cust = spark.createDataFrame(sample_data, schema=customer_schema)
cust = etl.enriched_cutomer_data(cust)
# display(cust)
# display(product)
expected_data = [
    ("CA-2016-127698", "2016-12-01", "2016-12-01", "Same Day", "MV-18190", "TEC-PH-10003811", "Mike Vittorini", "United States", "Office Supplies", "Binders", 9, Decimal(863.93), Decimal(0.20), Decimal(86.39)),
    ("CA-2014-134103", "2014-01-30", "2014-02-04", "Standard Class", "MV-18190", "OFF-ST-10000991", "Mike Vittorini", "United States", None, None, 2, Decimal(229.94), Decimal(0.00), Decimal(6.9))
]

expected_schema = schema = StructType([
    StructField("Order_ID", StringType()),
    StructField("Order_Date", StringType()),
    StructField("Ship_Date", StringType()),
    StructField("Ship_Mode", StringType()),
    StructField("Customer_ID", StringType()),
    StructField("Product_ID", StringType()),
    StructField("Customer_Name", StringType()),
    StructField("Country", StringType()),
    StructField("Category", StringType()),
    StructField("Sub_Category", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("Price", DecimalType(27, 2)),
    StructField("Discount", DecimalType(27, 2)),
    StructField("Profit", DecimalType(27, 2))
])

expected_df = spark.createDataFrame(expected_data, expected_schema)
# display(expected_df)
test_transformed_data(order_df, cust, product, expected_df)

# Aggregated test
result_df = etl.transformed_data_order_cust_product(order_df, cust, product)
aggregated_expected_data = [
    (2016, 'Office Supplies', 'Binders', 'MV-18190', 'Mike Vittorini', Decimal('86.39')),
    (2014, None, None, 'MV-18190', 'Mike Vittorini', Decimal('6.90'))
]

aggregated_expected_schema = StructType([
    StructField("Order_Year", IntegerType(), True),
    StructField("Category", StringType(), True),       # Nullable
    StructField("Sub_Category", StringType(), True),   # Nullable
    StructField("Customer_ID", StringType(), True),
    StructField("Customer_Name", StringType(), True),
    StructField("Total_Profit", DecimalType(27, 2), True)
])

expected_agg_df = spark.createDataFrame(aggregated_expected_data, aggregated_expected_schema)
test_aggregated_data(result_df, expected_agg_df)







Order_ID,Order_Date,Ship_Date,Ship_Mode,Customer_ID,Product_ID,Customer_Name,Country,Category,Sub_Category,Quantity,Price,Discount,Profit


result_df


year,product_category,product_sub_category,Customer_ID,Customer_Name,net_proft
2016,Office Supplies,Binders,MV-18190,Mike Vittorini,86.39
2014,,,MV-18190,Mike Vittorini,6.9


expected_df


Order_Year,Category,Sub_Category,Customer_ID,Customer_Name,Total_Profit
2016,Office Supplies,Binders,MV-18190,Mike Vittorini,86.39
2014,,,MV-18190,Mike Vittorini,6.9


dffs


year,product_category,product_sub_category,Customer_ID,Customer_Name,net_proft


In [0]:
# ------------------- Step 6: Loads data to enriched and aggregated tables------------------- #

enrich_load = EnrichedTableLoad()
enrich_load.load_enriched_cutomer_data()
enrich_load.load_enriched_product_data()
enrich_load.load_transformed_data_order_cust_product()
enrich_load.load_aggregated_data()


In [0]:
%sql
select YEAR(Order_Date) as `year`,round(sum(Profit),2) as profit_by_year from enriched_order_with_cust_and_product
group by year(Order_Date) order by year(Order_Date);










year,profit_by_year
2014,39185.75
2015,63073.09
2016,65073.23
2017,111084.96


In [0]:
%sql
select year(Order_Date) as `year`,Category,
round(sum(Profit),2) as profit_by_year_ctgry from enriched_order_with_cust_and_product
group by year(Order_Date),Category order by year(Order_Date), Category;



year,Category,profit_by_year_ctgry
2014,,523.12
2014,Furniture,-5331.04
2014,Office Supplies,22500.32
2014,Technology,21493.35
2015,,583.16
2015,Furniture,3027.18
2015,Office Supplies,24519.39
2015,Technology,34943.36
2016,,404.44
2016,Furniture,6889.5


In [0]:
%sql
select Customer_ID,Customer_Name,
round(sum(Profit),2) as profit_by_cust from enriched_order_with_cust_and_product
group by Customer_ID,Customer_Name order by Customer_ID;



Customer_ID,Customer_Name,profit_by_cust
AA-10315,Alex Avila,-362.91
AA-10375,Allen Armold,277.4
AA-10480,Andrew Allen,436.08
AA-10645,Anna Andreadi,857.8
AB-10015,Aaron Bergman,129.68
AB-10060,Adam Bellavance,2054.6
AB-10105,Adrian Barton,5444.97
AB-10150,Aimee Bixby,313.66
AB-10165,Alan Barnes,220.6
AB-10255,Alejandro Ballentine,264.57


In [0]:
%sql
select year(Order_Date) as `year`,Customer_ID,Customer_Name,
round(sum(Profit),2) as profit_by_cust from enriched_order_with_cust_and_product
group by year(Order_Date),Customer_ID,Customer_Name order by year(Order_Date),Customer_ID;

year,Customer_ID,Customer_Name,profit_by_cust
2014,AA-10315,Alex Avila,280.68
2014,AA-10375,Allen Armold,16.72
2014,AA-10480,Andrew Allen,9.89
2014,AA-10645,Anna Andreadi,77.64
2014,AB-10015,Aaron Bergman,12.82
2014,AB-10105,Adrian Barton,498.73
2014,AB-10150,Aimee Bixby,261.2
2014,AB-10165,Alan Barnes,-1.07
2014,AB-10255,Alejandro Ballentine,6.49
2014,AB-10600,Ann Blume,-23.89


In [0]:
# %fs ls dbfs:/FileStore/tables/
# % dbutils.fs.head("/FileStore/tables/orders.json", 1000)
# %pip install pytest
# dbutils.fs.rm("dbfs:/mnt/raw/customer_data", recurse=True)
# dbutils.fs.rm("dbfs:/mnt/raw/product_data", recurse=True)
# dbutils.fs.rm("dbfs:/mnt/raw/order_data_1", recurse=True)
# dbutils.fs.rm("dbfs:/mnt/enriched/enriched_order_with_cust_and_product", recurse=True)
# dbutils.fs.rm("dbfs:/mnt/enriched/enriched_aggregated_data", recurse=True)