In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [20]:
# 🔧 SOLUTION 1: Check if Spark session exists, create if needed
def get_spark_session(app_name="OrderDataAnalysis"):
    """Return the active SparkSession, creating one if none is active.

    Args:
        app_name: Application name passed to the builder when a session has
            to be obtained through ``getOrCreate()``. Defaults to
            "OrderDataAnalysis".

    Returns:
        The session reported by ``SparkSession.getActiveSession()`` when there
        is one, otherwise the result of
        ``SparkSession.builder.appName(app_name).getOrCreate()``.
    """
    try:
        active = SparkSession.getActiveSession()
    except Exception:
        # Probing for an active session is best-effort: if the probe itself
        # fails, fall through and let the builder get or create a session.
        # Catching ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate to the notebook.
        active = None

    if active is not None:
        return active

    return SparkSession.builder.appName(app_name).getOrCreate()

In [None]:
# Bind the notebook-wide `spark` handle. get_spark_session() returns the
# currently active session, or builds one if none is active.
spark = get_spark_session()

In [22]:
# Print a short status report for the session bound to `spark`.
ctx = spark.sparkContext

print("✅ Spark Session Status:")
print(f"   App Name: {ctx.appName}")
print(f"   Master: {ctx.master}")
print(f"   Spark Version: {spark.version}")

# Goes through the context's private `_jsc` handle to ask whether the
# underlying context has been stopped.
still_running = not ctx._jsc.sc().isStopped()
print(f"   Active: {still_running}")

✅ Spark Session Status:
   App Name: OrderDataAnalysis
   Master: yarn
   Spark Version: 3.5.3
   Active: True


In [23]:
# GCS locations, written with the gs:// scheme.
# input_file_path: the order CSV that the loading cell passes to spark.read.
# output_path: base prefix; the save cell writes one sub-folder per result set
#              underneath it.
input_file_path = "gs://my-dataproc-data-bucket-051203/data/order_data.csv"
output_path = "gs://my-dataproc-data-bucket-051203/output/product_sales_insights"

In [24]:
print("=== Reading Order Data from GCS ===")

# ---------------------------------------------------------------------------
# Load
# ---------------------------------------------------------------------------
# Only the read itself is guarded: the hints printed on failure are about
# file/bucket problems, so they must not appear for errors raised by the
# profiling steps further down.
try:
    print(f"Reading data from: {input_file_path}")

    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        # OrderDate holds plain dates, so the pattern belongs on dateFormat
        # (timestampFormat only governs timestamp columns).
        .option("dateFormat", "yyyy-MM-dd")
        .csv(input_file_path)
    )
except Exception as e:
    print(f"❌ Error reading data: {str(e)}")
    print("Please check:")
    print("1. File path is correct")
    print("2. File exists in the GCS bucket")
    print("3. Dataproc cluster has access to the bucket")
    print("4. CSV file format is correct")
    # Re-raise so the cell fails visibly. Swallowing the error would leave
    # `df` undefined and every later cell would fail with an unrelated
    # NameError.
    raise

print("✅ Data successfully loaded from GCS!")

# ---------------------------------------------------------------------------
# Profile
# ---------------------------------------------------------------------------
print("\n📊 Dataset Overview:")
# The row count is reused by the duplicate check below, so compute it once.
total_rows = df.count()
print(f"Total number of records: {total_rows}")
print(f"Number of columns: {len(df.columns)}")

# Show schema
print("\n📋 Data Schema:")
df.printSchema()

# Show column names
print("\n📝 Column Names:")
for i, col_name in enumerate(df.columns, 1):
    print(f"{i}. {col_name}")

# Display first few rows
print("\n🔍 First 10 rows of data:")
df.show(10, truncate=False)

# Null count per column: when() yields a non-null value only for null cells,
# and count() counts non-null values.
print("\n🔍 Null Value Check:")
null_counts = df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
)
null_counts.show()

# Basic statistics for numerical columns
print("\n📈 Basic Statistics:")
df.describe().show()

