In [115]:
# Install the notebook's dependencies into the kernel's environment (`%pip`, not `!pip`).
# NOTE(review): versions are unpinned, so a later re-run may resolve different releases — pin them
# (e.g. `pyspark==X.Y.Z`) for reproducibility.
# NOTE(review): sqlalchemy and ipywidgets are not imported directly by any code in this notebook.
%pip install pyspark pandas kagglehub sqlalchemy ipywidgets

IOStream.flush timed out
Note: you may need to restart the kernel to use updated packages.


In [116]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, month, year, to_date, lit, round, desc, to_timestamp, greatest # Added greatest here
from pyspark.sql.types import DoubleType, StringType, IntegerType, DateType, StructType, StructField
import kagglehub
from kagglehub import KaggleDatasetAdapter
import sqlite3
import logging

# Send log records through the root logger with timestamps so pipeline progress shows up in cell output.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# --- Pipeline configuration ---------------------------------------------------
# Data source: KaggleHub dataset reference and the CSV file inside it.
KAGGLE_DATASET_REF = "kyanyoga/sample-sales-data"
CSV_FILE_NAME = "sales_data_sample.csv"

# Output locations, relative to the notebook's working directory.
DB_NAME = "sales_analytics.db"
PARQUET_OUTPUT_DIR = "output/parquet"

# Business parameters.
SALES_THRESHOLD = 5_000      # order lines with Sales above this are listed by the reporting queries
PROFIT_MARGIN_RATE = 0.3     # assumed 30% margin used in the Profit calculation


In [117]:
def initialize_spark_session(app_name="SalesAnalyticsPipeline", driver_memory="4g", off_heap_size="2g"):
    """
    Create (or reuse) the SparkSession used by this pipeline.

    Args:
        app_name (str): Spark application name.
        driver_memory (str): value for ``spark.driver.memory`` (e.g. "4g").
        off_heap_size (str): value for ``spark.memory.offHeap.size``; off-heap memory is
            always enabled by this function.

    Returns:
        pyspark.sql.SparkSession: the session returned by ``getOrCreate()``.

    Note:
        ``getOrCreate()`` hands back an already-running session if one exists in this kernel,
        and memory settings such as ``spark.driver.memory`` only take effect when the driver
        JVM is launched. Re-running this cell therefore does not change them — restart the
        kernel (or stop the session) first.
    """
    logging.info("Initializing Spark Session...")
    if SparkSession.getActiveSession() is not None:
        # Make the silent no-op of the memory configs below visible.
        logging.warning(
            "An active SparkSession already exists and will be reused; memory settings "
            "(spark.driver.memory, off-heap size) are only applied when the driver starts."
        )
    spark = (
        SparkSession.builder.appName(app_name)
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", off_heap_size)
        .config("spark.driver.memory", driver_memory)
        .getOrCreate()
    )
    logging.info("Spark Session initialized.")
    return spark

# Shared session handle used by later cells (and stopped in the final cell).
spark = initialize_spark_session()


2025-06-22 23:33:04,098 - INFO - Initializing Spark Session...
2025-06-22 23:33:04,145 - INFO - Spark Session initialized.


In [118]:
def define_schema():
    """
    Build the explicit Spark schema applied to the raw sales data.

    Every field is nullable. ORDERDATE is kept as text here; it is parsed into a date
    during cleaning.

    Returns:
        pyspark.sql.types.StructType: one field per raw column, in the order listed below.
    """
    logging.info("Defining explicit schema for the sales data.")
    int_t, dbl_t, str_t = IntegerType(), DoubleType(), StringType()
    field_specs = (
        ("ORDERNUMBER", int_t),
        ("QUANTITYORDERED", int_t),
        ("PRICEEACH", dbl_t),
        ("ORDERLINENUMBER", int_t),
        ("SALES", dbl_t),
        ("ORDERDATE", str_t),
        ("STATUS", str_t),
        ("QTR_ID", int_t),
        ("MONTH_ID", int_t),
        ("YEAR_ID", int_t),
        ("PRODUCTLINE", str_t),
        ("MSRP", int_t),
        ("PRODUCTCODE", str_t),
        ("CUSTOMERNAME", str_t),
        ("PHONE", str_t),
        ("ADDRESSLINE1", str_t),
        ("ADDRESSLINE2", str_t),
        ("CITY", str_t),
        ("STATE", str_t),
        ("POSTALCODE", str_t),
        ("COUNTRY", str_t),
        ("TERRITORY", str_t),
        ("CONTACTLASTNAME", str_t),
        ("CONTACTFIRSTNAME", str_t),
        ("DEALSIZE", str_t),
    )
    return StructType([StructField(field_name, field_type, True) for field_name, field_type in field_specs])

