In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell queries through.
spark = (
    SparkSession.builder
    .appName("RetailDemandIngestion")
    .getOrCreate()
)


In [None]:
import pandas as pd

# Load the raw workbook with pandas; the in-memory frame is handed to Spark below.
pdf = pd.read_excel("../data/raw/Online_Retail.xlsx")

# Object-dtype columns read from Excel can hold ints, strings and NaN side by
# side, which Spark's schema inference cannot merge into one column type.
# Normalise every such column to "str or None" so it becomes a nullable string.
for text_col in pdf.select_dtypes(include="object").columns:
    pdf[text_col] = pdf[text_col].map(
        lambda value: None if pd.isna(value) else str(value)
    )

# pandas marks missing floats as NaN, and Spark keeps NaN (not NULL) on the
# default non-Arrow conversion path, so isNull() checks would miss them.
# Remember the float columns so their NaNs can be turned into real NULLs.
float_cols = pdf.select_dtypes(include="float").columns.tolist()

df = spark.createDataFrame(pdf)
if float_cols:
    # Restricted to the float columns so integer columns are never touched.
    df = df.replace(float("nan"), None, subset=float_cols)

df.printSchema()
df.show(5)


In [None]:
# Expose the raw DataFrame to Spark SQL under the view name "retail_sales".
df.createOrReplaceTempView("retail_sales")



In [None]:
# Per-country order count, units and revenue on the raw view; show the
# ten countries with the highest revenue.
country_overview_sql = """
SELECT
    Country,
    COUNT(DISTINCT InvoiceNo) AS total_orders,
    SUM(Quantity) AS total_quantity,
    ROUND(SUM(Quantity * UnitPrice), 2) AS revenue
FROM retail_sales
GROUP BY Country
ORDER BY revenue DESC
"""
country_overview = spark.sql(country_overview_sql)
country_overview.show(10)


In [None]:
# Number of line items per invoice; show the five largest invoices.
invoice_size_sql = """
SELECT
    InvoiceNo,
    COUNT(*) AS items_in_invoice
FROM retail_sales
GROUP BY InvoiceNo
ORDER BY items_in_invoice DESC
"""
invoice_sizes = spark.sql(invoice_size_sql)
invoice_sizes.show(5)



In [None]:
# Revenue per country as a DataFrame; only its schema is printed here.
country_revenue_sql = """
SELECT Country, SUM(Quantity * UnitPrice) AS revenue
FROM retail_sales
GROUP BY Country
"""
country_sales_df = spark.sql(country_revenue_sql)

country_sales_df.printSchema()


In [None]:
# List the tables/views currently visible to this Spark session.
spark.sql("SHOW TABLES").show()



In [None]:
# Raw row count per country, largest first; print 20 rows without truncation.
country_rows_sql = """
SELECT
    Country,
    COUNT(*) AS rows_count
FROM retail_sales
GROUP BY Country
ORDER BY rows_count DESC
"""
country_rows = spark.sql(country_rows_sql)
country_rows.show(20, truncate=False)


In [None]:
# Registers the same raw DataFrame under a second view called "table_name".
# NOTE(review): no query visible in this notebook reads "table_name"; it looks
# like a leftover placeholder - confirm and remove the cell if unused.
df.createOrReplaceTempView("table_name")


In [None]:
# Total number of rows in the raw "retail_sales" view.
spark.sql("SELECT COUNT(*) FROM retail_sales").show()


In [None]:
# Re-registers `df` as "retail_sales"; createOrReplace makes this safe to
# repeat, replacing any existing view of that name.
df.createOrReplaceTempView("retail_sales")


In [None]:
# List the tables/views currently visible to this Spark session.
spark.sql("SHOW TABLES").show()


In [None]:
from pyspark.sql.functions import col

# Normalise column names: lower-case them and turn spaces into underscores.
renamed_columns = []
for original_name in df.columns:
    tidy_name = original_name.lower().replace(" ", "_")
    renamed_columns.append(col(original_name).alias(tidy_name))

df_clean = df.select(renamed_columns)

df_clean.printSchema()


In [None]:
from pyspark.sql.functions import count, when