# Data types verification
print("\n🏷️ Data Types:")
for field in df.schema.fields:
    print(f"{field.name}: {field.dataType}")

# Distinct-value counts for the categorical columns that are present
print("\n🔢 Unique Value Counts:")
categorical_columns = ['CustomerID', 'ProductID', 'ProductName', 'Country']

for column in categorical_columns:
    if column in df.columns:
        unique_count = df.select(column).distinct().count()
        print(f"{column}: {unique_count} unique values")

# ---------------------------------------------------------------------------
# Data quality checks
# ---------------------------------------------------------------------------
print("\n✅ Data Quality Checks:")

# Fully duplicated rows = total rows minus distinct rows
distinct_rows = df.distinct().count()
duplicates = total_rows - distinct_rows
print(f"Duplicate rows: {duplicates}")

# Date range (min/max here are the Spark SQL aggregates pulled in by the
# wildcard import, not the Python builtins)
if 'OrderDate' in df.columns:
    date_range = df.select(
        min('OrderDate').alias('min_date'),
        max('OrderDate').alias('max_date'),
    ).collect()[0]
    print(f"Date range: {date_range['min_date']} to {date_range['max_date']}")

# Negative quantities or prices
if 'Quantity' in df.columns:
    negative_qty = df.filter(col('Quantity') < 0).count()
    print(f"Negative quantities: {negative_qty}")

if 'UnitPrice' in df.columns:
    negative_price = df.filter(col('UnitPrice') < 0).count()
    print(f"Negative prices: {negative_price}")

# Sample records for up to five countries. limit(5) keeps the driver from
# collecting every distinct country just to look at five of them.
print("\n🌍 Sample records by Country:")
if 'Country' in df.columns:
    countries = df.select('Country').distinct().limit(5).collect()
    for row in countries:
        country = row['Country']
        print(f"\n--- {country} ---")
        df.filter(col('Country') == country).show(3, truncate=False)

print("\n🎉 Data reading completed successfully!")
print("The dataset is now loaded in DataFrame 'df' and ready for analysis.")
print(f"Next steps: You can now perform analysis and save results to: {output_path}")

=== Reading Order Data from GCS ===
Reading data from: gs://my-dataproc-data-bucket-051203/data/order_data.csv


                                                                                

✅ Data successfully loaded from GCS!

📊 Dataset Overview:


                                                                                

Total number of records: 22
Number of columns: 8

📋 Data Schema:
root
 |-- OrderID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- Country: string (nullable = true)


📝 Column Names:
1. OrderID
2. CustomerID
3. ProductID
4. ProductName
5. Quantity
6. UnitPrice
7. OrderDate
8. Country

🔍 First 10 rows of data:
+-------+----------+---------+----------------+--------+---------+----------+---------+
|OrderID|CustomerID|ProductID|ProductName     |Quantity|UnitPrice|OrderDate |Country  |
+-------+----------+---------+----------------+--------+---------+----------+---------+
|ORD001 |CUST101   |PROD001  |Laptop A        |1       |1200.0   |2024-01-15|USA      |
|ORD001 |CUST101   |PROD003  |Mouse XYZ       |2       |25.5     |2024-01-15|USA      |
|ORD002 |CUST102   

                                                                                

+-------+----------+---------+-----------+--------+---------+---------+-------+
|OrderID|CustomerID|ProductID|ProductName|Quantity|UnitPrice|OrderDate|Country|
+-------+----------+---------+-----------+--------+---------+---------+-------+
|      0|         0|        0|          0|       0|        0|        0|      0|
+-------+----------+---------+-----------+--------+---------+---------+-------+


📈 Basic Statistics:


25/06/25 15:30:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+-------+----------+---------+----------------+------------------+------------------+---------+
|summary|OrderID|CustomerID|ProductID|     ProductName|          Quantity|         UnitPrice|  Country|
+-------+-------+----------+---------+----------------+------------------+------------------+---------+
|  count|     22|        22|       22|              22|                22|                22|       22|
|   mean|   NULL|      NULL|     NULL|            NULL|1.1818181818181819|362.11090909090905|     NULL|
| stddev|   NULL|      NULL|     NULL|            NULL|0.5010810823432181| 407.0944619514188|     NULL|
|    min| ORD001|   CUST101|  PROD001|External SSD 1TB|                 1|              25.5|Australia|
|    max| ORD017|   CUST110|  PROD010|       Webcam HD|                 3|            1200.0|      USA|
+-------+-------+----------+---------+----------------+------------------+------------------+---------+