def ingest_data(spark_session, kaggle_dataset_ref, csv_file_name, schema):
    """
    Loads raw CSV data from KaggleHub into a PySpark DataFrame.

    The pandas columns are reordered to the schema's field order before conversion, because
    ``createDataFrame(pandas_df, schema=...)`` pairs pandas columns with schema fields by
    position. Without this, a CSV with a different column order could silently put values
    into the wrong fields.

    Args:
        spark_session: The active SparkSession.
        kaggle_dataset_ref (str): Kaggle dataset reference.
        csv_file_name (str): The name of the CSV file within the dataset.
        schema (pyspark.sql.types.StructType): The explicit schema to apply.

    Returns:
        pyspark.sql.DataFrame: The raw PySpark DataFrame.

    Raises:
        ValueError: if the CSV lacks a column named in ``schema``.
        Exception: any error from KaggleHub or Spark is logged and re-raised unchanged.
    """
    logging.info(f"Attempting to load dataset '{csv_file_name}' from KaggleHub...")
    try:
        pandas_df = kagglehub.load_dataset(
            KaggleDatasetAdapter.PANDAS,
            kaggle_dataset_ref,
            csv_file_name,
            # Read as latin1 to avoid a UnicodeDecodeError on this file.
            pandas_kwargs={"encoding": "latin1"},
        )
        logging.info("Dataset loaded successfully into Pandas DataFrame.")

        # Align pandas columns with the schema by NAME before the positional conversion.
        expected_columns = schema.fieldNames()
        missing_columns = [c for c in expected_columns if c not in pandas_df.columns]
        if missing_columns:
            raise ValueError(f"Dataset '{csv_file_name}' is missing expected columns: {missing_columns}")
        extra_columns = [c for c in pandas_df.columns if c not in expected_columns]
        if extra_columns:
            logging.warning(f"Ignoring columns not present in the schema: {extra_columns}")
        pandas_df = pandas_df[expected_columns]

        # Convert Pandas DataFrame to PySpark DataFrame with the explicit schema
        raw_df = spark_session.createDataFrame(pandas_df, schema=schema)
        logging.info("Converted Pandas DataFrame to PySpark DataFrame with explicit schema.")
        return raw_df
    except Exception as e:
        logging.error(f"Error loading dataset from KaggleHub: {e}", exc_info=True)
        raise

# Load the raw CSV with the explicit schema, then preview a few rows and the resulting column types.
raw_df = ingest_data(spark, KAGGLE_DATASET_REF, CSV_FILE_NAME, define_schema())
raw_df.show(5)
raw_df.printSchema()


In [120]:
# Raw column -> (cleaned column name, target Spark type), listed in the cleaned DataFrame's
# column order. ORDERDATE is parsed from text rather than plainly cast (see _select_clean_columns).
_CLEAN_COLUMN_SPEC = [
    ("ORDERNUMBER", "OrderNumber", IntegerType()),
    ("ORDERDATE", "OrderDate", DateType()),
    ("STATUS", "Status", StringType()),
    ("PRODUCTLINE", "ProductLine", StringType()),
    ("QUANTITYORDERED", "QuantityOrdered", IntegerType()),
    ("PRICEEACH", "PriceEach", DoubleType()),
    ("SALES", "Sales", DoubleType()),
    ("DEALSIZE", "DealSize", StringType()),
    ("MSRP", "MSRP", IntegerType()),
    ("PRODUCTCODE", "ProductCode", StringType()),
    ("CUSTOMERNAME", "CustomerName", StringType()),
    ("PHONE", "Phone", StringType()),
    ("ADDRESSLINE1", "AddressLine1", StringType()),
    ("ADDRESSLINE2", "AddressLine2", StringType()),
    ("CITY", "City", StringType()),
    ("STATE", "State", StringType()),
    ("POSTALCODE", "PostalCode", StringType()),
    ("COUNTRY", "Country", StringType()),
    ("TERRITORY", "Territory", StringType()),
    ("CONTACTLASTNAME", "ContactLastName", StringType()),
    ("CONTACTFIRSTNAME", "ContactFirstName", StringType()),
    ("QTR_ID", "QTR_ID", IntegerType()),
    ("MONTH_ID", "MONTH_ID", IntegerType()),
    ("YEAR_ID", "YEAR_ID", IntegerType()),
    ("ORDERLINENUMBER", "OrderLineNumber", IntegerType()),
]

