In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import mlflow
from prophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tqdm import tqdm
from datetime import datetime, timedelta
import pytz

# --- Helper Functions for Quarter Boundaries ---
def get_quarter_boundaries(dt: pd.Timestamp):
    """
    Return the (first day, last day) of the calendar quarter that contains dt.

    Quarters are Q1: Jan-Mar, Q2: Apr-Jun, Q3: Jul-Sep, Q4: Oct-Dec.
    Both returned values are pd.Timestamp objects built from dt's year.
    """
    yr = dt.year
    # (last month of the quarter, (start month, start day), (end month, end day))
    early_quarters = (
        (3, (1, 1), (3, 31)),
        (6, (4, 1), (6, 30)),
        (9, (7, 1), (9, 30)),
    )
    for last_month, (start_m, start_d), (end_m, end_d) in early_quarters:
        if dt.month <= last_month:
            return pd.Timestamp(yr, start_m, start_d), pd.Timestamp(yr, end_m, end_d)
    # Anything that did not match Q1-Q3 falls through to Q4.
    return pd.Timestamp(yr, 10, 1), pd.Timestamp(yr, 12, 31)

def next_quarter_bounds(ref_date: pd.Timestamp):
    """
    Return the (start, end) dates of the quarter that follows ref_date's quarter.

    The start is the day after the current quarter's last day; the end is the
    last day of the quarter containing that start date.
    """
    this_quarter_end = get_quarter_boundaries(ref_date)[1]
    following_start = this_quarter_end + pd.Timedelta(days=1)
    following_end = get_quarter_boundaries(following_start)[1]
    return following_start, following_end

# --- Configuration & Paths ---
output_dir = "/dbfs/FileStore/prophet_forecasts"      # Directory for forecast CSV files
plot_dir = "/dbfs/FileStore/prophet_plots"              # Directory for forecast plots
os.makedirs(output_dir, exist_ok=True)
os.makedirs(plot_dir, exist_ok=True)

# Delta table names
predictions_delta_table = "genai_catalog.genai_schema.truck_forecast_predictions"
historical_delta_table = "genai_catalog.genai_schema.filtered_zone_truck_demand"
routes_delta_table = "genai_catalog.genai_schema.routes"

# --- Load Data from Delta ---
df_hist = spark.table(historical_delta_table).toPandas()

# --- Validate Required Columns in df_hist ---
# Checked before any column is touched so that a schema problem surfaces as one
# clear ValueError rather than a KeyError from the date parsing below (or, for a
# missing 'Count_of_Truck', as a failure on 'y' much later during model fitting).
required_cols = {'Route_ID', 'Truck_Type', 'Zone'}   # grouping keys used for forecasting
source_cols = {'Date', 'Count_of_Truck'}             # renamed to Prophet's 'ds' / 'y' below
missing_cols = (required_cols | source_cols) - set(df_hist.columns)
if missing_cols:
    raise ValueError(f"Historical data is missing required columns: {sorted(missing_cols)}")

df_hist['Date'] = pd.to_datetime(df_hist['Date'])
df_hist = df_hist.rename(columns={'Date': 'ds', 'Count_of_Truck': 'y'})

# --- Load Holidays Data ---
# Try the external holidays table first. If it cannot be read, holidays_df is
# left empty, which downstream code treats as "use Prophet's built-in holidays".
try:
    holidays_df = spark.table("genai_catalog.genai_schema.holidays").toPandas()
    holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])
except Exception as e:
    # Best-effort load: keep going, but surface the reason instead of hiding it.
    print(f"Could not load external holidays table ({e}). Falling back to empty DataFrame.")
    holidays_df = pd.DataFrame()

# --- Dynamic Date Calculations ---
current_date = df_hist['ds'].max()  # Most recent date in historical data
if pd.isna(current_date):
    # Empty table or all-null dates: every date computation below would be meaningless.
    raise ValueError("Historical data contains no valid dates; cannot derive the forecast window.")

train_ratio = 0.8
earliest_date = df_hist['ds'].min()
total_duration = current_date - earliest_date
# Time-based split: the first train_ratio of the covered time span is training data.
train_cutoff_date = earliest_date + total_duration * train_ratio