🏷️ Data Types:
OrderID: StringType()
CustomerID: StringType()


[Stage 12:>                                                         (0 + 1) / 1]

CustomerID: 10 unique values


                                                                                

ProductID: 10 unique values
ProductName: 10 unique values
Country: 7 unique values

✅ Data Quality Checks:
Duplicate rows: 0
Date range: 2024-01-15 to 2024-01-31
Negative quantities: 0
Negative prices: 0

🌍 Sample records by Country:

--- Germany ---
+-------+----------+---------+------------+--------+---------+----------+-------+
|OrderID|CustomerID|ProductID|ProductName |Quantity|UnitPrice|OrderDate |Country|
+-------+----------+---------+------------+--------+---------+----------+-------+
|ORD008 |CUST106   |PROD010  |Smartwatch X|1       |299.99   |2024-01-22|Germany|
+-------+----------+---------+------------+--------+---------+----------+-------+


--- Canada ---
+-------+----------+---------+----------------+--------+---------+----------+-------+
|OrderID|CustomerID|ProductID|ProductName     |Quantity|UnitPrice|OrderDate |Country|
+-------+----------+---------+----------------+--------+---------+----------+-------+
|ORD002 |CUST102   |PROD002  |Smartphone B    |1       |850.0   

In [25]:
# Derive the line total and the calendar fields used by the analysis cells.
print("\n2. Creating calculated columns...")

order_date = col("OrderDate")

df_enriched = (
    df
    .withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))
    # Normalise OrderDate to a date before deriving the calendar parts below.
    .withColumn("OrderDate", to_date(order_date, "yyyy-MM-dd"))
    .withColumn("Year", year(order_date))
    .withColumn("Month", month(order_date))
    .withColumn("DayOfWeek", date_format(order_date, "EEEE"))
    .withColumn("WeekOfYear", weekofyear(order_date))
)

# Mark the enriched frame for caching; the analysis cells all read from it.
df_enriched.cache()

print("✅ Data enrichment completed!")


2. Creating calculated columns...
✅ Data enrichment completed!


In [26]:
# =================== ANALYSIS SECTION ===================

print("\n=== 📊 BUSINESS INSIGHTS ANALYSIS ===")

# 1. REVENUE ANALYSIS
print("\n1️⃣ REVENUE ANALYSIS")
print("-" * 50)

# Whole-dataset aggregates. sum/count/avg are the Spark SQL functions brought
# in by the wildcard import, not the Python builtins.
overall_aggregates = [
    sum("TotalAmount").alias("total_revenue"),
    count("*").alias("total_line_items"),
    countDistinct("OrderID").alias("total_orders"),
    countDistinct("CustomerID").alias("total_customers"),
    countDistinct("ProductID").alias("total_products"),
    avg("TotalAmount").alias("avg_line_value"),
]

# Keep the first collected row as `total_metrics`; later cells read it by key.
metric_rows = df_enriched.agg(*overall_aggregates).collect()
total_metrics = metric_rows[0]

print(f"💰 Total Revenue: ${total_metrics['total_revenue']:,.2f}")
print(f"📦 Total Orders: {total_metrics['total_orders']:,}")
print(f"👥 Total Customers: {total_metrics['total_customers']:,}")
print(f"🛍️ Total Products: {total_metrics['total_products']:,}")
print(f"📋 Total Line Items: {total_metrics['total_line_items']:,}")
print(f"💵 Average Line Value: ${total_metrics['avg_line_value']:.2f}")


=== 📊 BUSINESS INSIGHTS ANALYSIS ===