# Cleaned columns whose remaining nulls are replaced after validation.
# OrderNumber is not in the numeric list, so its nulls are left as-is.
_NUMERIC_FILL_COLUMNS = ["QuantityOrdered", "PriceEach", "Sales", "MSRP", "QTR_ID", "MONTH_ID", "YEAR_ID", "OrderLineNumber"]
_STRING_FILL_COLUMNS = ["Status", "ProductLine", "DealSize", "CustomerName", "Phone", "AddressLine1",
                        "AddressLine2", "City", "State", "PostalCode", "Country", "Territory",
                        "ContactLastName", "ContactFirstName", "ProductCode"]


def _drop_all_null_columns(df):
    """Drop columns that are null in every row (the row count is computed once, not once per column)."""
    total_rows = df.count()
    # `sum` here is pyspark.sql.functions.sum (imported over the builtin at the top of the notebook).
    null_counts = (
        df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
        .collect()[0]
        .asDict()
    )
    all_null = [c for c, n_null in null_counts.items() if n_null == total_rows]
    if all_null:
        logging.info(f"Dropping entirely null columns: {all_null}")
        df = df.drop(*all_null)
    return df


def _select_clean_columns(df):
    """Rename and cast raw columns per _CLEAN_COLUMN_SPEC, skipping raw columns that are absent."""
    present = set(df.columns)
    skipped = [raw for raw, _, _ in _CLEAN_COLUMN_SPEC if raw not in present]
    if skipped:
        logging.warning(f"Raw columns not present, skipped during cleaning: {skipped}")
    exprs = []
    for raw, clean, dtype in _CLEAN_COLUMN_SPEC:
        if raw not in present:
            continue
        if raw == "ORDERDATE":
            # Raw dates look like "M/d/yyyy H:mm". With ANSI mode off, values that fail to parse
            # become null and are removed by the validation filter.
            expr = to_timestamp(col(raw), "M/d/yyyy H:mm").cast(dtype)
        else:
            expr = col(raw).cast(dtype)
        exprs.append(expr.alias(clean))
    return df.select(*exprs)


def clean_and_validate_data(raw_df, profit_margin_rate):
    """
    Performs data cleaning, validation, and enriches the DataFrame.

    Steps, in order:
      1. drop raw columns that are null in every row;
      2. rename/cast columns to the cleaned schema (raw columns dropped in step 1 are skipped
         instead of crashing the select);
      3. drop rows with a missing OrderDate or a missing/invalid Sales, QuantityOrdered or MSRP;
      4. fill remaining numeric nulls with 0 and string nulls with 'N/A';
      5. drop duplicate (OrderNumber, OrderLineNumber, ProductCode) rows;
      6. add DISCOUNT, COMMISSION and Profit columns.

    Args:
        raw_df (pyspark.sql.DataFrame): DataFrame with the raw upper-case columns.
        profit_margin_rate (float): margin multiplier used in the Profit column (e.g. 0.30).

    Returns:
        pyspark.sql.DataFrame: the cleaned DataFrame, marked for caching.
    """
    logging.info("Starting data cleaning and validation...")

    raw_df = _drop_all_null_columns(raw_df)
    cleaned_df = _select_clean_columns(raw_df)

    # Validate BEFORE filling defaults. After na.fill(0) the isNotNull() checks could never
    # fail, and a missing Sales value (filled to 0) would slip through `Sales >= 0`.
    cleaned_df = cleaned_df.filter(
        (col("OrderDate").isNotNull()) &
        (col("Sales").isNotNull()) & (col("Sales") >= 0) &
        (col("QuantityOrdered").isNotNull()) & (col("QuantityOrdered") > 0) &
        (col("MSRP").isNotNull()) & (col("MSRP") > 0)  # MSRP must be positive for discount calculation
    )
    logging.info(f"Filtered out records with invalid dates or critical missing/invalid numeric values. Remaining records: {cleaned_df.count()}")

    # Fill remaining missing numeric values with 0 and string values with 'N/A'
    numeric_present = [c for c in _NUMERIC_FILL_COLUMNS if c in cleaned_df.columns]
    if numeric_present:
        cleaned_df = cleaned_df.na.fill(0, subset=numeric_present)
    string_present = [c for c in _STRING_FILL_COLUMNS if c in cleaned_df.columns]
    if string_present:
        cleaned_df = cleaned_df.na.fill("N/A", subset=string_present)

    logging.info("Checking for and removing duplicate records...")
    initial_row_count = cleaned_df.count()
    cleaned_df = cleaned_df.dropDuplicates(["OrderNumber", "OrderLineNumber", "ProductCode"])
    deduplicated_row_count = cleaned_df.count()
    if initial_row_count != deduplicated_row_count:
        logging.info(f"Removed {initial_row_count - deduplicated_row_count} duplicate rows.")
    else:
        logging.info("No duplicate rows found.")

    if "DISCOUNT" not in cleaned_df.columns:
        logging.info("Calculating 'DISCOUNT' column from 'SALES', 'QUANTITYORDERED', and 'MSRP'.")
        # DISCOUNT = 1 - Sales / (QuantityOrdered * MSRP), floored at 0.0: a realised price above
        # MSRP is not treated as a negative discount. The validation filter guarantees
        # QuantityOrdered > 0 and MSRP > 0, so the denominator is never zero.
        cleaned_df = cleaned_df.withColumn(
            "DISCOUNT",
            greatest(
                lit(0.0),
                (lit(1) - col("Sales") / (col("QuantityOrdered") * col("MSRP"))).cast(DoubleType()),
            ),
        )

    if "COMMISSION" not in cleaned_df.columns:
        logging.warning("'COMMISSION' column not found. Adding with default value 0.0.")
        cleaned_df = cleaned_df.withColumn("COMMISSION", lit(0.0).cast(DoubleType()))

    logging.info(f"Adding 'Profit' column with assumed profit margin rate of {profit_margin_rate * 100}%.")
    # NOTE(review): DISCOUNT above treats Sales as the realised (already discounted) revenue, so
    # Sales * (1 - DISCOUNT) applies the discount a second time. The formula is kept unchanged so
    # reported figures stay stable — confirm the intended Profit definition.
    cleaned_df = cleaned_df.withColumn("Profit", col("Sales") * (1 - col("DISCOUNT")) * lit(profit_margin_rate))

    logging.info("Data cleaning and validation complete. Added 'Profit' and calculated 'DISCOUNT' column.")

    cleaned_df.cache()
    logging.info("Cleaned DataFrame cached.")
    return cleaned_df