df_train = df_hist[df_hist['ds'] <= train_cutoff_date]
df_test = df_hist[df_hist['ds'] > train_cutoff_date]

# Determine forecast window for the next quarter (weekly points anchored on Wednesdays)
forecast_start, forecast_end = next_quarter_bounds(current_date)
future_dates = pd.date_range(start=forecast_start, end=forecast_end, freq='W-WED')

# Build a label for the forecast quarter and a UTC timestamp for file naming
forecast_quarter = f"Q{((forecast_start.month - 1) // 3) + 1}_{forecast_start.year}"
timestamp_str = datetime.now(pytz.utc).strftime("%Y%m%dT%H%M%SZ")

# Get unique combinations for forecast (per route, truck type, zone)
combinations = df_hist[['Route_ID', 'Truck_Type', 'Zone']].drop_duplicates()

# Accumulators filled by the forecasting loop
results = []
metrics_list = []


In [0]:
import mlflow
# Point the MLflow model-registry client at "demand_forecasting".
# NOTE(review): registry URIs are normally a server/database URI or a Databricks
# scheme such as "databricks" / "databricks-uc"; "demand_forecasting" looks like a
# name rather than a URI — confirm this value is intended.
mlflow.set_registry_uri("demand_forecasting")

In [0]:
from tqdm import tqdm

# --- Loop over Each Combination & Forecast ---
# For every (route, truck type, zone) combination with at least 50 history rows:
#   1. fit a Prophet model on the rows up to train_cutoff_date,
#   2. score it on the rows after the cutoff (MAE / RMSE / MAPE logged to MLflow),
#   3. predict the next-quarter dates in future_dates and append them to `results`,
#   4. save and log a plot of the forecast.
# A failure for one combination is reported and does not stop the others.