1️⃣ REVENUE ANALYSIS
--------------------------------------------------
💰 Total Revenue: $8,117.94
📦 Total Orders: 17
👥 Total Customers: 10
🛍️ Total Products: 10
📋 Total Line Items: 22
💵 Average Line Value: $369.00


In [28]:
# Revenue by Country
print("\n🌍 Revenue Performance by Country:")

# Each country's revenue as a percentage of the overall total from
# `total_metrics`, rounded to two decimals.
overall_revenue = total_metrics['total_revenue']
revenue_percent = round((col("Revenue") / overall_revenue) * 100, 2)

country_revenue = (
    df_enriched
    .groupBy("Country")
    .agg(
        sum("TotalAmount").alias("Revenue"),
        count("*").alias("LineItems"),
        countDistinct("OrderID").alias("Orders"),
        countDistinct("CustomerID").alias("Customers"),
        avg("TotalAmount").alias("AvgLineValue"),
    )
    .withColumn("RevenuePercent", revenue_percent)
    .orderBy(desc("Revenue"))
)

country_revenue.show(truncate=False)

# Monthly Revenue Trend
print("\n📈 Monthly Revenue Trend:")

month_keys = ["Year", "Month"]
monthly_revenue = (
    df_enriched
    .groupBy(*month_keys)
    .agg(
        sum("TotalAmount").alias("MonthlyRevenue"),
        countDistinct("OrderID").alias("MonthlyOrders"),
        countDistinct("CustomerID").alias("ActiveCustomers"),
    )
    .orderBy(*month_keys)
)

monthly_revenue.show()


🌍 Revenue Performance by Country:
+---------+------------------+---------+------+---------+------------------+--------------+
|Country  |Revenue           |LineItems|Orders|Customers|AvgLineValue      |RevenuePercent|
+---------+------------------+---------+------+---------+------------------+--------------+
|USA      |3727.9799999999996|10       |6     |3        |372.79799999999994|45.92         |
|UK       |2020.0            |4        |3     |2        |505.0             |24.88         |
|Canada   |1220.0            |3        |3     |1        |406.6666666666667 |15.03         |
|Australia|499.98            |2        |2     |1        |249.99            |6.16          |
|Germany  |299.99            |1        |1     |1        |299.99            |3.7           |
|Japan    |199.99            |1        |1     |1        |199.99            |2.46          |
|France   |150.0             |1        |1     |1        |150.0             |1.85          |
+---------+------------------+---------+-----

In [29]:
# 2. CUSTOMER ANALYSIS
print("\n2️⃣ CUSTOMER ANALYSIS")
print("-" * 50)

# Per-customer spend, activity and first/last order dates.
per_customer = df_enriched.groupBy("CustomerID").agg(
    sum("TotalAmount").alias("TotalSpent"),
    count("*").alias("ItemsPurchased"),
    countDistinct("OrderID").alias("OrderCount"),
    countDistinct("ProductID").alias("UniqueProducts"),
    countDistinct("Country").alias("Countries"),
    min("OrderDate").alias("FirstOrder"),
    max("OrderDate").alias("LastOrder"),
    avg("TotalAmount").alias("AvgLineValue"),
)

# Days between first and last order, and average spend per order.
customer_analysis = (
    per_customer
    .withColumn("CustomerLifeDays", datediff(col("LastOrder"), col("FirstOrder")))
    .withColumn("AvgOrderValue", col("TotalSpent") / col("OrderCount"))
)

# Segmentation cut-offs: approximate 80th and 50th percentiles of TotalSpent.
customer_stats = customer_analysis.agg(
    percentile_approx("TotalSpent", 0.8).alias("high_value_threshold"),
    percentile_approx("TotalSpent", 0.5).alias("medium_value_threshold"),
).collect()[0]

spent = col("TotalSpent")
segment_label = (
    when(spent >= customer_stats['high_value_threshold'], "High Value")
    .when(spent >= customer_stats['medium_value_threshold'], "Medium Value")
    .otherwise("Low Value")
)
customer_segments = customer_analysis.withColumn("CustomerSegment", segment_label)