# Build the cleaned, enriched DataFrame used by all downstream cells, then preview rows and schema.
cleaned_df = clean_and_validate_data(raw_df, PROFIT_MARGIN_RATE)
cleaned_df.show(5)
cleaned_df.printSchema()


2025-06-22 23:33:07,022 - INFO - Starting data cleaning and validation...
2025-06-22 23:33:23,485 - INFO - Checking for and removing duplicate records... 
2025-06-22 23:33:24,947 - INFO - No duplicate rows found.                       
2025-06-22 23:33:26,127 - INFO - Filtered out records with invalid dates or critical missing/invalid numeric values. Remaining records: 2823
2025-06-22 23:33:26,131 - INFO - Calculating 'DISCOUNT' column from 'SALES', 'QUANTITYORDERED', and 'MSRP'.
2025-06-22 23:33:26,198 - INFO - Adding 'Profit' column with assumed profit margin rate of 30.0%.
2025-06-22 23:33:26,227 - INFO - Data cleaning and validation complete. Added 'Profit' and calculated 'DISCOUNT' column.
2025-06-22 23:33:26,269 - INFO - Cleaned DataFrame cached.

+-----------+----------+-------+------------+---------------+---------+-------+--------+----+-----------+--------------------+------------+--------------------+------------+------------+-----+----------+---------+---------+---------------+----------------+------+--------+-------+---------------+-------------------+----------+-----------------+
|OrderNumber| OrderDate| Status| ProductLine|QuantityOrdered|PriceEach|  Sales|DealSize|MSRP|ProductCode|        CustomerName|       Phone|        AddressLine1|AddressLine2|        City|State|PostalCode|  Country|Territory|ContactLastName|ContactFirstName|QTR_ID|MONTH_ID|YEAR_ID|OrderLineNumber|           DISCOUNT|COMMISSION|           Profit|
+-----------+----------+-------+------------+---------------+---------+-------+--------+----+-----------+--------------------+------------+--------------------+------------+------------+-----+----------+---------+---------+---------------+----------------+------+--------+-------+---------------+------------

                                                                                

