In [13]:
# -------------------------------------------------------------------------------
#  Wine Sales Analysis - Comparison of Processing with Pandas vs Pyspark on Colab
# -------------------------------------------------------------------------------
#
# ====================
#     PANDAS VERSION
# ====================

import pandas as pd
import time
from google.colab import drive

# Make the Drive-hosted CSV reachable from the Colab runtime.
drive.mount('/content/drive')

# 1. Read the raw CSV into a DataFrame.
file_path = '/content/drive/MyDrive/wine_expanded2.csv' # adjust path if needed
df = pd.read_csv(file_path)
print(f"Original dataset shape: {df.shape}")

# Force the two measure columns to numeric. Values that cannot be parsed
# become NaN (errors='coerce'), so the dropna() below discards those rows.
for measure in ('RETAIL SALES', 'RETAIL TRANSFERS'):
    df[measure] = pd.to_numeric(df[measure], errors='coerce')

# 2. Keep only the rows that have no missing value in any column.
df_clean = df.dropna(axis=0, how='any')
print(f"Cleaned dataset shape: {df_clean.shape}")

# ──────────────────────────────────────────────
# Analysis 1 (Pandas): sum RETAIL SALES within each YEAR.
# The timer covers the aggregation and the printing of its result.
start = time.time()
yearly_groups = df_clean.groupby('YEAR')
sales_by_year = yearly_groups['RETAIL SALES'].sum()
print("\nTotal Retail Sales by Year (Pandas):")
print(sales_by_year)
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 2 (Pandas): sum RETAIL TRANSFERS within each ITEM TYPE.
# The timer covers the aggregation and the printing of its result.
start = time.time()
item_type_groups = df_clean.groupby('ITEM TYPE')
transfers_by_type = item_type_groups['RETAIL TRANSFERS'].sum()
print("Total Retail Transfers by Item Type (Pandas):")
print(transfers_by_type)
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 3 (Pandas): mean RETAIL SALES per SUPPLIER, largest mean first.
# Only the first 20 suppliers of the sorted result are printed.
start = time.time()
supplier_means = df_clean.groupby('SUPPLIER')['RETAIL SALES'].mean()
avg_sales_by_supplier = supplier_means.sort_values(ascending=False)
print("Average Retail Sales per Supplier (top 20 shown):")
print(avg_sales_by_supplier.head(20))
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 4 - Top 10 items by Retail Sales (Pandas)
# Ranks individual rows, not per-item totals: every cleaned row is sorted by
# its RETAIL SALES value (largest first) and the first ten rows are kept, so
# the same ITEM DESCRIPTION can appear more than once.
start = time.time()
top_items = (
    df_clean[['ITEM DESCRIPTION', 'RETAIL SALES']]
    .sort_values('RETAIL SALES', ascending=False)
    .head(10)
)
print("Top 10 items by Retail Sales (Pandas):")
print(top_items)
print(f"Execution time: {time.time() - start:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 5 (Pandas): total RETAIL SALES for rows with YEAR 2018 and MONTH 1.
start = time.time()
is_jan_2018 = (df_clean['YEAR'] == 2018) & (df_clean['MONTH'] == 1)
jan_2018_sales = df_clean.loc[is_jan_2018, 'RETAIL SALES'].sum()
print(f"Total Retail Sales - January 2018: {jan_2018_sales:,.2f}")
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")


# ====================
#     SPARK VERSION
# ====================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import time

# Start (or reuse) a Spark session running in local mode ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("WineRetailAnalysis")
    .getOrCreate()
)

# Read the same CSV as the pandas section: the first line is treated as the
# header and column types are inferred by Spark.
file_path = "/content/drive/MyDrive/wine_expanded2.csv"   # ← adjust if needed

csv_reader = spark.read.option("header", "true")
df_spark = csv_reader.csv(file_path, inferSchema=True)

# Convert the two measure columns to doubles.
# For each column: strip commas and parentheses, keep the value only if what
# remains is a plain (optionally negative) decimal number, and otherwise
# store NULL so that the later dropna() removes the row.
#
# The pattern is a raw string: the previous non-raw literal contained "\d",
# an invalid escape sequence that makes Python emit a SyntaxWarning. The
# regex handed to Spark is unchanged.
#
# NOTE(review): parentheses are simply removed, so "(12.5)" is read as +12.5.
# If the data uses accounting-style parentheses for negatives, this flips
# the sign - confirm against the source file.
NUMERIC_PATTERN = r"^-?\d*\.?\d+$"

for measure in ("RETAIL SALES", "RETAIL TRANSFERS"):
    stripped = F.regexp_replace(F.col(measure), "[,()]", "")
    df_spark = df_spark.withColumn(
        measure,
        F.when(
            stripped.cast("string").rlike(NUMERIC_PATTERN),
            stripped.cast(DoubleType()),
        ).otherwise(F.lit(None)),
    )