print("🏆 Top 10 Customers by Revenue:")
customer_segments.orderBy(desc("TotalSpent")).show(10, truncate=False)

print("\n📊 Customer Segmentation:")
# Share of the overall revenue (from `total_metrics`) held by each segment.
segment_share = round(
    (col("SegmentRevenue") / total_metrics['total_revenue']) * 100, 2
)
segment_summary = (
    customer_segments
    .groupBy("CustomerSegment")
    .agg(
        count("*").alias("CustomerCount"),
        sum("TotalSpent").alias("SegmentRevenue"),
        avg("TotalSpent").alias("AvgSpendPerCustomer"),
        avg("OrderCount").alias("AvgOrdersPerCustomer"),
    )
    .withColumn("RevenueShare", segment_share)
)

segment_summary.show(truncate=False)


2️⃣ CUSTOMER ANALYSIS
--------------------------------------------------
🏆 Top 10 Customers by Revenue:
+----------+----------+--------------+----------+--------------+---------+----------+----------+-----------------+----------------+-----------------+---------------+
|CustomerID|TotalSpent|ItemsPurchased|OrderCount|UniqueProducts|Countries|FirstOrder|LastOrder |AvgLineValue     |CustomerLifeDays|AvgOrderValue    |CustomerSegment|
+----------+----------+--------------+----------+--------------+---------+----------+----------+-----------------+----------------+-----------------+---------------+
|CUST101   |2302.49   |6             |4         |5             |1        |2024-01-15|2024-01-27|383.7483333333333|12              |575.6225         |High Value     |
|CUST103   |1720.0    |3             |2         |3             |1        |2024-01-18|2024-01-30|573.3333333333334|12              |860.0            |High Value     |
|CUST108   |1249.99   |2             |1         |2             |1

In [30]:
# 3. PRODUCT ANALYSIS
print("\n3️⃣ PRODUCT ANALYSIS")
print("-" * 50)

# Per-product volume, revenue, reach and price spread.
per_product = df_enriched.groupBy("ProductID", "ProductName").agg(
    sum("Quantity").alias("TotalQuantitySold"),
    sum("TotalAmount").alias("TotalRevenue"),
    count("*").alias("OrderFrequency"),
    countDistinct("CustomerID").alias("UniqueCustomers"),
    countDistinct("OrderID").alias("UniqueOrders"),
    avg("UnitPrice").alias("AvgUnitPrice"),
    min("UnitPrice").alias("MinPrice"),
    max("UnitPrice").alias("MaxPrice"),
)

# Percentage of overall revenue (from `total_metrics`) and units per order.
product_share = round(
    (col("TotalRevenue") / total_metrics['total_revenue']) * 100, 2
)
product_analysis = (
    per_product
    .withColumn("RevenueShare", product_share)
    .withColumn("AvgQuantityPerOrder", col("TotalQuantitySold") / col("UniqueOrders"))
)

# The same top-10 listing, ranked by three different measures.
rankings = [
    ("🏆 Top 10 Products by Revenue:", "TotalRevenue"),
    ("📈 Top 10 Products by Quantity Sold:", "TotalQuantitySold"),
    ("👥 Top 10 Products by Customer Reach:", "UniqueCustomers"),
]
for heading, ranking_column in rankings:
    print(heading)
    product_analysis.orderBy(desc(ranking_column)).show(10, truncate=False)



3️⃣ PRODUCT ANALYSIS
--------------------------------------------------
🏆 Top 10 Products by Revenue:
+---------+----------------+-----------------+------------+--------------+---------------+------------+------------+--------+--------+------------+-------------------+
|ProductID|ProductName     |TotalQuantitySold|TotalRevenue|OrderFrequency|UniqueCustomers|UniqueOrders|AvgUnitPrice|MinPrice|MaxPrice|RevenueShare|AvgQuantityPerOrder|
+---------+----------------+-----------------+------------+--------------+---------------+------------+------------+--------+--------+------------+-------------------+
|PROD001  |Laptop A        |3                |3600.0      |3             |3              |3           |1200.0      |1200.0  |1200.0  |44.35       |1.0                |
|PROD002  |Smartphone B    |2                |1700.0      |2             |2              |2           |850.0       |850.0   |850.0   |20.94       |1.0                |
|PROD006  |Monitor 27in    |2                |600.0      