In [122]:
def run_reporting_queries(cleaned_df, sales_threshold, discount_threshold=0.10):
    """
    Runs reporting queries using Spark SQL on the cleaned DataFrame.

    ``cleaned_df`` is registered as the temporary view ``sales_data``, replacing any existing
    view of that name. Each query result is shown before being returned.

    Args:
        cleaned_df (pyspark.sql.DataFrame): cleaned data. It must contain OrderNumber, OrderDate,
            CustomerName, ProductLine, Sales, Profit and DISCOUNT.
        sales_threshold (int | float): order lines with Sales strictly above this are listed.
        discount_threshold (float): customers whose average DISCOUNT is strictly above this
            are listed. The default 0.10 is the previously hard-coded value.

    Returns:
        dict[str, pyspark.sql.DataFrame]: keys 'sales_above_threshold',
        'profitable_categories' and 'customers_high_discount'.

    Raises:
        ValueError, TypeError: if a threshold cannot be converted to a number.
    """
    logging.info("Starting execution of reporting queries...")

    # Use the session that owns the DataFrame rather than a notebook-global `spark`.
    spark_session = cleaned_df.sparkSession

    # Thresholds are interpolated into SQL text, so force them to plain numbers first.
    sales_cutoff = float(sales_threshold)
    discount_cutoff = float(discount_threshold)

    # Register the cleaned DataFrame as a temporary SQL view
    cleaned_df.createOrReplaceTempView("sales_data")
    logging.info("Cleaned data registered as temporary view 'sales_data' for Spark SQL.")

    # 6.1. Sales greater than a threshold
    logging.info(f"Running Reporting Query: Sales > {sales_threshold}...")
    sales_above_threshold = spark_session.sql(f"""
        SELECT
            OrderNumber,
            OrderDate,
            CustomerName,
            ProductLine,
            Sales
        FROM
            sales_data
        WHERE
            Sales > {sales_cutoff}
        ORDER BY
            Sales DESC
    """)
    sales_above_threshold.show(10)
    logging.info(f"Sales above {sales_threshold} query executed.")

    # 6.2. Profitable categories (ProductLine as Category) - having total profit > 0
    logging.info("Running Reporting Query: Profitable Categories...")
    profitable_categories = spark_session.sql("""
        SELECT
            ProductLine,
            ROUND(SUM(Profit), 2) AS TotalProfit
        FROM
            sales_data
        GROUP BY
            ProductLine
        HAVING
            SUM(Profit) > 0
        ORDER BY
            TotalProfit DESC
    """)
    profitable_categories.show()
    logging.info("Profitable Categories query executed.")

    # 6.3. Customers with high discount usage (average discount above discount_threshold)
    logging.info("Running Reporting Query: Customers with High Discount Usage...")
    customers_high_discount = spark_session.sql(f"""
        SELECT
            CustomerName,
            ROUND(AVG(DISCOUNT), 4) AS AverageDiscountUsed,
            ROUND(SUM(Sales), 2) AS TotalSales
        FROM
            sales_data
        GROUP BY
            CustomerName
        HAVING
            AVG(DISCOUNT) > {discount_cutoff}
        ORDER BY
            AverageDiscountUsed DESC
    """)
    customers_high_discount.show(10)
    logging.info("Customers with High Discount Usage query executed.")

    return {
        "sales_above_threshold": sales_above_threshold,
        "profitable_categories": profitable_categories,
        "customers_high_discount": customers_high_discount
    }

# Dict of report DataFrames keyed by report name; these are persisted together with the other outputs.
reporting_queries = run_reporting_queries(cleaned_df, SALES_THRESHOLD)




2025-06-22 23:33:40,217 - INFO - Starting execution of reporting queries...
2025-06-22 23:33:40,233 - INFO - Cleaned data registered as temporary view 'sales_data' for Spark SQL.
2025-06-22 23:33:40,238 - INFO - Running Reporting Query: Sales > 5000...
2025-06-22 23:33:41,249 - INFO - Sales above 5000 query executed.               
2025-06-22 23:33:41,250 - INFO - Running Reporting Query: Profitable Categories...


+-----------+----------+--------------------+------------+-------+
|OrderNumber| OrderDate|        CustomerName| ProductLine|  Sales|
+-----------+----------+--------------------+------------+-------+
|      10407|2005-04-22|The Sharp Gifts W...|Vintage Cars|14082.8|
|      10322|2004-11-04|Online Diecast Cr...|Vintage Cars|12536.5|
|      10424|2005-05-31|Euro Shopping Cha...|Classic Cars|12001.0|
|      10412|2005-05-03|Euro Shopping Cha...|Classic Cars|11887.8|
|      10403|2005-04-08|UK Collectables, ...| Motorcycles|11886.6|
|      10405|2005-04-14|         Mini Caravy|Classic Cars|11739.7|
|      10312|2004-10-21|Mini Gifts Distri...|Classic Cars|11623.7|
|      10333|2004-11-18|     Mini Wheels Co.|Vintage Cars|11336.7|
|      10127|2003-06-03|  Muscle Machine Inc|Classic Cars|11279.2|
|      10150|2003-09-19|Dragon Souveniers...|Classic Cars|10993.5|
+-----------+----------+--------------------+------------+-------+
only showing top 10 rows