for _, row in tqdm(combinations.iterrows(), total=combinations.shape[0], desc="Forecasting combinations"):
    route = row['Route_ID']
    truck_type = row['Truck_Type']
    zone = row['Zone']

    # Filter on all three key columns. `combinations` is unique per
    # (Route_ID, Truck_Type, Zone), so leaving Zone out would fit one model on the
    # mixed rows of every zone and emit that same forecast once per zone.
    # A missing Zone is matched with isna() because NaN never compares equal.
    zone_mask = df_hist['Zone'].isna() if pd.isna(zone) else (df_hist['Zone'] == zone)
    combo_df = df_hist[
        (df_hist['Route_ID'] == route)
        & (df_hist['Truck_Type'] == truck_type)
        & zone_mask
    ].copy()
    if len(combo_df) < 50:
        # Too little history for this combination; skip it.
        continue

    # Split data using the dynamic cutoff date
    train_data = combo_df[combo_df['ds'] <= train_cutoff_date]
    test_data = combo_df[combo_df['ds'] > train_cutoff_date]

    # One MLflow experiment per route / truck type / quarter / run timestamp.
    experiment_path = f"/Workspace/GenAIAutomation/Truck_Demand_Forecasting/{route}_{truck_type}_{forecast_quarter}_{timestamp_str}"
    mlflow.set_experiment(experiment_path)

    try:
        with mlflow.start_run(run_name=f"{route}_{truck_type}_forecast_{forecast_quarter}_{timestamp_str}"):
            # Initialize Prophet model.
            # External holidays go through the constructor's `holidays` argument so
            # Prophet validates the frame; a copy keeps the shared holidays_df
            # untouched across iterations. With no external holidays, built-in
            # country holidays are added instead.
            use_external_holidays = not holidays_df.empty
            model = Prophet(
                seasonality_mode='multiplicative',
                changepoint_prior_scale=0.1,
                seasonality_prior_scale=10.0,
                holidays_prior_scale=15.0,
                weekly_seasonality=True,
                yearly_seasonality=True,
                holidays=holidays_df.copy() if use_external_holidays else None,
            )
            if not use_external_holidays:
                # Use built-in holidays for India as an example; adjust as needed.
                model.add_country_holidays(country_name='India')

            # Fit the model on the training data.
            # NOTE(review): the next-quarter forecast further down is produced by this
            # same model, which has not seen rows after train_cutoff_date — consider
            # refitting on all of combo_df before forecasting.
            model.fit(train_data[['ds', 'y']])

            # Evaluate on test data if available.
            if not test_data.empty:
                future_test = test_data[['ds']].copy()
                forecast_test = model.predict(future_test)
                y_true = test_data['y'].values
                y_pred = forecast_test['yhat'].clip(lower=0).values

                mae = mean_absolute_error(y_true, y_pred)
                mse = mean_squared_error(y_true, y_pred)
                rmse = np.sqrt(mse)
                # The small epsilon avoids division by zero when an actual value is 0.
                mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100

                mlflow.log_metric("MAE", mae)
                mlflow.log_metric("RMSE", rmse)
                mlflow.log_metric("MAPE", mape)

                metrics_list.append({
                    'Route_ID': route,
                    'Truck_Type': truck_type,
                    'Zone': zone,
                    'MAE': round(mae, 2),
                    'MSE': round(mse, 2),
                    'RMSE': round(rmse, 2),
                    'MAPE': round(mape, 2),
                    'Forecast_Quarter': forecast_quarter
                })

            # Forecast for the next quarter.
            future_forecast = pd.DataFrame({'ds': future_dates})
            forecast = model.predict(future_forecast)
            # Truck counts are whole and non-negative: clip at 0, then round up.
            for bound_col in ('yhat', 'yhat_lower', 'yhat_upper'):
                forecast[bound_col] = np.ceil(forecast[bound_col].clip(lower=0)).astype(int)

            forecast_df = pd.DataFrame({
                'Date': forecast['ds'],
                'Route_ID': route,
                'Truck_Type': truck_type,
                'Zone': zone,
                'Forecasted_Count_of_Truck': forecast['yhat'],
                'Lower_Limit': forecast['yhat_lower'],
                'Upper_Limit': forecast['yhat_upper'],
                'Forecast_Quarter': forecast_quarter,
                'Run_Timestamp': timestamp_str
            })
            results.append(forecast_df)

            # Create a plot of the forecast.
            # The figure is closed in `finally` so a failure while drawing or saving
            # does not leave open figures accumulating over the loop.
            fig, ax = plt.subplots(figsize=(12, 6))
            try:
                if not test_data.empty:
                    ax.plot(test_data['ds'], test_data['y'], label='Actual Test', color='blue')
                ax.plot(forecast['ds'], forecast['yhat'], label='Forecast', color='green')
                ax.fill_between(forecast['ds'], forecast['yhat_lower'], forecast['yhat_upper'],
                                color='red', alpha=0.2, label='Confidence Interval')
                ax.set_title(f"Truck Forecast: {route} | {truck_type} - {forecast_quarter}")
                ax.set_xlabel("Date")
                ax.set_ylabel("Truck Count")
                ax.legend()
                fig.tight_layout()

                plot_file = f"{plot_dir}/{route}_{truck_type}_{forecast_quarter}_{timestamp_str}.png"
                fig.savefig(plot_file)
            finally:
                plt.close(fig)
            mlflow.log_artifact(plot_file)

    except Exception as e:
        # Best-effort per combination: report the failure and move on to the next one.
        print(f"Failed for {route} - {truck_type}: {e}")

# --- Save Combined Forecasts ---
# Concatenate the per-combination forecasts, write them to one CSV under output_dir
# and log that CSV to MLflow.
if results:
    forecast_final = pd.concat(results, ignore_index=True)
    forecast_csv_name = f"Forecast_{forecast_quarter}_{timestamp_str}.csv"
    forecast_csv_path = os.path.join(output_dir, forecast_csv_name)
    forecast_final.to_csv(forecast_csv_path, index=False)
    print(f"Forecast CSV saved to: {forecast_csv_path}")
    # Log inside an explicit run. Calling log_artifact with no active run makes
    # MLflow start one implicitly and leave it open, which then breaks the next
    # mlflow.start_run() in this kernel.
    with mlflow.start_run(run_name=f"combined_forecast_{forecast_quarter}_{timestamp_str}"):
        mlflow.log_artifact(forecast_csv_path)
else:
    print("No forecast results generated.")