# One output column per input column holding its number of NULL values.
# `when` yields a non-null marker only for NULL rows and `count` skips nulls.
null_counters = []
for column_name in df_clean.columns:
    is_missing = col(column_name).isNull()
    null_counters.append(
        count(when(is_missing, column_name)).alias(column_name)
    )

df_clean.select(null_counters).show()


In [None]:
# Keep only rows with a positive quantity and unit price and with both an
# invoice number and an invoice date present.
has_positive_quantity = col("quantity") > 0
has_positive_price = col("unitprice") > 0
has_invoice_number = col("invoiceno").isNotNull()
has_invoice_date = col("invoicedate").isNotNull()

df_valid = df_clean.filter(
    has_positive_quantity
    & has_positive_price
    & has_invoice_number
    & has_invoice_date
)

df_valid.count()


In [None]:
from pyspark.sql.functions import isnan, when

# Replace missing customer ids with the sentinel -1.
# A missing id can arrive either as SQL NULL or as floating-point NaN (pandas
# float columns carry missing values as NaN, which isNull() does not match),
# so both cases are mapped to the sentinel.
customer_id = col("customerid")
df_valid = df_valid.withColumn(
    "customerid",
    when(customer_id.isNull() | isnan(customer_id), -1).otherwise(customer_id)
)


In [None]:
# Expose the filtered rows to Spark SQL as "retail_sales_clean".
df_valid.createOrReplaceTempView("retail_sales_clean")


In [None]:
# Row count per country after cleaning; show the ten largest.
clean_country_rows_sql = """
SELECT country, COUNT(*) AS rows_count
FROM retail_sales_clean
GROUP BY country
ORDER BY rows_count DESC
"""
clean_country_rows = spark.sql(clean_country_rows_sql)
clean_country_rows.show(10)


In [None]:
# Import the functions module under a namespace instead of
# `from pyspark.sql.functions import round`, which would shadow Python's
# builtin round() for every later cell in the notebook.
from pyspark.sql import functions as F

# Line revenue = quantity * unit price, rounded to 2 decimal places.
df_enriched = df_valid.withColumn(
    "revenue",
    F.round(col("quantity") * col("unitprice"), 2)
)

df_enriched.select("quantity", "unitprice", "revenue").show(5)


In [None]:
# Persist the enriched rows as Parquet, replacing any previous output.
clean_writer = df_enriched.write.mode("overwrite")
clean_writer.parquet("../data/processed/retail_sales_clean")


In [None]:
# Read the Parquet output back and count its rows (compare with the
# in-memory count in the next cell).
spark.read.parquet("../data/processed/retail_sales_clean").count()


In [None]:
# Row count of the in-memory enriched DataFrame.
df_enriched.count()


In [None]:
# Expose the enriched rows (including `revenue`) to Spark SQL as
# "retail_sales_analytics"; the aggregate queries below read this view.
df_enriched.createOrReplaceTempView("retail_sales_analytics")


In [None]:
# List the tables/views currently visible to this Spark session.
spark.sql("SHOW TABLES").show()


In [None]:
# Orders, units and revenue per country on the cleaned data, ordered by
# revenue (highest first).
country_sales_sql = """
SELECT
    country,
    COUNT(DISTINCT invoiceno) AS total_orders,
    SUM(quantity) AS total_quantity,
    ROUND(SUM(revenue), 2) AS total_revenue
FROM retail_sales_analytics
GROUP BY country
ORDER BY total_revenue DESC
"""
country_sales = spark.sql(country_sales_sql)

country_sales.show(10, truncate=False)



In [None]:
# Units and revenue per calendar date of the invoice, in date order.
daily_demand_sql = """
SELECT
    DATE(invoicedate) AS sales_date,
    SUM(quantity) AS daily_quantity,
    ROUND(SUM(revenue), 2) AS daily_revenue
FROM retail_sales_analytics
GROUP BY DATE(invoicedate)
ORDER BY sales_date
"""
daily_demand = spark.sql(daily_demand_sql)

daily_demand.show(10)