2025-06-22 23:33:43,867 - INFO - Profitable Categories query executed.          
2025-06-22 23:33:43,868 - INFO - Running Reporting Query: Customers with High Discount Usage...


+----------------+-----------+
|     ProductLine|TotalProfit|
+----------------+-----------+
|    Classic Cars| 1096862.82|
|    Vintage Cars|  543582.78|
|     Motorcycles|   333088.1|
|Trucks and Buses|  321266.23|
|          Planes|  277960.61|
|           Ships|  204820.59|
|          Trains|   65606.82|
+----------------+-----------+



2025-06-22 23:33:46,714 - INFO - Customers with High Discount Usage query executed.


+--------------------+-------------------+----------+
|        CustomerName|AverageDiscountUsed|TotalSales|
+--------------------+-------------------+----------+
|     Lyon Souveniers|              0.146|  78570.34|
|Anna's Decoration...|             0.1372| 153996.13|
|Motor Mint Distri...|             0.1343|  83682.16|
|  Reims Collectables|             0.1215| 135042.94|
|     Vitachrome Inc.|             0.1212|  88041.26|
|   Toys4GrownUps.com|             0.1205| 104561.96|
|      AV Stores, Co.|             0.1163| 157807.81|
| Suominen Souveniers|             0.1111| 113961.15|
|  Baane Mini Imports|             0.1085| 116599.19|
|  Signal Gift Stores|             0.1049|  82751.08|
+--------------------+-------------------+----------+
only showing top 10 rows


In [123]:
def build_aggregations(cleaned_df):
    """
    Build the summary DataFrames that are stored alongside the reporting queries.

    NOTE(review): the cell that originally defined `aggregations` is not part of this notebook
    (execution counts skip In [119] and In [121]), so a fresh "Restart & Run All" failed with
    NameError here. This reconstruction follows the column names and ordering visible in the
    JSON previews printed later in the notebook — confirm it matches the intended definitions.

    Args:
        cleaned_df (pyspark.sql.DataFrame): cleaned data with OrderDate, Country, Sales,
            CustomerName, Profit, ProductLine and DISCOUNT columns.

    Returns:
        dict[str, pyspark.sql.DataFrame]: keys 'monthly_sales_by_region',
        'top_10_customers_by_profit' and 'category_wise_avg_discount'.
    """
    # `sum`, `avg` and `round` are the pyspark.sql.functions versions imported at the top of the notebook.
    monthly_sales_by_region = (
        cleaned_df
        .groupBy(
            year(col("OrderDate")).alias("SalesYear"),
            month(col("OrderDate")).alias("SalesMonth"),
            col("Country"),
        )
        .agg(round(sum(col("Sales")), 2).alias("TotalSales"))
        .orderBy(col("SalesYear"), col("SalesMonth"), desc("TotalSales"))
    )
    top_10_customers_by_profit = (
        cleaned_df
        .groupBy("CustomerName")
        .agg(round(sum(col("Profit")), 2).alias("TotalProfit"))
        .orderBy(desc("TotalProfit"))
        .limit(10)
    )
    category_wise_avg_discount = (
        cleaned_df
        .groupBy("ProductLine")
        .agg(round(avg(col("DISCOUNT")), 4).alias("AverageDiscount"))
        .orderBy(desc("AverageDiscount"))
    )
    return {
        "monthly_sales_by_region": monthly_sales_by_region,
        "top_10_customers_by_profit": top_10_customers_by_profit,
        "category_wise_avg_discount": category_wise_avg_discount,
    }


# Combine all DataFrames for storage (aggregations first, then the reporting query results)
aggregations = build_aggregations(cleaned_df)
all_output_dfs = {**aggregations, **reporting_queries}

# --- Store to SQLite ---
def _remove_stale_db(path):
    """Delete a previous database file so each run starts from an empty SQLite file."""
    if not os.path.exists(path):
        return
    os.remove(path)
    logging.info(f"Removed existing database file: {path}")


def _write_tables(connection, frames):
    """Collect each Spark DataFrame to pandas and write it as a table named after its dict key."""
    for table, spark_frame in frames.items():
        logging.info(f"Storing '{table}' table...")
        spark_frame.toPandas().to_sql(table, connection, if_exists="replace", index=False)
        logging.info(f"Stored '{table}' table.")