# Show the resulting schema and the size of the loaded DataFrame.
df_spark.printSchema()
row_count = df_spark.count()
column_count = len(df_spark.columns)
print(f"Number of rows: {row_count:,}, columns: {column_count}")

# Drop every row that contains a NULL in any column.
df_clean_spark = df_spark.na.drop()
cleaned_row_count = df_clean_spark.count()
print(f"Cleaned rows: {cleaned_row_count:,}")

# ──────────────────────────────────────────────
# Analysis 1 (Spark): sum RETAIL SALES within each YEAR, ordered by YEAR.
# show() triggers the computation, so it is inside the timed region.
start = time.time()
yearly_total = F.sum("RETAIL SALES").alias("total_retail_sales")
sales_by_year_sp = (
    df_clean_spark
    .groupBy("YEAR")
    .agg(yearly_total)
    .orderBy("YEAR")
)
sales_by_year_sp.show()
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 2 (Spark): sum RETAIL TRANSFERS within each ITEM TYPE,
# largest total first. show() triggers the computation.
start = time.time()
type_total = F.sum("RETAIL TRANSFERS").alias("total_transfers")
transfers_by_type_sp = (
    df_clean_spark
    .groupBy("ITEM TYPE")
    .agg(type_total)
    .orderBy(F.desc("total_transfers"))
)
transfers_by_type_sp.show(truncate=False)
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 3 (Spark): mean RETAIL SALES per SUPPLIER, largest mean first.
# Only the first 20 rows of the sorted result are displayed.
start = time.time()
supplier_mean = F.avg("RETAIL SALES").alias("avg_retail_sales")
avg_sales_supplier_sp = (
    df_clean_spark
    .groupBy("SUPPLIER")
    .agg(supplier_mean)
    .orderBy(F.desc("avg_retail_sales"))
)
avg_sales_supplier_sp.show(20, truncate=False)
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 4 (Spark): the ten rows with the largest RETAIL SALES value.
# Rows are ranked individually; nothing is aggregated per item.
start = time.time()
top_items_sp = (
    df_clean_spark
    .select("ITEM DESCRIPTION", "RETAIL SALES")
    .orderBy(F.desc("RETAIL SALES"))
    .limit(10)
)
top_items_sp.show(truncate=False)
elapsed = time.time() - start
print(f"Execution time: {elapsed:.4f} seconds\n")

# ──────────────────────────────────────────────
# Analysis 5 - Total Retail Sales January 2018 (Spark)
# Filters to YEAR 2018 / MONTH 1 and sums RETAIL SALES into a single value.
start = time.time()
is_jan_2018 = (F.col("YEAR") == 2018) & (F.col("MONTH") == 1)
jan2018_total = (
    df_clean_spark
    .filter(is_jan_2018)
    .agg(F.sum("RETAIL SALES").alias("total_jan2018"))
    .collect()[0][0]
)
# Spark's sum() returns NULL (None) when no row matches the filter, and
# formatting None with ":,.2f" raises TypeError. Fall back to 0.0, which is
# what the pandas version reports for an empty selection.
jan2018_sp = 0.0 if jan2018_total is None else jan2018_total

print(f"Total Retail Sales - January 2018 (Spark): {jan2018_sp:,.2f}")
print(f"Execution time: {time.time() - start:.4f} seconds")

# Optional: stop Spark session when done
# spark.stop()


  F.regexp_replace(F.col("RETAIL SALES"), "[,()]", "").cast("string").rlike("^-?\d*\\.?\\d+$"),
  F.regexp_replace(F.col("RETAIL TRANSFERS"), "[,()]", "").cast("string").rlike("^-?\d*\\.?\\d+$"),


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Original dataset shape: (307645, 9)
Cleaned dataset shape: (306315, 9)

Total Retail Sales by Year (Pandas):
YEAR
2017    672674.79
2018    153277.00
2019    932280.70
2020    348193.17
Name: RETAIL SALES, dtype: float64
Execution time: 0.0085 seconds

Total Retail Transfers by Item Type (Pandas):
ITEM TYPE
BEER            544643.41
DUNNAGE              0.00
KEGS                 0.00
LIQUOR          772935.06
NON-ALCOHOL      26671.92
REF                389.00
STR_SUPPLIES     10207.66
WINE            734927.24
Name: RETAIL TRANSFERS, dtype: float64
Execution time: 0.0220 seconds

Average Retail Sales per Supplier (top 20 shown):
SUPPLIER
FIFTH GENERATION INC                166.252581
CROWN IMPORTS                        52.052247
YUENGLING BREWERY                    50.256987
CHARLES JACQUIN ET CIE INC           50.143333
HEINEKEN USA                        