In [None]:
# Units and revenue per (stockcode, description) pair, best sellers first.
product_demand_sql = """
SELECT
    stockcode,
    description,
    SUM(quantity) AS total_quantity_sold,
    ROUND(SUM(revenue), 2) AS total_revenue
FROM retail_sales_analytics
GROUP BY stockcode, description
ORDER BY total_quantity_sold DESC
"""
product_demand = spark.sql(product_demand_sql)

product_demand.show(10)


In [None]:
# Persist the three aggregate tables as Parquet, replacing earlier output.
parquet_targets = (
    (country_sales, "../data/processed/country_sales"),
    (daily_demand, "../data/processed/daily_demand"),
    (product_demand, "../data/processed/product_demand"),
)

for frame, target_path in parquet_targets:
    frame.write.mode("overwrite").parquet(target_path)


In [None]:
# Read the daily-demand Parquet output back and preview five rows.
spark.read.parquet("../data/processed/daily_demand").show(5)


In [None]:
from pyspark.sql.functions import dayofweek, weekofyear, month, year

# Calendar features derived from sales_date, appended in this order.
calendar_parts = [
    ("day_of_week", dayofweek("sales_date")),
    ("week_of_year", weekofyear("sales_date")),
    ("month", month("sales_date")),
    ("year", year("sales_date")),
]

daily_features = daily_demand
for part_name, part_expr in calendar_parts:
    daily_features = daily_features.withColumn(part_name, part_expr)

daily_features.show(5)


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Single un-partitioned window ordered by date. `lag` offsets count rows in
# this ordering, so lag_7_day is the value seven rows earlier.
window_spec = Window.orderBy("sales_date")

for lag_name, offset in (("lag_1_day", 1), ("lag_7_day", 7)):
    daily_features = daily_features.withColumn(
        lag_name,
        lag("daily_quantity", offset).over(window_spec),
    )


In [None]:
# Preview the lag columns next to the quantity they are derived from.
daily_features.select(
    "sales_date", "daily_quantity", "lag_1_day", "lag_7_day"
).show(10)


In [None]:
from pyspark.sql.functions import avg

# Mean of daily_quantity over the seven preceding rows; the current row is
# excluded, so the feature never contains the value being predicted.
rolling_window = window_spec.rowsBetween(-7, -1)
trailing_mean = avg("daily_quantity").over(rolling_window)

daily_features = daily_features.withColumn("rolling_7_day_avg", trailing_mean)

preview_columns = ["sales_date", "daily_quantity", "rolling_7_day_avg"]
daily_features.select(*preview_columns).show(10)


In [None]:
# Drop every row that has a NULL in any column (the earliest rows have no
# lag / rolling values yet), then report how many rows remain.
daily_features_clean = daily_features.dropna()
daily_features_clean.count()


In [None]:
# Persist the model-ready feature table as Parquet, replacing earlier output.
features_writer = daily_features_clean.write.mode("overwrite")
features_writer.parquet("../data/processed/daily_features")


In [None]:
# Read the feature table back from Parquet and preview five rows.
spark.read.parquet("../data/processed/daily_features").show(5)


In [None]:
# Load the persisted feature table; the modelling cells below start from this
# DataFrame rather than from the in-memory `daily_features_clean`.
features_df = spark.read.parquet("../data/processed/daily_features")
features_df.printSchema()
features_df.show(5)



In [None]:
# Collect the features into pandas in date order (row order matters for the
# chronological split below). This rebinds `pdf`, which earlier held the raw
# Excel data.
pdf = features_df.orderBy("sales_date").toPandas()
pdf.head()


In [None]:
# Model inputs: the two lags, the trailing mean and three calendar fields.
feature_cols = [
    "lag_1_day", "lag_7_day", "rolling_7_day_avg",
    "day_of_week", "week_of_year", "month",
]

# X is the feature matrix, y the quantity to predict.
X = pdf.loc[:, feature_cols]
y = pdf.loc[:, "daily_quantity"]


In [None]:
# Chronological 80/20 split: the first 80% of rows train, the rest test.
split_index = int(len(pdf) * 0.8)

