In [None]:
!pip install pyarrow -q

import requests
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F

# Obtain the Spark session for this notebook; getOrCreate() reuses an already
# running session in the kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("EcommerceBigDataAnalysis")
    .getOrCreate()
)

# Source CSV: retail sample data from the "Spark: The Definitive Guide" repository.
data_url = "https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv"
# NOTE(review): /content/ is presumably the Google Colab working directory — adjust
# this path when running the notebook elsewhere.
local_file_path = "/content/ecommerce_data.csv"
# Seconds to wait for the connection / between received bytes before giving up.
DOWNLOAD_TIMEOUT_SECONDS = 60

print("Downloading E-commerce dataset...")
# The response is used as a context manager so the streamed connection is always
# released (even if raise_for_status or a write fails), and a timeout is set so a
# stalled server cannot hang the cell indefinitely.
with requests.get(data_url, stream=True, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
    response.raise_for_status()

    # Stream to disk in 1 MiB chunks rather than holding the whole file in memory.
    with open(local_file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=1024*1024):
            f.write(chunk)
print(f"Download complete! File saved at: {local_file_path}")



# Load the raw CSV. inferSchema lets Spark infer column types from the data
# (Quantity / UnitPrice are assumed to come out numeric — the filters below rely on it).
df = spark.read.csv(local_file_path, header=True, inferSchema=True)

# Keep only rows with a strictly positive Quantity and UnitPrice; rows with negative
# quantities stay available in the raw `df`.
df_clean = df.filter((F.col("Quantity") > 0) & (F.col("UnitPrice") > 0))


# Calculate Total Sales per invoice line: Quantity * UnitPrice.
# Cached because the insight cells below run several separate actions on this frame;
# without caching, every action re-scans and re-parses the CSV.
df_with_revenue = df_clean.withColumn("TotalRevenue", F.col("Quantity") * F.col("UnitPrice")).cache()

print("\n--- SUCCESS: DATA LOADED ---")
# count() also materializes the cache. The value equals df_clean's row count because
# withColumn adds a column but never adds or drops rows.
print(f"Total rows analyzed: {df_with_revenue.count():,}")

# Insight 1: Top 10 Countries by Revenue
# Several rows can share an InvoiceNo (one row per invoice line), so transactions are
# counted as DISTINCT invoices, and the average order value is revenue per invoice —
# not the mean of individual line totals.
print("\n--- GLOBAL REVENUE BY COUNTRY ---")
revenue_insights = df_with_revenue.groupBy("Country") \
    .agg(
        F.sum("TotalRevenue").alias("Total_Sales"),
        F.countDistinct("InvoiceNo").alias("Transaction_Volume")
    ) \
    .withColumn("Avg_Order_Value", F.col("Total_Sales") / F.col("Transaction_Volume")) \
    .orderBy(F.desc("Total_Sales"))

revenue_insights.show(10)

# Insight 2: Peak Shopping Hours (Time-Series Analysis)
# Parse InvoiceDate with the "M/d/yyyy H:mm" pattern, extract the hour of day, and
# count how many cleaned rows fall into each hour.
print("\n--- BUSIEST SHOPPING HOURS (24h Format) ---")
invoice_hour = F.hour(F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))

hourly_sales = (
    df_clean
    .withColumn("Hour", invoice_hour)
    .groupBy("Hour")
    .count()
    .orderBy("Hour")
)

hourly_sales.show(24)

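# End of session: spark.stop() is intentionally not called here, because the
# following cells keep using `spark` and the DataFrames built above.
# spark.stop()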

In [None]:
# Identifying top 10 customers by total spend.
# Rows without a CustomerID cannot be attributed to anyone, so they are dropped before
# aggregating. Orders are counted as distinct invoices — counting InvoiceNo values
# directly would count invoice lines, since several rows can share one invoice.
print("\n--- TOP 10 CUSTOMERS BY SPEND ---")
top_customers = df_with_revenue \
    .filter(F.col("CustomerID").isNotNull()) \
    .groupBy("CustomerID") \
    .agg(
        F.sum("TotalRevenue").alias("Total_Spent"),
        F.countDistinct("InvoiceNo").alias("Total_Orders")
    ) \
    .orderBy(F.desc("Total_Spent"))

top_customers.show(10)

In [None]:
# Top 10 Products by Total Revenue
# Products are keyed by their Description text; revenue and units sold are summed per product.
# NOTE(review): if the same product can appear under slightly different descriptions,
# grouping by StockCode may be more robust — confirm against the data.
print("\n--- TOP 10 PRODUCTS BY REVENUE ---")
product_performance = (
    df_with_revenue
    .groupBy("Description")
    .agg(
        F.sum("TotalRevenue").alias("Product_Revenue"),
        F.sum("Quantity").alias("Units_Sold"),
    )
    .orderBy(F.col("Product_Revenue").desc())
)

product_performance.show(10, truncate=False)

In [None]:
# Most popular products within each country, ranked by units sold.
# Ranking is done per country (not globally) so that smaller markets are not crowded
# out of the listing by the countries with the largest volumes.
TOP_N_PER_COUNTRY = 3

print("\n--- TOP PRODUCTS PER COUNTRY (Volume) ---")
regional_trends = df_with_revenue.groupBy("Country", "Description") \
    .agg(F.sum("Quantity").alias("Total_Quantity")) \
    .orderBy(F.desc("Total_Quantity"))

# row_number (rather than rank) yields at most TOP_N_PER_COUNTRY rows per country;
# the Description tie-breaker keeps the selection deterministic when quantities tie.
country_window = Window.partitionBy("Country") \
    .orderBy(F.desc("Total_Quantity"), F.asc("Description"))

top_products_per_country = regional_trends \
    .withColumn("Rank", F.row_number().over(country_window)) \
    .filter(F.col("Rank") <= TOP_N_PER_COUNTRY) \
    .orderBy("Country", "Rank")

# The result is bounded by (number of countries x TOP_N_PER_COUNTRY), so show every row.
top_products_per_country.show(top_products_per_country.count(), truncate=False)

In [None]:
# Row counts per weekday. Spark's dayofweek numbering:
# 1 = Sunday, 2 = Monday, ..., 7 = Saturday
print("\n--- SALES VOLUME BY DAY OF WEEK ---")
invoice_weekday = F.dayofweek(F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))

day_of_week_analysis = (
    df_with_revenue
    .withColumn("DayOfWeek", invoice_weekday)
    .groupBy("DayOfWeek")
    .count()
    .orderBy("DayOfWeek")
)

day_of_week_analysis.show()

In [None]:
# Cancellations/returns are taken from the raw, unfiltered `df` (nothing is reloaded):
# df_clean already dropped every row with a non-positive Quantity.
# NOTE(review): negative quantities may also cover stock adjustments that are not
# customer returns — confirm against the dataset's InvoiceNo conventions.
df_returns = df.where(F.col("Quantity") < 0)

print(f"\n--- DATA QUALITY INSIGHT ---")
return_row_count = df_returns.count()
print(f"Total number of cancellations/returns: {return_row_count:,}")

# Top countries with the most returns (by number of negative-quantity rows)
returns_by_country = df_returns.groupBy("Country").count()
returns_by_country.orderBy(F.col("count").desc()).show(5)