In [0]:
%pip install xgboost

Collecting xgboost
  Downloading xgboost-3.2.0-py3-none-manylinux_2_28_aarch64.whl.metadata (2.1 kB)
Collecting nvidia-nccl-cu12 (from xgboost)
  Downloading nvidia_nccl_cu12-2.29.3-py3-none-manylinux_2_18_aarch64.whl.metadata (2.1 kB)
Downloading xgboost-3.2.0-py3-none-manylinux_2_28_aarch64.whl (131.1 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/131.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.2/131.1 MB[0m [31m212.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━[0m [32m88.3/131.1 MB[0m [31m234.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m126.9/131.1 MB[0m [31m219.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m131.1/131.1 MB[0m [31m217.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━

In [0]:
from pyspark.sql.functions import col, to_date
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import numpy as np
import pandas as pd

from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error

In [0]:
df = spark.table("fuel_prices_raw")
display(df)

Date,A1,A2,A3,R1,R2,R3,M1,M2,M3,P1,P2,P3,D1
1995-01-02,1.127,1.104,1.231,1.079,1.063,1.167,1.17,1.159,1.298,1.272,1.25,1.386,1.104
1995-01-09,1.134,1.111,1.232,1.086,1.07,1.169,1.177,1.164,1.3,1.279,1.256,1.387,1.102
1995-01-16,1.126,1.102,1.231,1.078,1.062,1.169,1.168,1.155,1.299,1.271,1.249,1.385,1.1
1995-01-23,1.132,1.11,1.226,1.083,1.068,1.165,1.177,1.165,1.296,1.277,1.256,1.378,1.095
1995-01-30,1.131,1.109,1.221,1.083,1.068,1.162,1.176,1.163,1.291,1.275,1.255,1.37,1.09
1995-02-06,1.124,1.103,1.218,1.076,1.062,1.159,1.169,1.157,1.288,1.27,1.25,1.368,1.086
1995-02-13,1.121,1.099,1.218,1.074,1.058,1.158,1.166,1.153,1.285,1.265,1.243,1.367,1.088
1995-02-20,1.115,1.093,1.213,1.067,1.052,1.153,1.16,1.148,1.28,1.259,1.239,1.363,1.088
1995-02-27,1.121,1.101,1.211,1.073,1.06,1.152,1.164,1.153,1.276,1.265,1.246,1.362,1.089
1995-03-06,1.123,1.103,1.209,1.076,1.063,1.149,1.167,1.157,1.275,1.263,1.244,1.358,1.089


In [0]:
df = df.withColumnRenamed("R1", "Price")
df = df.withColumn("Date", to_date(col("Date")))

In [0]:
windowSpec = Window.orderBy("Date")

df = df.withColumn("Lag_1", F.lag("Price", 1).over(windowSpec))

df = df.withColumn(
    "Rolling_4",
    F.avg("Price").over(windowSpec.rowsBetween(-4, 0))
)

df = df.dropna()



In [0]:
df = df.withColumn(
    "Demand",
    1000 - 150 * col("Price") + F.rand(seed=42) * 20
)



In [0]:
pdf = df.toPandas()

train_size = int(len(pdf) * 0.8)

train = pdf[:train_size].copy()
test = pdf[train_size:].copy()

X_train = train.drop(columns=["Demand", "Date"])
y_train = train["Demand"]

X_test = test.drop(columns=["Demand", "Date"])
y_test = test["Demand"]



In [0]:
model = XGBRegressor(
    n_estimators=200,
    max_depth=4,
    learning_rate=0.05
)

model.fit(X_train, y_train)

preds = model.predict(X_test)

rmse = np.sqrt(mean_squared_error(y_test, preds))
mape = mean_absolute_percentage_error(y_test, preds)

print("RMSE:", rmse)
print("MAPE:", mape)

RMSE: 7.7260819313546865
MAPE: 0.00945414384802441


In [0]:
def optimize_price(row):
    prices = np.linspace(row["Price"] * 0.9, row["Price"] * 1.1, 20)
    profits = []

    for p in prices:
        temp = row.copy()
        temp["Price"] = p
        demand = model.predict([temp.drop(["Demand", "Date"])])[0]
        profit = p * demand
        profits.append(profit)

    best_idx = np.argmax(profits)
    return prices[best_idx], profits[best_idx]

optimized = test.apply(optimize_price, axis=1, result_type="expand")
optimized.columns = ["Recommended_Price", "Optimized_Profit"]

test["Recommended_Price"] = optimized["Recommended_Price"]
test["Optimized_Profit"] = optimized["Optimized_Profit"]
test["Actual_Profit"] = test["Price"] * test["Demand"]

In [0]:
total_actual = test["Actual_Profit"].sum()
total_optimized = test["Optimized_Profit"].sum()

improvement = total_optimized - total_actual

print("Total Actual Profit:", total_actual)
print("Total Optimized Profit:", total_optimized)
print("Improvement:", improvement)

Total Actual Profit: 420853.01420031686
Total Optimized Profit: 455275.37275359046
Improvement: 34422.3585532736


In [0]:
final_spark = spark.createDataFrame(test)

final_spark.write.mode("overwrite").saveAsTable("fuel_results")

In [0]:
%sql
SELECT *
FROM fuel_results
ORDER BY Date

Date,A1,A2,A3,Price,R2,R3,M1,M2,M3,P1,P2,P3,D1,Lag_1,Rolling_4,Demand,Recommended_Price,Optimized_Profit,Actual_Profit
2015-11-16,2.281,2.21,2.424,2.178,2.11,2.322,2.42,2.335,2.585,2.633,2.58,2.731,2.482,2.235,2.2284,687.7490951110871,2.3958,1604.4378974121094,1497.9175291519475
2015-11-23,2.198,2.117,2.362,2.094,2.015,2.258,2.338,2.243,2.522,2.556,2.492,2.675,2.445,2.178,2.1918,699.3011750858013,2.3034,1560.820196154785,1464.336660629668
2015-11-30,2.165,2.079,2.34,2.059,1.974,2.236,2.308,2.209,2.501,2.529,2.462,2.655,2.421,2.094,2.158,701.3486471875053,2.2649000000000004,1550.4604914245608,1444.0768645590736
2015-12-07,2.159,2.074,2.334,2.053,1.969,2.231,2.303,2.206,2.49,2.523,2.457,2.647,2.379,2.059,2.1238,707.8770385355172,2.2366894736842107,1554.4522224686473,1453.2715601134166
2015-12-14,2.144,2.059,2.317,2.037,1.953,2.214,2.287,2.191,2.474,2.509,2.446,2.626,2.338,2.053,2.0842,705.3252121015084,2.2407,1548.0360906188964,1436.7474570507727
2015-12-21,2.133,2.035,2.332,2.026,1.929,2.23,2.28,2.169,2.495,2.499,2.425,2.638,2.284,2.037,2.0538,702.9051272869414,2.2286,1550.4475209716798,1424.085787883343
2015-12-28,2.141,2.039,2.348,2.034,1.933,2.244,2.29,2.172,2.519,2.506,2.427,2.653,2.237,2.026,2.0418,697.6941568318368,2.2374,1554.96678046875,1419.109914995956
2016-01-04,2.135,2.027,2.354,2.028,1.922,2.25,2.285,2.161,2.525,2.498,2.413,2.656,2.211,2.034,2.0356,715.2953031704681,2.2308000000000003,1562.8530851806645,1450.6188748297093
2016-01-11,2.104,1.994,2.327,1.996,1.888,2.224,2.253,2.126,2.499,2.469,2.382,2.631,2.177,2.028,2.0242,717.9189559338201,2.1956,1538.1926814697267,1432.966236043905
2016-01-18,2.022,1.917,2.235,1.914,1.81,2.131,2.172,2.052,2.404,2.39,2.308,2.544,2.112,1.996,1.9996,713.5601359087652,2.1054,1496.4658649047851,1365.7541001293766


In [0]:
%sql
SELECT Date, Price, Recommended_Price
FROM fuel_results
ORDER BY Date

Date,Price,Recommended_Price
2015-11-16,2.178,2.3958
2015-11-23,2.094,2.3034
2015-11-30,2.059,2.2649000000000004
2015-12-07,2.053,2.2366894736842107
2015-12-14,2.037,2.2407
2015-12-21,2.026,2.2286
2015-12-28,2.034,2.2374
2016-01-04,2.028,2.2308000000000003
2016-01-11,1.996,2.1956
2016-01-18,1.914,2.1054


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT 'Actual Profit' AS Type, SUM(Actual_Profit) AS Profit
FROM fuel_results
UNION ALL
SELECT 'Optimized Profit' AS Type, SUM(Optimized_Profit) AS Profit
FROM fuel_results

Type,Profit
Actual Profit,420853.01420031686
Optimized Profit,455275.37275359046


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT 
  SUM(Optimized_Profit) - SUM(Actual_Profit) AS Profit_Improvement
FROM fuel_results

Profit_Improvement
34422.35855327331


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT 
  CAST(
    (SUM(Optimized_Profit) - SUM(Actual_Profit)) / SUM(Actual_Profit) AS DOUBLE
  ) AS Profit_Improvement_Percent
FROM fuel_results

Profit_Improvement_Percent
0.0817918783798683


Databricks visualization. Run in Databricks to view.