def store_output_to_sqlite(dataframes_to_store, db_name):
    """
    Write every DataFrame in ``dataframes_to_store`` to its own table in a fresh SQLite file.

    Any existing ``db_name`` file is deleted first. Write errors are logged with a traceback
    and not re-raised; tables written before a failure remain in the file. The connection is
    always closed.
    """
    logging.info(f"Storing transformed outputs to SQLite database: {db_name}...")
    _remove_stale_db(db_name)

    connection = None
    try:
        connection = sqlite3.connect(db_name)
        _write_tables(connection, dataframes_to_store)
        logging.info("All transformed outputs stored successfully in SQLite.")
    except Exception as exc:
        logging.error(f"Error storing data to SQLite: {exc}", exc_info=True)
    finally:
        if connection is not None:
            connection.close()
            logging.info("SQLite connection closed.")

# Write every output DataFrame as a table in DB_NAME (an existing file is deleted first).
store_output_to_sqlite(all_output_dfs, DB_NAME)

# --- Store to Parquet ---
def store_output_to_parquet(dataframes_to_store, output_base_path):
    """
    Write each DataFrame to ``<output_base_path>/<name>`` as Parquet, overwriting previous output.

    Partitioning depends on the columns present:
      * ``OrderDate``: derived ``year``/``month`` columns are added and used as partitions;
      * ``SalesYear`` and ``SalesMonth``: those columns are used as partitions;
      * otherwise: no partitioning.

    A failure on one DataFrame is logged with a traceback and the remaining ones are still written.
    """
    logging.info(f"Storing transformed outputs as partitioned Parquet files to '{output_base_path}'...")
    os.makedirs(output_base_path, exist_ok=True)

    for name, frame in dataframes_to_store.items():
        target = os.path.join(output_base_path, name)
        logging.info(f"Storing '{name}' to Parquet at '{target}'...")
        try:
            columns = frame.columns
            if "OrderDate" in columns:
                frame = (
                    frame.withColumn("year", year(col("OrderDate")))
                         .withColumn("month", month(col("OrderDate")))
                )
                partition_cols = ("year", "month")
            elif "SalesYear" in columns and "SalesMonth" in columns:
                partition_cols = ("SalesYear", "SalesMonth")
            else:
                partition_cols = ()

            writer = frame.write.mode("overwrite")
            if partition_cols:
                writer = writer.partitionBy(*partition_cols)
            writer.parquet(target)
            logging.info(f"Stored '{name}' to Parquet successfully.")
        except Exception as exc:
            logging.error(f"Error storing '{name}' to Parquet: {exc}", exc_info=True)

# Write each output DataFrame under PARQUET_OUTPUT_DIR/<name>, partitioned by year/month when such columns exist.
store_output_to_parquet(all_output_dfs, PARQUET_OUTPUT_DIR)

# --- Generate Sample JSON for Dashboard (Optional - for web app consumption) ---
# A few key result DataFrames are serialised to JSON strings and previewed in the cells below.
# For a quick demo, a snippet of this JSON can be pasted directly into the React dashboard
# component. In a deployed setup, a backend API (or static files / a database the frontend
# can query) would serve this data instead.

def _to_records_json(spark_frame):
    """Collect a small Spark DataFrame to the driver and render it as indented JSON records."""
    return spark_frame.toPandas().to_json(orient="records", indent=2)

logging.info("Generating sample JSON data for dashboard preview...")
monthly_sales_json = _to_records_json(aggregations["monthly_sales_by_region"].limit(50))  # first 50 rows only
top_customers_json = _to_records_json(aggregations["top_10_customers_by_profit"])
category_discount_json = _to_records_json(aggregations["category_wise_avg_discount"])
profitable_categories_json = _to_records_json(reporting_queries["profitable_categories"])


2025-06-22 23:33:46,776 - INFO - Storing transformed outputs to SQLite database: sales_analytics.db...
2025-06-22 23:33:46,781 - INFO - Removed existing database file: sales_analytics.db
2025-06-22 23:33:46,784 - INFO - Storing 'monthly_sales_by_region' table...
2025-06-22 23:33:50,104 - INFO - Stored 'monthly_sales_by_region' table.        
2025-06-22 23:33:50,106 - INFO - Storing 'top_10_customers_by_profit' table...
2025-06-22 23:33:53,297 - INFO - Stored 'top_10_customers_by_profit' table.     
2025-06-22 23:33:53,298 - INFO - Storing 'category_wise_avg_discount' table...
2025-06-22 23:33:56,179 - INFO - Stored 'category_wise_avg_discount' table.     
2025-06-22 23:33:56,180 - INFO - Storing 'sales_above_threshold' table...
2025-06-22 23:33:58,701 - INFO - Stored 'sales_above_threshold' table.          
2025-06-22 23:33:58,702 - INFO - Storing 'profitable_categories' table...
2025-06-22 23:34:00,604 - INFO - Stored 'profitable_categories' table.          
2025-06-22 23:34:00,605 - 