In [31]:
# 4. ORDER PATTERN ANALYSIS
print("\n4️⃣ ORDER PATTERN ANALYSIS")
print("-" * 50)

# Collapse line items to one row per order. first() picks one value per
# group for the customer, date and country columns.
order_analysis = df_enriched.groupBy("OrderID").agg(
    sum("TotalAmount").alias("OrderValue"),
    sum("Quantity").alias("TotalItems"),
    count("*").alias("ProductLines"),
    countDistinct("ProductID").alias("UniqueProducts"),
    first("CustomerID").alias("CustomerID"),
    first("OrderDate").alias("OrderDate"),
    first("Country").alias("Country"),
)

print("📊 Order Size Distribution:")
# Number of orders and their average value, by number of product lines.
order_size_dist = (
    order_analysis
    .groupBy("ProductLines")
    .agg(
        count("*").alias("OrderCount"),
        avg("OrderValue").alias("AvgOrderValue"),
    )
    .orderBy("ProductLines")
)

order_size_dist.show()

print("💰 Order Value Distribution by Country:")
# Order-level statistics per country, highest average order value first.
country_order_analysis = (
    order_analysis
    .groupBy("Country")
    .agg(
        count("*").alias("OrderCount"),
        avg("OrderValue").alias("AvgOrderValue"),
        min("OrderValue").alias("MinOrderValue"),
        max("OrderValue").alias("MaxOrderValue"),
        avg("TotalItems").alias("AvgItemsPerOrder"),
    )
    .orderBy(desc("AvgOrderValue"))
)

country_order_analysis.show(truncate=False)


4️⃣ ORDER PATTERN ANALYSIS
--------------------------------------------------
📊 Order Size Distribution:
+------------+----------+------------------+
|ProductLines|OrderCount|     AvgOrderValue|
+------------+----------+------------------+
|           1|        12|318.03833333333336|
|           2|         5| 860.2959999999999|
+------------+----------+------------------+

💰 Order Value Distribution by Country:


[Stage 133:>                                                        (0 + 1) / 1]

+---------+----------+-----------------+-------------+-------------+------------------+
|Country  |OrderCount|AvgOrderValue    |MinOrderValue|MaxOrderValue|AvgItemsPerOrder  |
+---------+----------+-----------------+-------------+-------------+------------------+
|UK       |3         |673.3333333333334|220.0        |1500.0       |1.3333333333333333|
|USA      |6         |621.3299999999999|76.5         |1251.0       |2.1666666666666665|
|Canada   |3         |406.6666666666667|150.0        |850.0        |1.0               |
|Germany  |1         |299.99           |299.99       |299.99       |1.0               |
|Australia|2         |249.99           |199.99       |299.99       |1.0               |
|Japan    |1         |199.99           |199.99       |199.99       |1.0               |
|France   |1         |150.0            |150.0        |150.0        |2.0               |
+---------+----------+-----------------+-------------+-------------+------------------+



                                                                                

In [32]:
# 5. TIME-BASED ANALYSIS
print("\n5️⃣ TIME-BASED ANALYSIS")
print("-" * 50)

print("📅 Daily Sales Performance:")
# Revenue, orders, customers and units per calendar day, in date order.
daily_analysis = (
    df_enriched
    .groupBy("OrderDate")
    .agg(
        sum("TotalAmount").alias("DailyRevenue"),
        countDistinct("OrderID").alias("DailyOrders"),
        countDistinct("CustomerID").alias("ActiveCustomers"),
        sum("Quantity").alias("ItemsSold"),
    )
    .orderBy("OrderDate")
)

daily_analysis.show()