X_train = X.iloc[:split_index]
X_test = X.iloc[split_index:]
y_train = y.iloc[:split_index]
y_test = y.iloc[split_index:]


In [None]:
from sklearn.linear_model import LinearRegression

# Baseline model: ordinary least squares fitted on the training rows.
# NOTE: the name `model` is rebound to the PyTorch network further down.
model = LinearRegression()
model.fit(X_train, y_train)


In [None]:
# Predict the held-out rows and line the predictions up with their dates.
y_pred = model.predict(X_test)

test_dates = pdf[["sales_date"]].iloc[split_index:].copy()
results = test_dates.assign(
    actual_demand=y_test.values,
    predicted_demand=y_pred,
)

results.head()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np

# Test-set error of the linear baseline: MAE, and RMSE as sqrt of the MSE.
mae = mean_absolute_error(y_test, y_pred)
linear_mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(linear_mse)

(mae, rmse)


In [None]:
# Save the date / actual / predicted table as CSV without the pandas index.
results.to_csv("../data/processed/demand_forecast_results.csv", index=False)


In [None]:
# Show the last rows of the forecast table.
results.tail()


In [None]:
# Display the full feature matrix and target as a tuple.
# NOTE(review): looks like a leftover inspection cell; X.head() / y.head()
# would give a more compact preview - confirm whether it is still needed.
X, y


In [None]:
import torch

# Convert features and target to float32 tensors; the target gets a trailing
# dimension of size 1 so it matches the network's (n, 1) output.
X_tensor = torch.tensor(X.to_numpy(), dtype=torch.float32)
y_tensor = torch.tensor(y.to_numpy(), dtype=torch.float32).unsqueeze(1)


In [None]:
# Same chronological split as the baseline, applied to the tensors.
# This rebinds X_train / X_test / y_train / y_test, which previously held the
# pandas objects used by the linear model.
X_train, X_test = X_tensor[:split_index], X_tensor[split_index:]
y_train, y_test = y_tensor[:split_index], y_tensor[split_index:]


In [None]:
import torch.nn as nn

class DemandForecastNet(nn.Module):
    """Small feed-forward regressor: Linear -> ReLU -> Linear.

    Args:
        input_dim: Number of input features per sample.
        hidden_dim: Width of the single hidden layer. Defaults to 16, the
            value that was previously hard-coded, so existing calls that pass
            only ``input_dim`` build the same architecture.
    """

    def __init__(self, input_dim, hidden_dim=16):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Map a ``(batch, input_dim)`` tensor to ``(batch, 1)`` predictions."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

# Seed PyTorch's RNG before the layers are created so the random weight
# initialisation - and therefore the reported metrics - is the same on every
# Restart & Run All.
torch.manual_seed(42)

# One input unit per feature column. This rebinds `model`, which previously
# held the fitted linear-regression baseline.
model = DemandForecastNet(X_train.shape[1])


In [None]:
# Full-batch training: each epoch is one forward/backward pass over the whole
# training set, minimising mean squared error with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

epochs = 50

for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    loss = criterion(model(X_train), y_train)
    loss.backward()
    optimizer.step()

    # Report the training loss on every tenth epoch only.
    if epoch % 10 != 0:
        continue
    print(f"Epoch {epoch}, Loss: {loss.item():.2f}")


In [None]:
# Predict the held-out rows in eval mode with gradient tracking disabled,
# then flatten the (n, 1) output to a 1-D NumPy array.
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)

y_pred_torch = test_outputs.numpy().flatten()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np

# Test-set error of the neural net, computed the same way as for the baseline.
y_test_array = y_test.numpy()
mae_torch = mean_absolute_error(y_test_array, y_pred_torch)
rmse_torch = np.sqrt(mean_squared_error(y_test_array, y_pred_torch))

(mae_torch, rmse_torch)


In [None]:
import pandas as pd

# Side-by-side error table: one row per model.
comparison = pd.DataFrame(
    [
        {"Model": "Linear Regression", "MAE": mae, "RMSE": rmse},
        {"Model": "PyTorch Neural Net", "MAE": mae_torch, "RMSE": rmse_torch},
    ]
)

comparison