In [124]:
# Preview only the beginning of the monthly-sales JSON; the full string is much longer.
print(monthly_sales_json[:500]) # Print first 500 characters

[
  {
    "SalesYear":2003,
    "SalesMonth":1,
    "Country":"Norway",
    "TotalSales":54702.0
  },
  {
    "SalesYear":2003,
    "SalesMonth":1,
    "Country":"Spain",
    "TotalSales":44621.96
  },
  {
    "SalesYear":2003,
    "SalesMonth":1,
    "Country":"USA",
    "TotalSales":18997.3
  },
  {
    "SalesYear":2003,
    "SalesMonth":1,
    "Country":"Germany",
    "TotalSales":11432.34
  },
  {
    "SalesYear":2003,
    "SalesMonth":2,
    "Country":"Denmark",
    "TotalSales":58871.11
  


In [125]:
# Heading and JSON on separate lines (same output as two consecutive print calls).
print("\n--- Top Customers JSON (Sample) ---", top_customers_json, sep="\n")


--- Top Customers JSON (Sample) ---
[
  {
    "CustomerName":"Euro Shopping Channel",
    "TotalProfit":256053.9
  },
  {
    "CustomerName":"Mini Gifts Distributors Ltd.",
    "TotalProfit":184647.06
  },
  {
    "CustomerName":"Australian Collectors, Co.",
    "TotalProfit":57478.22
  },
  {
    "CustomerName":"Muscle Machine Inc",
    "TotalProfit":57374.35
  },
  {
    "CustomerName":"La Rochelle Gifts",
    "TotalProfit":50583.19
  },
  {
    "CustomerName":"Dragon Souveniers, Ltd.",
    "TotalProfit":49395.37
  },
  {
    "CustomerName":"Land of Toys Inc.",
    "TotalProfit":46048.63
  },
  {
    "CustomerName":"The Sharp Gifts Warehouse",
    "TotalProfit":45629.82
  },
  {
    "CustomerName":"AV Stores, Co.",
    "TotalProfit":43639.61
  },
  {
    "CustomerName":"Corporate Gift Ideas Co.",
    "TotalProfit":43336.81
  }
]


In [126]:
# Heading and JSON on separate lines (same output as two consecutive print calls).
print("\n--- Category Discount JSON (Sample) ---", category_discount_json, sep="\n")


--- Category Discount JSON (Sample) ---
[
  {
    "ProductLine":"Classic Cars",
    "AverageDiscount":0.0868
  },
  {
    "ProductLine":"Trucks and Buses",
    "AverageDiscount":0.0692
  },
  {
    "ProductLine":"Vintage Cars",
    "AverageDiscount":0.0633
  },
  {
    "ProductLine":"Motorcycles",
    "AverageDiscount":0.0614
  },
  {
    "ProductLine":"Planes",
    "AverageDiscount":0.0614
  },
  {
    "ProductLine":"Ships",
    "AverageDiscount":0.0531
  },
  {
    "ProductLine":"Trains",
    "AverageDiscount":0.0476
  }
]


In [127]:
# Heading and JSON on separate lines (same output as two consecutive print calls).
print("\n--- Profitable Categories JSON (Sample) ---", profitable_categories_json, sep="\n")


--- Profitable Categories JSON (Sample) ---
[
  {
    "ProductLine":"Classic Cars",
    "TotalProfit":1096862.8200000001
  },
  {
    "ProductLine":"Vintage Cars",
    "TotalProfit":543582.78
  },
  {
    "ProductLine":"Motorcycles",
    "TotalProfit":333088.1
  },
  {
    "ProductLine":"Trucks and Buses",
    "TotalProfit":321266.23
  },
  {
    "ProductLine":"Planes",
    "TotalProfit":277960.61
  },
  {
    "ProductLine":"Ships",
    "TotalProfit":204820.59
  },
  {
    "ProductLine":"Trains",
    "TotalProfit":65606.82
  }
]


In [128]:
# Shut down the SparkSession created near the top of the notebook. Cells that use `spark`
# cannot run after this until the session-initialization cell is run again.
spark.stop()
logging.info("Spark Session stopped.")

2025-06-22 23:34:23,601 - INFO - Spark Session stopped.