print("📊 Day of Week Performance:")
# Same idea grouped by weekday name, highest revenue first.
dow_analysis = (
    df_enriched
    .groupBy("DayOfWeek")
    .agg(
        sum("TotalAmount").alias("Revenue"),
        countDistinct("OrderID").alias("Orders"),
        avg("TotalAmount").alias("AvgLineValue"),
    )
    .orderBy(desc("Revenue"))
)

dow_analysis.show()


5️⃣ TIME-BASED ANALYSIS
--------------------------------------------------
📅 Daily Sales Performance:
+----------+------------------+-----------+---------------+---------+
| OrderDate|      DailyRevenue|DailyOrders|ActiveCustomers|ItemsSold|
+----------+------------------+-----------+---------------+---------+
|2024-01-15|            1251.0|          1|              1|        3|
|2024-01-16|             850.0|          1|              1|        1|
|2024-01-17|124.99000000000001|          1|              1|        2|
|2024-01-18|            1500.0|          1|              1|        2|
|2024-01-19|            199.99|          1|              1|        1|
|2024-01-20|             175.5|          1|              1|        2|
|2024-01-21|             220.0|          1|              1|        1|
|2024-01-22|            299.99|          1|              1|        1|
|2024-01-23|             850.0|          1|              1|        1|
|2024-01-24|             150.0|          1|              

In [33]:
# 6. CROSS-SELL ANALYSIS
print("\n6️⃣ CROSS-SELL OPPORTUNITIES")
print("-" * 50)

# Products frequently bought together: self-join the order lines on OrderID
# and keep each unordered product pair once (a.ProductID < b.ProductID).
lines_a = df.alias("a")
lines_b = df.alias("b")
same_order = col("a.OrderID") == col("b.OrderID")
pair_once = col("a.ProductID") < col("b.ProductID")

cross_sell = (
    lines_a.join(lines_b, same_order & pair_once)
    .groupBy(
        col("a.ProductName").alias("Product1"),
        col("b.ProductName").alias("Product2"),
    )
    # Count distinct orders rather than joined rows: if a product appears on
    # more than one line of the same order, count("*") would count that order
    # several times for the same pair.
    .agg(countDistinct(col("a.OrderID")).alias("CoOccurrence"))
    # Product names break ties so the listing (and the saved CSV) has a
    # stable order between runs.
    .orderBy(desc("CoOccurrence"), col("Product1"), col("Product2"))
)

print("🔗 Top Product Combinations (Cross-sell Opportunities):")
cross_sell.show(15, truncate=False)


6️⃣ CROSS-SELL OPPORTUNITIES
--------------------------------------------------
🔗 Top Product Combinations (Cross-sell Opportunities):
+------------+----------------+------------+
|Product1    |Product2        |CoOccurrence|
+------------+----------------+------------+
|Keyboard Pro|Webcam HD       |1           |
|Laptop A    |Monitor 27in    |1           |
|Mouse XYZ   |External SSD 1TB|1           |
|Laptop A    |Mouse XYZ       |1           |
|Laptop A    |Webcam HD       |1           |
+------------+----------------+------------+



In [34]:
# =================== SAVE RESULTS ===================

print("\n=== 💾 SAVING ANALYSIS RESULTS TO GCS ===")