# --- Write Predictions to Delta Table ---
if results:
    forecast_sdf = spark.createDataFrame(forecast_final)
    from delta.tables import DeltaTable

    # Only the table lookup is guarded. A failure here is treated as "table does
    # not exist yet"; a failure of the merge itself must propagate, because falling
    # back to a blind append after a failed merge would duplicate rows.
    try:
        deltaTable = DeltaTable.forName(spark, predictions_delta_table)
    except Exception as ex:
        print(f"Delta table {predictions_delta_table} could not be opened ({ex}); creating it via append.")
        deltaTable = None

    if deltaTable is None:
        # Table not found: write out as new Delta table.
        forecast_sdf.write.format("delta").mode("append").saveAsTable(predictions_delta_table)
    else:
        # Upsert on the composite key: Date, Forecast_Quarter, Route_ID, Truck_Type, Zone.
        deltaTable.alias("tgt").merge(
            forecast_sdf.alias("src"),
            "tgt.Date = src.Date AND tgt.Forecast_Quarter = src.Forecast_Quarter AND tgt.Route_ID = src.Route_ID AND tgt.Truck_Type = src.Truck_Type AND tgt.Zone = src.Zone"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()

    print(f"Predictions successfully written to Delta table: {predictions_delta_table}")
else:
    print("No forecast records to write to Delta.")

print("All done! Forecasts, evaluation metrics, and Delta table updates complete.")

In [0]:
# Show the forecast rows written to Delta. forecast_sdf is only created when the
# forecasting loop produced results, so guard against a NameError on empty runs.
if results:
    forecast_sdf.display()
else:
    print("No forecast records to display.")

In [0]:
# --- Import Required Packages ---
import numpy as np
import pandas as pd
import os

# --- Load Forecasts and Route Plan ---
# forecast_sdf is the Spark DataFrame of next-quarter predictions built in the
# forecasting cell; the route plan is read from its Delta table.
forecast_df = forecast_sdf.toPandas()
route_df = spark.table("genai_catalog.genai_schema.multi_stop_route_plan").toPandas()

# --- Clean and Prepare Route Data ---
# Normalise the route-plan column names that are used as join keys / outputs.
route_df = route_df.rename(columns={
    'Route ID': 'Route_ID',
    'Distance (km)': 'Distance_km',
    'Estimated Time (hrs)': 'Estimated_Time_hrs'
})

# --- Aggregate Forecast Metrics ---
# Required_Trucks is the peak forecasted count over the quarter for each
# (route, truck type, zone).
summary_df = (
    forecast_df
    .groupby(['Route_ID', 'Truck_Type', 'Zone'], as_index=False)
    .agg(
        Required_Trucks=('Forecasted_Count_of_Truck', 'max'),
    )
)

# --- Merge Route Info ---
# Inner join: combinations without a matching route-plan row are dropped.
merged_df = pd.merge(summary_df, route_df, on=['Route_ID', 'Zone'], how='inner')

# --- Calculate Number of Stops ---
# Number of comma-separated entries in 'Destination Cities'; missing/empty -> 0.
merged_df['No_of_Stops'] = merged_df['Destination Cities'].fillna('').apply(
    lambda x: len(x.split(',')) if x else 0
)

# --- Final Column Order ---
# Select and rename in one step so the result is a fresh frame rather than a
# slice of merged_df whose column labels are reassigned afterwards.
final_df = merged_df[[
    'Zone', 'Route_ID', 'Truck_Type', 'Required_Trucks',
    'Source City', 'Destination Cities', 'No_of_Stops', 'Distance_km'
]].rename(columns=lambda col: col.replace(" ", "_"))

# --- Save the combined table to the catalog ---
spark_df = spark.createDataFrame(final_df)

# "inferSchema" is a reader option and has no effect on a Delta write, so it is
# not passed here.
spark_df.write\
        .format("delta")\
        .mode("overwrite")\
        .saveAsTable("genai_catalog.genai_schema.zone_route_truck_requirements")

# --- Save Zone-Wise CSVs ---
# One CSV per zone, written next to the forecast CSVs in output_dir.
for zone in final_df['Zone'].unique():
    zone_df = final_df[final_df['Zone'] == zone]
    zone_file_path = os.path.join(output_dir, f"{zone}.csv")
    zone_df.to_csv(zone_file_path, index=False)

# Report the directory the files were actually written to.
print(f"All zone-wise vendor files saved in: {output_dir}")

In [0]:
%sql
--select * from genai_catalog.genai_schema.truck_forecast_predictions