In [1]:
# Install OpenJDK 8 (required for Spark)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Download Apache Spark 3.5.0 prebuilt with Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [3]:
# Verify Spark file
!ls -lh spark-3.5.0-bin-hadoop3.tgz

-rw-r--r-- 1 root root 382M Sep  9  2023 spark-3.5.0-bin-hadoop3.tgz


In [4]:
# Extract Spark archive
!tar -xzf spark-3.5.0-bin-hadoop3.tgz

In [5]:
# List extracted Spark files
!ls /content/spark-3.5.0-bin-hadoop3

bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin


In [6]:
# Install Python libraries
!pip install -q findspark
!pip install -q mysql-connector-python pyspark

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m55.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [7]:
import os

# Set Java and Spark environment paths
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Initialize findspark
import findspark
findspark.init()


In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime

class SimpleRetailETL:
    def __init__(self, app_name="SimpleRetailETL"):
        """Initialize Spark session"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .getOrCreate()

        # Define data schema
        self.schema = StructType([
            StructField("InvoiceNo", StringType(), True),
            StructField("StockCode", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("InvoiceDate", StringType(), True),
            StructField("UnitPrice", DoubleType(), True),
            StructField("CustomerID", StringType(), True),
            StructField("Country", StringType(), True)
        ])
        self.analyzer = SimpleAnalyzer(self.spark)

    def load_csv_data(self, file_path):
        """Load data from CSV file"""
        df = self.spark.read \
            .option("header", "true") \
            .schema(self.schema) \
            .csv(file_path)

        print(f"Loaded {df.count()} records from {file_path}")
        return df

    def check_data_quality(self, df):
        """Basic data quality checks"""
        print("\n=== DATA QUALITY REPORT ===")

        # Total records
        total_records = df.count()
        print(f"Total records: {total_records}")

        # Check null values
        print("\nNull values per column:")
        for column in df.columns:
            null_count = df.filter(col(column).isNull()).count()
            if null_count > 0:
                print(f"  {column}: {null_count}")

        # Check duplicates
        duplicate_count = df.count() - df.dropDuplicates().count()
        print(f"\nDuplicate records: {duplicate_count}")

        # Check negative values
        negative_qty = df.filter(col("Quantity") < 0).count()
        negative_price = df.filter(col("UnitPrice") < 0).count()
        print(f"Negative quantities: {negative_qty}")
        print(f"Negative prices: {negative_price}")

        return df

    def clean_and_transform(self, df):
        """Clean and transform the data"""
        print("\nCleaning and transforming data...")

        # Step 1: Basic cleaning
        df_clean = df \
            .filter(col("InvoiceNo").isNotNull()) \
            .filter(col("StockCode").isNotNull()) \
            .filter(col("UnitPrice") >= 0) \
            .withColumn("Description", trim(upper(col("Description")))) \
            .withColumn("Country", trim(upper(col("Country"))))\
            .dropDuplicates()

        # Step 2: Parse dates
        df_with_dates = df_clean \
            .withColumn("InvoiceDateTime", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")) \
            .withColumn("InvoiceDate_parsed", to_date(col("InvoiceDateTime"))) \
            .withColumn("Year", year(col("InvoiceDateTime"))) \
            .withColumn("Month", month(col("InvoiceDateTime"))) \
            .withColumn("Day", dayofmonth(col("InvoiceDateTime"))) \
            .withColumn("Hour", hour(col("InvoiceDateTime"))) \
            .withColumn("DayOfWeek", dayofweek(col("InvoiceDateTime"))) \
            .drop("InvoiceDate")

        # Step 3: Calculate business metrics
        df_with_metrics = df_with_dates \
            .withColumn("TotalAmount", col("Quantity") * col("UnitPrice")) \
            .withColumn("IsReturn", col("Quantity") < 0) \
            .withColumn("IsCancellation", col("InvoiceNo").startswith("C")) \
            .withColumn("HasCustomerID", col("CustomerID").isNotNull()) \
            .withColumn("CustomerID_clean",
                       when(col("CustomerID").isNotNull(),
                            col("CustomerID").cast("integer")).otherwise(None))

        # Step 4: Product categories
        df_final = df_with_metrics \
            .withColumn("ProductCategory",
                       when(col("StockCode").rlike("^[0-9]+[A-Z]*$"), "Standard Product")
                       .when(col("StockCode") == "POST", "Postage")
                       .when(col("StockCode") == "D", "Discount")
                       .when(col("StockCode").rlike("^C[0-9]+"), "Cancellation")
                       .otherwise("Other"))

        print(f"Transformation complete. Final record count: {df_final.count()}")
        return df_final

    def create_summary_tables(self, df):
        """Create summary tables for analysis"""
        print("\nCreating summary tables...")

        # Daily sales summary
        daily_sales = df.filter(~col("IsReturn") & ~col("IsCancellation")) \
            .groupBy("InvoiceDate_parsed", "Country") \
            .agg(
                round(sum("TotalAmount"),2).alias("DailySales"),
                sum("Quantity").alias("ItemsSold"),
                countDistinct("InvoiceNo").alias("Orders"),
                countDistinct("CustomerID_clean").alias("Customers")
            ) \
            .orderBy("InvoiceDate_parsed")

        # Customer summary
        customer_summary = df.filter(col("HasCustomerID") & ~col("IsCancellation")) \
            .groupBy("CustomerID_clean", "Country") \
            .agg(
                round(sum("TotalAmount"),2).alias("TotalSpent"),
                sum("Quantity").alias("TotalItems"),
                countDistinct("InvoiceNo").alias("TotalOrders"),
                min("InvoiceDateTime").alias("FirstPurchase"),
                max("InvoiceDateTime").alias("LastPurchase")
            )

        # Product performance
        product_summary = df.filter(~col("IsCancellation")) \
            .groupBy("StockCode", "Description") \
            .agg(
                sum(when(~col("IsReturn"), col("TotalAmount")).otherwise(0)).alias("Revenue"),
                sum(when(~col("IsReturn"), col("Quantity")).otherwise(0)).alias("UnitsSold"),
                sum(when(col("IsReturn"), abs(col("Quantity"))).otherwise(0)).alias("UnitsReturned"),
                countDistinct("InvoiceNo").alias("OrderCount")
            ) \
            .orderBy(desc("Revenue"))

        # Country performance
        country_summary = df.filter(~col("IsCancellation")) \
            .groupBy("Country") \
            .agg(
                sum(when(~col("IsReturn"), col("TotalAmount")).otherwise(0)).alias("Revenue"),
                countDistinct("CustomerID_clean").alias("Customers"),
                countDistinct("InvoiceNo").alias("Orders")
            ) \
            .orderBy(desc("Revenue"))

        summaries = {
            "daily_sales": daily_sales,
            "customer_summary": customer_summary,
            "product_summary": product_summary,
            "country_summary": country_summary
        }

        print("Summary tables created successfully!")
        return summaries

    def save_data(self, df, summaries, output_path):
        """Save transformed data and summaries"""
        print(f"\nSaving data to {output_path}...")

        # Save main dataset
        df.write.mode("overwrite").parquet(f"{output_path}/transformed_data")

        # Save summary tables
        for name, summary_df in summaries.items():
            summary_df.write.mode("overwrite").parquet(f"{output_path}/summaries/{name}")

        print("Data saved successfully!")

    def show_sample_data(self, df, rows=10):
        """Display sample data"""
        print(f"\nSample data ({rows} rows):")
        df.show(rows, truncate=False)

        print("\nData schema:")
        df.printSchema()

    def show_summary_stats(self, summaries):
        """Display summary statistics"""
        print("\n=== SUMMARY STATISTICS ===")

        for name, summary_df in summaries.items():
            print(f"\n{name.upper().replace('_', ' ')}:")
            print("-" * 40)
            summary_df.show(5)

    def run_complete_pipeline(self, input_path, output_path):
        """Run the complete ETL pipeline"""
        start_time = datetime.now()
        print("Starting ETL Pipeline...")
        print(f"Input: {input_path}")
        print(f"Output: {output_path}")

        # Step 1: Load data
        raw_df = self.load_csv_data(input_path)

        # Step 2: Quality checks
        raw_df = self.check_data_quality(raw_df)

        # Step 3: Transform data
        transformed_df = self.clean_and_transform(raw_df)

        # Step 4: Create summaries
        summaries = self.create_summary_tables(transformed_df)

        # Step 5: Save data
        self.save_data(transformed_df, summaries, output_path)

        # Show results
        self.show_sample_data(transformed_df)
        self.show_summary_stats(summaries)

        # Pipeline completion
        end_time = datetime.now()
        duration = end_time - start_time

        print(f"\n=== PIPELINE COMPLETED ===")
        print(f"Duration: {duration}")
        print(f"Records processed: {transformed_df.count()}")

        return transformed_df, summaries

    def stop(self):
        """Stop Spark session"""
        self.spark.stop()
        print("Spark session stopped.")

# Simple usage example
def main():
    etl = None
    try:
        # Create ETL instance
        etl = SimpleRetailETL()

        # Set file paths
        input_file = "/content/Online Retail.csv"  # Change to your file path
        output_folder = "/content/retail_output"   # Change to your output path

        # Run the pipeline
        transformed_data, summaries = etl.run_complete_pipeline(input_file, output_folder)

        print("\nETL Pipeline completed successfully!")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        if etl:
            etl.stop()

# Context manager for automatic cleanup
'''class ETLContext:
    def __init__(self, app_name="SimpleRetailETL"):
        self.app_name = app_name
        self.etl = None

    def __enter__(self):
        self.etl = SimpleRetailETL(self.app_name)
        return self.etl

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.etl:
            self.etl.stop()'''

# Recommended usage with context manager
'''def run_with_context():
    """Recommended way to run ETL with automatic cleanup"""
    input_file = "/content/Online Retail.csv"
    output_folder = "/content/retail_output"

    try:
        with ETLContext() as etl:
            transformed_data, summaries = etl.run_complete_pipeline(input_file, output_folder)
            print("Pipeline completed successfully!")

    except Exception as e:
        print(f"Pipeline failed: {e}")'''

# Additional analysis functions
class SimpleAnalyzer:
    """Simple data analysis functions"""

    @staticmethod
    def monthly_trends(spark, data_path):
        """Analyze monthly sales trends"""
        df = spark.read.parquet(data_path)

        monthly_sales = df.filter(~col("IsReturn")) \
            .groupBy("Year", "Month") \
            .agg(
                sum("TotalAmount").alias("MonthlySales"),
                countDistinct("CustomerID_clean").alias("ActiveCustomers")
            ) \
            .orderBy("Year", "Month")

        return monthly_sales

    @staticmethod
    def top_customers(spark, customer_summary_path, top_n=10):
        """Find top customers by spending"""
        df = spark.read.parquet(customer_summary_path)

        top_customers = df.orderBy(desc("TotalSpent")).limit(top_n)
        return top_customers

    @staticmethod
    def top_products(spark, product_summary_path, top_n=10):
        """Find top products by revenue"""
        df = spark.read.parquet(product_summary_path)

        top_products = df.orderBy(desc("Revenue")).limit(top_n)
        return top_products

if __name__ == "__main__":
    # Run the main function
    main()

    # Or use the context manager approach (recommended)
    #run_with_context()

Error: SimpleAnalyzer() takes no arguments


In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime

class SimpleRetailETL:
    def __init__(self, app_name="SimpleRetailETL"):
        """Initialize Spark session"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .getOrCreate()

        # Define data schema
        self.schema = StructType([
            StructField("InvoiceNo", StringType(), True),
            StructField("StockCode", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("InvoiceDate", StringType(), True),
            StructField("UnitPrice", DoubleType(), True),
            StructField("CustomerID", StringType(), True),
            StructField("Country", StringType(), True)
        ])

        # Initialize analyzer
        self.analyzer = SimpleAnalyzer(self.spark)

    def load_csv_data(self, file_path):
        """Load data from CSV file"""
        df = self.spark.read \
            .option("header", "true") \
            .schema(self.schema) \
            .csv(file_path)

        print(f"Loaded {df.count()} records from {file_path}")
        return df

    def check_data_quality(self, df):
        """Basic data quality checks"""
        print("\n=== DATA QUALITY REPORT ===")

        # Total records
        total_records = df.count()
        print(f"Total records: {total_records}")

        # Check null values
        print("\nNull values per column:")
        for column in df.columns:
            null_count = df.filter(col(column).isNull()).count()
            if null_count > 0:
                print(f"  {column}: {null_count}")

        # Check duplicates
        duplicate_count = df.count() - df.dropDuplicates().count()
        print(f"\nDuplicate records: {duplicate_count}")

        # Check negative values
        negative_qty = df.filter(col("Quantity") < 0).count()
        negative_price = df.filter(col("UnitPrice") < 0).count()
        print(f"Negative quantities: {negative_qty}")
        print(f"Negative prices: {negative_price}")

        return df

    def clean_and_transform(self, df):
        """Clean and transform the data"""
        print("\nCleaning and transforming data...")

        # Step 1: Basic cleaning
        df_clean = df \
            .filter(col("InvoiceNo").isNotNull()) \
            .filter(col("StockCode").isNotNull()) \
            .filter(col("UnitPrice") >= 0) \
            .withColumn("Description", trim(upper(col("Description")))) \
            .withColumn("Country", trim(upper(col("Country"))))\
            .dropDuplicates()

        # Step 2: Parse dates
        df_with_dates = df_clean \
            .withColumn("InvoiceDateTime", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")) \
            .withColumn("InvoiceDate_parsed", to_date(col("InvoiceDateTime"))) \
            .withColumn("Year", year(col("InvoiceDateTime"))) \
            .withColumn("Month", month(col("InvoiceDateTime"))) \
            .withColumn("Day", dayofmonth(col("InvoiceDateTime"))) \
            .withColumn("Hour", hour(col("InvoiceDateTime"))) \
            .withColumn("DayOfWeek", dayofweek(col("InvoiceDateTime"))) \
            .drop("InvoiceDate")

        # Step 3: Calculate business metrics
        df_with_metrics = df_with_dates \
            .withColumn("TotalAmount", col("Quantity") * col("UnitPrice")) \
            .withColumn("IsReturn", col("Quantity") < 0) \
            .withColumn("IsCancellation", col("InvoiceNo").startswith("C")) \
            .withColumn("HasCustomerID", col("CustomerID").isNotNull()) \
            .withColumn("CustomerID_clean",
                       when(col("CustomerID").isNotNull(),
                            col("CustomerID").cast("integer")).otherwise(None))

        # Step 4: Product categories
        df_final = df_with_metrics \
            .withColumn("ProductCategory",
                       when(col("StockCode").rlike("^[0-9]+[A-Z]*$"), "Standard Product")
                       .when(col("StockCode") == "POST", "Postage")
                       .when(col("StockCode") == "D", "Discount")
                       .when(col("StockCode").rlike("^C[0-9]+"), "Cancellation")
                       .otherwise("Other"))

        print(f"Transformation complete. Final record count: {df_final.count()}")
        return df_final

    def create_summary_tables(self, df):
        """Create summary tables for analysis"""
        print("\nCreating summary tables...")

        # Daily sales summary
        daily_sales = df.filter(~col("IsReturn") & ~col("IsCancellation")) \
            .groupBy("InvoiceDate_parsed", "Country") \
            .agg(
                round(sum("TotalAmount"),2).alias("DailySales"),
                sum("Quantity").alias("ItemsSold"),
                countDistinct("InvoiceNo").alias("Orders"),
                countDistinct("CustomerID_clean").alias("Customers")
            ) \
            .orderBy("InvoiceDate_parsed")

        # Customer summary
        customer_summary = df.filter(col("HasCustomerID") & ~col("IsCancellation")) \
            .groupBy("CustomerID_clean", "Country") \
            .agg(
                round(sum("TotalAmount"),2).alias("TotalSpent"),
                sum("Quantity").alias("TotalItems"),
                countDistinct("InvoiceNo").alias("TotalOrders"),
                min("InvoiceDateTime").alias("FirstPurchase"),
                max("InvoiceDateTime").alias("LastPurchase")
            )

        # Product performance
        product_summary = df.filter(~col("IsCancellation")) \
            .groupBy("StockCode", "Description") \
            .agg(
                sum(when(~col("IsReturn"), col("TotalAmount")).otherwise(0)).alias("Revenue"),
                sum(when(~col("IsReturn"), col("Quantity")).otherwise(0)).alias("UnitsSold"),
                sum(when(col("IsReturn"), abs(col("Quantity"))).otherwise(0)).alias("UnitsReturned"),
                countDistinct("InvoiceNo").alias("OrderCount")
            ) \
            .orderBy(desc("Revenue"))

        # Country performance
        country_summary = df.filter(~col("IsCancellation")) \
            .groupBy("Country") \
            .agg(
                sum(when(~col("IsReturn"), col("TotalAmount")).otherwise(0)).alias("Revenue"),
                countDistinct("CustomerID_clean").alias("Customers"),
                countDistinct("InvoiceNo").alias("Orders")
            ) \
            .orderBy(desc("Revenue"))

        summaries = {
            "daily_sales": daily_sales,
            "customer_summary": customer_summary,
            "product_summary": product_summary,
            "country_summary": country_summary
        }

        print("Summary tables created successfully!")
        return summaries

    def save_data(self, df, summaries, output_path):
        """Save transformed data and summaries"""
        print(f"\nSaving data to {output_path}...")

        # Save main dataset
        df.write.mode("overwrite").parquet(f"{output_path}/transformed_data")

        # Save summary tables
        for name, summary_df in summaries.items():
            summary_df.write.mode("overwrite").parquet(f"{output_path}/summaries/{name}")

        print("Data saved successfully!")

    def show_sample_data(self, df, rows=10):
        """Display sample data"""
        print(f"\nSample data ({rows} rows):")
        df.show(rows, truncate=False)

        print("\nData schema:")
        df.printSchema()

    def show_summary_stats(self, summaries):
        """Display summary statistics"""
        print("\n=== SUMMARY STATISTICS ===")

        for name, summary_df in summaries.items():
            print(f"\n{name.upper().replace('_', ' ')}:")
            print("-" * 40)
            summary_df.show(5)

    def run_complete_pipeline(self, input_path, output_path):
        """Run the complete ETL pipeline with analysis"""
        start_time = datetime.now()
        print("Starting ETL Pipeline...")
        print(f"Input: {input_path}")
        print(f"Output: {output_path}")

        # Step 1: Load data
        raw_df = self.load_csv_data(input_path)

        # Step 2: Quality checks
        raw_df = self.check_data_quality(raw_df)

        # Step 3: Transform data
        transformed_df = self.clean_and_transform(raw_df)

        # Step 4: Create summaries
        summaries = self.create_summary_tables(transformed_df)

        # Step 5: Save data
        self.save_data(transformed_df, summaries, output_path)

        # Step 6: Show basic results
        self.show_sample_data(transformed_df)
        self.show_summary_stats(summaries)

        # Step 7: Run advanced analysis
        print("\n" + "="*50)
        print("RUNNING ADVANCED ANALYSIS")
        print("="*50)

        analysis_results = self.analyzer.run_comprehensive_analysis(
            transformed_df,
            summaries,
            output_path
        )

        # Pipeline completion
        end_time = datetime.now()
        duration = end_time - start_time

        print(f"\n=== PIPELINE COMPLETED ===")
        print(f"Duration: {duration}")
        print(f"Records processed: {transformed_df.count()}")

        return transformed_df, summaries, analysis_results

    def stop(self):
        """Stop Spark session"""
        self.spark.stop()
        print("Spark session stopped.")


class SimpleAnalyzer:
    """Enhanced data analysis class integrated with ETL pipeline"""

    def __init__(self, spark_session):
        self.spark = spark_session

    def monthly_trends(self, df):
        """Analyze monthly sales trends"""
        print("\n--- MONTHLY SALES TRENDS ---")

        monthly_sales = df.filter(~col("IsReturn") & ~col("IsCancellation")) \
            .groupBy("Year", "Month") \
            .agg(
                round(sum("TotalAmount"), 2).alias("MonthlySales"),
                countDistinct("CustomerID_clean").alias("ActiveCustomers"),
                countDistinct("InvoiceNo").alias("Orders"),
                sum("Quantity").alias("ItemsSold")
            ) \
            .withColumn("MonthYear", concat(col("Year"), lit("-"),
                       lpad(col("Month"), 2, "0"))) \
            .orderBy("Year", "Month")

        print("Monthly sales performance:")
        monthly_sales.show(12)


    def top_customers(self, customer_summary, top_n=10):
        """Find top customers by spending"""
        print(f"\n--- TOP {top_n} CUSTOMERS BY SPENDING ---")

        top_customers = customer_summary.orderBy(desc("TotalSpent")).limit(top_n)

        print("Top customers:")
        top_customers.show(top_n, truncate=False)


    def top_products(self, product_summary, top_n=10):
        """Find top products by revenue"""
        print(f"\n--- TOP {top_n} PRODUCTS BY REVENUE ---")

        top_products = product_summary.orderBy(desc("Revenue")).limit(top_n)

        print("Top products:")
        top_products.show(top_n, truncate=False)

        # Product analysis
        total_products = product_summary.count()

        product_performance = product_summary \
            .withColumn("ReturnRate",
                       round(col("UnitsReturned") /
                            (col("UnitsSold") + col("UnitsReturned")) * 100, 2)) \
            .withColumn("PerformanceTier",
                       when(col("Revenue") >= 10000, "Top Performer")
                       .when(col("Revenue") >= 1000, "Good Performer")
                       .otherwise("Low Performer"))

        performance_summary = product_performance \
            .groupBy("PerformanceTier") \
            .agg(
                count("*").alias("ProductCount"),
                round(sum("Revenue"), 2).alias("TotalRevenue"),
                round(avg("ReturnRate"), 2).alias("AvgReturnRate")
            ) \
            .withColumn("ProductPercentage",
                       round(col("ProductCount") / total_products * 100, 2))

        print("\nProduct performance tiers:")
        performance_summary.show()

        return top_products, product_performance

    def country_analysis(self, country_summary):
        """Analyze performance by country"""
        print("\n--- COUNTRY PERFORMANCE ANALYSIS ---")

        print("All countries performance:")
        country_summary.show()

        # Country insights
        country_insights = country_summary \
            .withColumn("RevenuePerCustomer",
                       round(col("Revenue") / col("Customers"), 2)) \
            .withColumn("RevenuePerOrder",
                       round(col("Revenue") / col("Orders"), 2)) \
            .orderBy(desc("Revenue"))

        print("\nCountry insights (Revenue per customer and order):")
        country_insights.show()

        return country_insights




    def save_analysis_results(self, analysis_results, output_path):
        """Save analysis results to parquet files"""
        print(f"\nSaving analysis results to {output_path}/analysis/...")

        try:
            for name, df in analysis_results.items():
                if df is not None:
                    df.write.mode("overwrite").parquet(f"{output_path}/analysis/{name}")
            print("Analysis results saved successfully!")
        except Exception as e:
            print(f"Error saving analysis results: {e}")

    def run_comprehensive_analysis(self, df, summaries, output_path):
        """Run all analysis functions"""
        analysis_results = {}

        try:
            # Monthly trends
            monthly_sales = self.monthly_trends(df)
            analysis_results["monthly_trends"] = monthly_sales

            # Top customers
            top_customers = self.top_customers(summaries["customer_summary"])
            analysis_results["top_customers"] = top_customers

            # Top products
            top_products, product_performance = self.top_products(summaries["product_summary"])
            analysis_results["top_products"] = top_products
            analysis_results["product_performance"] = product_performance

            # Country analysis
            country_insights = self.country_analysis(summaries["country_summary"])
            analysis_results["country_insights"] = country_insights

            # Save analysis results
            self.save_analysis_results(analysis_results, output_path)

            print("\n" + "="*50)
            print("COMPREHENSIVE ANALYSIS COMPLETED!")
            print("="*50)

        except Exception as e:
            print(f"Error during analysis: {e}")

        return analysis_results


# Context manager for automatic cleanup
class ETLContext:
    def __init__(self, app_name="SimpleRetailETL"):
        self.app_name = app_name
        self.etl = None

    def __enter__(self):
        self.etl = SimpleRetailETL(self.app_name)
        return self.etl

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.etl:
            self.etl.stop()


def main():
    """Main function to run the complete ETL and analysis pipeline"""
    etl = None
    try:
        # Create ETL instance
        etl = SimpleRetailETL()

        # Set file paths
        input_file = "/content/Online Retail.csv"  # Change to your file path
        output_folder = "/content/retail_output"   # Change to your output path

        # Run the complete pipeline with analysis
        transformed_data, summaries, analysis_results = etl.run_complete_pipeline(
            input_file, output_folder
        )

        print("\n" + "="*60)
        print("ETL PIPELINE AND ANALYSIS COMPLETED SUCCESSFULLY!")
        print("="*60)
        print(f"✓ Transformed data saved to: {output_folder}/transformed_data")
        print(f"✓ Summary tables saved to: {output_folder}/summaries/")
        print(f"✓ Analysis results saved to: {output_folder}/analysis/")

    except Exception as e:
        print(f"Pipeline Error: {e}")

    finally:
        if etl:
            etl.stop()


def run_with_context():
    """Recommended way to run ETL with automatic cleanup"""
    input_file = "/content/Online Retail.csv"
    output_folder = "/content/retail_output"

    try:
        with ETLContext() as etl:
            transformed_data, summaries, analysis_results = etl.run_complete_pipeline(
                input_file, output_folder
            )
            print("\nPipeline and analysis completed successfully!")

    except Exception as e:
        print(f"Pipeline failed: {e}")


if __name__ == "__main__":
    # Run the main function
    main()

    # Or use the context manager approach (recommended)
    # run_with_context()

Starting ETL Pipeline...
Input: /content/Online Retail.csv
Output: /content/retail_output
Loaded 541909 records from /content/Online Retail.csv

=== DATA QUALITY REPORT ===
Total records: 541909

Null values per column:
  Description: 1454
  CustomerID: 135080

Duplicate records: 5268
Negative quantities: 10624
Negative prices: 2

Cleaning and transforming data...
Transformation complete. Final record count: 536639

Creating summary tables...
Summary tables created successfully!

Saving data to /content/retail_output...
Data saved successfully!

Sample data (10 rows):
+---------+---------+----------------------------------+--------+---------+----------+--------------+-------------------+------------------+----+-----+---+----+---------+------------------+--------+--------------+-------------+----------------+----------------+
|InvoiceNo|StockCode|Description                       |Quantity|UnitPrice|CustomerID|Country       |InvoiceDateTime    |InvoiceDate_parsed|Year|Month|Day|Hour|Day