In [0]:
!pip --disable-pip-version-check instal -r requirement.txt

In [0]:
import sys
import os
import warnings
import pandas as pd

# Put the repository's `src` directory at the front of the import search path,
# unless it is already registered (keeps re-runs of this cell idempotent).
# NOTE(review): the path is hard-coded to one user's workspace checkout, and the
# imports below use the `src.` package prefix, which resolves from the parent of
# this directory — confirm this entry is still needed.
src_path = os.path.abspath(
    '/Workspace/Repos/srivastav_amol@yahoo.com/Ecommerce-Sales-Data-Processing/src'
)
if sys.path.count(src_path) == 0:
    sys.path.insert(0, src_path)

from src.config import read_config
from src.data_quality_checks import data_quality_checks
from src.create_raw_tables import clean_and_save_tables
from src.create_enriched_tables import enrich_and_save_tables

In [0]:

# Load the pipeline configuration from the YAML file (path is relative to the
# notebook's working directory).
config = read_config(config_path="../configs/config.yaml")

# Run the data-quality checks on the three source locations, passed positionally
# in the order: customers, products, orders.
source_paths = [
    config[path_key]
    for path_key in ("customers_path", "products_path", "orders_path")
]
data_quality_checks(*source_paths)

In [0]:


# Columns every customer record is expected to carry.
required_columns = [
    "Customer ID",
    "Customer Name",
    "email",
    "phone",
    "address"
]

# Read the customers Excel workbook into a pandas DataFrame.
pdf = pd.read_excel(
    config["customers_path"],
    engine="openpyxl"
)

# Fail early and name every absent column, rather than surfacing a bare
# KeyError for whichever column happens to be touched first.
absent_columns = [col for col in required_columns if col not in pdf.columns]
if absent_columns:
    raise KeyError(f"Customers file is missing required columns: {absent_columns}")

# Store phone numbers as strings but keep missing values missing. A plain
# astype(str) turns NaN into the literal text "nan", which hides the nulls from
# the check below and writes "nan" into the data.
pdf["phone"] = pdf["phone"].map(
    lambda value: None if pd.isna(value) else str(value)
)

# Report nulls in the required columns. Only "Customer Name" receives a default;
# nulls in the other columns are passed through unchanged.
null_columns = pdf[required_columns].isnull().any()
if null_columns.any():
    missing = list(null_columns[null_columns].index)
    warnings.warn(
        f"Null values found in columns: {missing}. "
        "Only 'Customer Name' is filled with a default ('Unknown'); "
        "other columns keep their nulls."
    )
    if "Customer Name" in missing:
        # Assign the result back: fillna(inplace=True) on a column selection is
        # chained assignment and does not reliably update `pdf`.
        pdf["Customer Name"] = pdf["Customer Name"].fillna("Unknown")

# Convert the pandas customers frame into a Spark DataFrame.
df_customers = spark.createDataFrame(pdf)

# Load the products CSV (first row is the header, column types are inferred)
# and drop rows that are exact duplicates.
df_products = spark.read.csv(
    config["products_path"], header=True, inferSchema=True
).dropDuplicates()

# Load the orders JSON with the "multiline" reader option enabled and drop rows
# that are exact duplicates.
orders_reader = spark.read.option("multiline", "true")
df_orders = orders_reader.json(config["orders_path"]).dropDuplicates()

In [0]:
# Show the customer rows whose name is the "Unknown" placeholder, i.e. the value
# the load cell substitutes for a null Customer Name.
unknown_name_rows = df_customers.filter(df_customers["Customer Name"] == "Unknown")
unknown_name_rows.display()

# Exact-duplicate rows = total rows minus distinct rows.
# df_products and df_orders were already de-duplicated when they were loaded, so
# both counts below are expected to be 0; each .count() triggers a Spark job.
total_rows_products, unique_rows_products = (
    df_products.count(),
    df_products.dropDuplicates().count(),
)
duplicate_rows_products = total_rows_products - unique_rows_products
print("Duplicate rows in df_products:", duplicate_rows_products)

total_rows_orders, unique_rows_orders = (
    df_orders.count(),
    df_orders.dropDuplicates().count(),
)
duplicate_rows_orders = total_rows_orders - unique_rows_orders
print("Duplicate rows in df_orders:", duplicate_rows_orders)


In [0]:
# Run the clean-and-save step for the raw tables on the three source DataFrames.
clean_and_save_tables(
    df_customers,
    df_products,
    df_orders,
    config,
)

In [0]:
# Feed the outputs of the cleaning step into the enrichment step.
# NOTE(review): this calls clean_and_save_tables again although the previous
# cell already ran it — confirm the repeated run is intended.
cleaned_tables = clean_and_save_tables(df_customers, df_products, df_orders, config)
enrich_and_save_tables(*cleaned_tables, config)

In [0]:
# Run cleaning and enrichment, keep the enriched results, and preview the first
# five rows of each returned DataFrame.
cleaned_for_preview = clean_and_save_tables(
    df_customers, df_products, df_orders, config
)
enriched_dfs = enrich_and_save_tables(*cleaned_for_preview, config)
for df in enriched_dfs:
    preview = df.limit(5)
    display(preview)

In [0]:
# Profit by Year: total profit summed per year, ordered chronologically.
profit_by_year_sql = """
    SELECT
        year,
        SUM(total_profit) AS total_profit
    FROM amol_uc.default.profit_agg_by_year_category_subcat_customer
    GROUP BY year
    ORDER BY year
"""
display(spark.sql(profit_by_year_sql))

In [0]:
# Profit by Year + Category: total profit summed per (year, category) pair.
profit_by_year_category_sql = """
    SELECT
        year,
        category,
        SUM(total_profit) AS total_profit

    FROM amol_uc.default.profit_agg_by_year_category_subcat_customer
    GROUP BY year, category
    ORDER BY year, category
    """
display(spark.sql(profit_by_year_category_sql))

In [0]:
%sql
-- Profit by Customer: distinct sub-category count and total profit (rounded to
-- 2 decimals) per customer, highest profit first.
SELECT customer_name,
       COUNT(DISTINCT sub_category) AS total_sub_categories,
       ROUND(SUM(total_profit), 2) AS total_profit
FROM amol_uc.default.profit_agg_by_year_category_subcat_customer
GROUP BY customer_name
ORDER BY total_profit DESC

In [0]:
# Profit by Customer + Year: total profit (rounded to 2 decimals) per
# (customer, year) pair, ordered by customer then year.
profit_by_customer_year_sql = """
    SELECT
        customer_name,
        year,
        ROUND(SUM(total_profit), 2) AS total_profit
    FROM amol_uc.default.profit_agg_by_year_category_subcat_customer
    GROUP BY customer_name, year
    ORDER BY customer_name, year
"""
display(spark.sql(profit_by_customer_year_sql))

In [0]:
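# DISABLED cleanup utility — uncommenting this cell drops every table in amol_uc.default (destructive).
# # Get table names as a list using DataFrame operations only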
# tables_df = spark.sql("SHOW TABLES IN amol_uc.default").select("tableName")
# tables = [row.tableName for row in tables_df.collect()]

# # Drop each table
# for table in tables:
#     spark.sql(f"DROP TABLE amol_uc.default.{table}")