try:
    # 1-6. Result tables written as single-file CSV folders under output_path.
    # Each entry: (progress message, DataFrame, sub-folder name).
    csv_outputs = [
        ("Saving country revenue analysis...", country_revenue, "country_analysis"),
        ("Saving customer segmentation...", customer_segments, "customer_segmentation"),
        ("Saving product performance analysis...", product_analysis, "product_performance"),
        ("Saving order pattern analysis...", order_analysis, "order_patterns"),
        ("Saving daily sales trends...", daily_analysis, "daily_sales_trends"),
        ("Saving cross-sell opportunities...", cross_sell, "cross_sell_opportunities"),
    ]
    for progress_message, result_frame, folder in csv_outputs:
        print(progress_message)
        # coalesce(1) yields one part file per folder; "overwrite" replaces
        # whatever a previous run left there.
        result_frame.coalesce(1).write.mode("overwrite") \
            .option("header", "true") \
            .csv(f"{output_path}/{folder}")

    # 7. Save Enriched Dataset (Parquet, partitioned by Country and Year)
    print("Saving enriched dataset as Parquet...")
    df_enriched.write.mode("overwrite") \
        .partitionBy("Country", "Year") \
        .parquet(f"{output_path}/enriched_order_data")

    # 8. Save Summary Statistics
    print("Creating and saving executive summary...")

    # Fetch both ends of the date range in a single Spark job. min/max are
    # the Spark SQL aggregates from the wildcard import.
    date_bounds = df_enriched.agg(
        min("OrderDate").alias("first_day"),
        max("OrderDate").alias("last_day"),
    ).collect()[0]

    # All values are pre-formatted strings so the summary has a uniform schema.
    summary_data = [
        ("Total Revenue", f"${total_metrics['total_revenue']:,.2f}"),
        ("Total Orders", f"{total_metrics['total_orders']:,}"),
        ("Total Customers", f"{total_metrics['total_customers']:,}"),
        ("Total Products", f"{total_metrics['total_products']:,}"),
        ("Average Order Value", f"${(total_metrics['total_revenue']/total_metrics['total_orders']):.2f}"),
        # country_revenue is sorted by Revenue descending, so its first row
        # is the top country.
        ("Top Country by Revenue", country_revenue.first()['Country']),
        ("Date Range", f"{date_bounds['first_day']} to {date_bounds['last_day']}")
    ]

    summary_df = spark.createDataFrame(summary_data, ["Metric", "Value"])
    summary_df.coalesce(1).write.mode("overwrite") \
        .option("header", "true") \
        .csv(f"{output_path}/executive_summary")

    print("\n🎉 ALL ANALYSIS RESULTS SAVED SUCCESSFULLY!")
    print(f"📂 Results Location: {output_path}/")
    print("\n📋 Files Created:")
    print("   ├── country_analysis/ (CSV)")
    print("   ├── customer_segmentation/ (CSV)")
    print("   ├── product_performance/ (CSV)")
    print("   ├── order_patterns/ (CSV)")
    print("   ├── daily_sales_trends/ (CSV)")
    print("   ├── cross_sell_opportunities/ (CSV)")
    print("   ├── executive_summary/ (CSV)")
    print("   └── enriched_order_data/ (Parquet, partitioned)")

    print(f"\n✨ Analysis Complete! You can now access all insights from your GCS bucket.")

except Exception as e:
    print(f"❌ Error saving results: {str(e)}")
    # Re-raise so a failed or partial save stops the notebook instead of
    # letting later cells report the pipeline as successful.
    raise



=== 💾 SAVING ANALYSIS RESULTS TO GCS ===
Saving country revenue analysis...


                                                                                

Saving customer segmentation...


                                                                                

Saving product performance analysis...


                                                                                

Saving order pattern analysis...


                                                                                

Saving daily sales trends...


                                                                                

Saving cross-sell opportunities...


                                                                                

Saving enriched dataset as Parquet...


                                                                                

Creating and saving executive summary...


                                                                                


🎉 ALL ANALYSIS RESULTS SAVED SUCCESSFULLY!
📂 Results Location: gs://my-dataproc-data-bucket-051203/output/product_sales_insights/

📋 Files Created:
   ├── country_analysis/ (CSV)
   ├── customer_segmentation/ (CSV)
   ├── product_performance/ (CSV)
   ├── order_patterns/ (CSV)
   ├── daily_sales_trends/ (CSV)
   ├── cross_sell_opportunities/ (CSV)
   ├── executive_summary/ (CSV)
   └── enriched_order_data/ (Parquet, partitioned)

✨ Analysis Complete! You can now access all insights from your GCS bucket.


In [35]:
# Clean up
try:
    # Drop the cached enriched DataFrame.
    df_enriched.unpersist()
finally:
    # Stop the session even if the unpersist above raises (for example when
    # `df_enriched` was never created), so the session is not left running.
    spark.stop()

print("\n🔚 Spark session ended. Analysis pipeline completed successfully!")


🔚 Spark session ended. Analysis pipeline completed successfully!
