# Automated ML
## Introduction

This notebook is automatically generated by the Fabric low-code AutoML wizard based on your selections. Whether you're building a regression model, a classifier, or another machine-learning solution, this tool simplifies the process by transforming your goals into executable code. You can easily modify any settings or code snippets to better align with your requirements.

### What is FLAML?

[FLAML (Fast and Lightweight Automated Machine Learning)](https://aka.ms/fabric-automl) is an open-source AutoML library designed to quickly and efficiently find the best machine learning models and hyperparameters. FLAML optimizes for speed, accuracy, and cost, making it an excellent choice for a wide range of machine-learning tasks.

### Steps in this notebook

1. **Load the data**: Import your dataset.
2. **Generate features**: Automatically transform and preprocess your data to improve model performance.
3. **Use AutoML to find your best model**: Use FLAML to automatically select the most suitable model and optimize its parameters.
4. **Save the final machine learning model**: Store the trained model for future use.
5. **Generate predictions**: Use the saved model to predict outcomes on new data.

> [!IMPORTANT]
> **The forecasting functionality is currently supported only on Pandas DataFrames.**
> **Automated ML is currently supported on Fabric Runtimes 1.2+ or any Fabric environment with Spark 3.4+.**
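
If you want the notebook to fail fast on an unsupported runtime, a small guard can compare the Spark version string against the 3.4 minimum stated above. This is a sketch: `parse_version` and `meets_min_version` are hypothetical helpers, and in Fabric you would pass them `spark.version`.

```python
import re


def parse_version(version: str) -> tuple:
    """Turn '3.5.1' (or '3.4.1-custom') into (3, 5, 1), using leading digits only."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def meets_min_version(version: str, minimum: str = "3.4") -> bool:
    """True if `version` is at least `minimum`, compared component by component."""
    return parse_version(version) >= parse_version(minimum)


# Hypothetical guard for the notebook; spark.version is a SparkSession attribute.
# if not meets_min_version(spark.version):
#     raise RuntimeError(f"Automated ML needs Spark 3.4+, found {spark.version}")
print(meets_min_version("3.5.1"), meets_min_version("3.3.2"))  # True False
```

Comparing integer tuples rather than raw strings matters here, because as strings `"3.10"` sorts before `"3.4"`.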


In [1]:
%pip install scikit-learn==1.5.1



Collecting scikit-learn==1.5.1
  Downloading scikit_learn-1.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn==1.5.1)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 13.3/13.3 MB 115.9 MB/s eta 0:00:00
Downloading threadpoolctl-3.6.0-py3-none-any.whl (18 kB)
Installing collected packages: threadpoolctl, scikit-learn
  Attempting uninstall: threadpoolctl
    Found existing installation: threadpoolctl 2.2.0
    Not uninstalling threadpoolctl at /home/trusted-service-user/cluster-env/trident_env/lib/python3.11/site-packages, outside environment /nfs4/pyenv-4f705162-aa2e-4dd8-ae14-cf1ea67d3bc1
    Can't uninstall 'threadpoolctl'. No files were found to uninstall.
  Attempting uninstall:

### Default notebook optimization

This cell configures the logging and warning settings to reduce unnecessary output and focus on critical information. It suppresses specific warnings and logs from the underlying libraries, ensuring a cleaner and more readable notebook experience.
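
The next cell silences these warning categories for the whole session. If you'd rather keep warnings visible elsewhere, a scoped alternative (a sketch, not what the wizard generates) is the standard library's `warnings.catch_warnings()` context manager, which restores the previous filters when the block exits. Logger levels have no such context manager, so the sketch saves and restores the level by hand. `noisy_step` is a hypothetical stand-in for a library call.

```python
import logging
import warnings


def noisy_step() -> str:
    """Hypothetical stand-in for a library call that emits a FutureWarning."""
    warnings.warn("this API will change", FutureWarning)
    return "done"


# Scoped suppression: filters set inside the block are undone when it exits.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")                           # record everything...
    warnings.simplefilter("ignore", category=FutureWarning)  # ...except FutureWarning
    warnings.simplefilter("ignore", category=UserWarning)    # ...and UserWarning
    result = noisy_step()

print(result, len(caught))  # done 0

# Logger levels are not scoped automatically; save and restore them explicitly.
mlflow_logger = logging.getLogger("mlflow.utils")
previous_level = mlflow_logger.level
mlflow_logger.setLevel(logging.CRITICAL)
try:
    pass  # run the noisy MLflow-related code here
finally:
    mlflow_logger.setLevel(previous_level)
```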

In [2]:
import logging
import warnings
 
logging.getLogger('synapse.ml').setLevel(logging.CRITICAL)
logging.getLogger('mlflow.utils').setLevel(logging.CRITICAL)
warnings.simplefilter('ignore', category=FutureWarning)
warnings.simplefilter('ignore', category=UserWarning)


## Step 1: Load the Data

This step imports the raw data from the specified source, such as a file or a table in your lakehouse, into the notebook environment. In this notebook, the loading code is combined with preprocessing and AutoML training in the single code cell under Step 3.

Once loaded, the data serves as the input for the subsequent steps: data transformation, model training, and evaluation.
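
The loading code normalizes column names with the regular expression `[^A-Za-z0-9_]+`, replacing each run of any other characters with a single underscore. One thing `clean_col_names` does not check is whether two different raw names collapse to the same cleaned name, which could leave duplicate column labels. A minimal standard-library sketch of the same rule with a collision check follows; `clean_name` and `clean_names_checked` are hypothetical helpers, and the sample names are made up.

```python
import re
from collections import Counter

_INVALID = re.compile(r"[^A-Za-z0-9_]+")


def clean_name(name: str) -> str:
    """Same rule as the notebook's clean_col_names: each run of other chars -> '_'."""
    return _INVALID.sub("_", name)


def clean_names_checked(names: list[str]) -> list[str]:
    """Clean a list of column names and fail loudly if two of them collide."""
    cleaned = [clean_name(n) for n in names]
    dupes = [n for n, count in Counter(cleaned).items() if count > 1]
    if dupes:
        raise ValueError(f"Column names collide after cleaning: {dupes}")
    return cleaned


print(clean_names_checked(["Item Bar-Code", "Dept Name", "PRICE"]))
# ['Item_Bar_Code', 'Dept_Name', 'PRICE']
```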

## Step 3: Use AutoML to find your best model

Next, we use FLAML's AutoML to find the best machine learning model for our data. AutoML (automated machine learning) simplifies model selection by automatically testing and tuning a range of algorithms and configurations, so the most effective model can be identified with minimal manual effort. This notebook has no separate cell for Step 2 (feature generation); feature handling is left to FLAML through the `"featurization": "auto"` setting in the code cell below.
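
To make the idea concrete, here is a deliberately tiny, pure-Python illustration of selection by validation metric: hold out the last few points chronologically, score a few candidate forecasters by RMSE (the metric this notebook passes to FLAML), and keep the lowest. It is a conceptual sketch only: the three candidates are hypothetical stand-ins, and FLAML itself searches over learners such as LightGBM and XGBoost and their hyperparameters with its own cost-frugal search strategy under the time budget.

```python
import math
from statistics import mean


def rmse(actual, predicted):
    """Root mean squared error."""
    return math.sqrt(mean((a - p) ** 2 for a, p in zip(actual, predicted)))


# Hypothetical candidate "models": each maps a history to a forecast of length h.
def naive_last(history, h):
    return [history[-1]] * h


def mean_of_last_3(history, h):
    return [mean(history[-3:])] * h


def linear_trend(history, h):
    # Least-squares line through (t, y), extrapolated h steps past the history.
    n = len(history)
    t_mean = (n - 1) / 2
    y_mean = mean(history)
    slope = sum((t - t_mean) * (y - y_mean) for t, y in enumerate(history)) / sum(
        (t - t_mean) ** 2 for t in range(n)
    )
    return [y_mean + slope * (n + k - t_mean) for k in range(h)]


def select_best(series, horizon, candidates):
    # Chronological split: the holdout is the last `horizon` points, never shuffled.
    train, holdout = series[:-horizon], series[-horizon:]
    scores = {name: rmse(holdout, fn(train, horizon)) for name, fn in candidates.items()}
    best = min(scores, key=scores.get)
    return best, scores


series = [100, 102, 104, 106, 108, 110, 112, 114, 116, 118]
best, scores = select_best(
    series,
    horizon=5,
    candidates={"naive_last": naive_last, "mean_of_last_3": mean_of_last_3, "linear_trend": linear_trend},
)
print(best)  # linear_trend
```

The chronological split matters for forecasting: shuffling would let the model see future prices during training, which is why the notebook splits with `iloc` on time-sorted data rather than with a random split.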

### Tracking results with experiments in Fabric

Experiments in Fabric let you track the results of your AutoML process, providing a comprehensive view of all the metrics and parameters from your trials.

In [3]:
# Cell 1: Imports and setup
import logging
import warnings
import re
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import mlflow
import flaml
from flaml import AutoML

# Configure logging and warnings
logging.getLogger('synapse.ml').setLevel(logging.CRITICAL)
logging.getLogger('mlflow.utils').setLevel(logging.CRITICAL)
logging.getLogger('flaml').setLevel(logging.INFO) # Keep FLAML info
warnings.simplefilter('ignore', category=FutureWarning)
warnings.simplefilter('ignore', category=UserWarning)

print(f"FLAML version: {flaml.__version__}")

# --- Configuration ---
DELTA_TABLE_PATH = "Tables/dbo/SPICEdata csv"
LIMIT_ROWS = 100000
TARGET_COL_RAW = "PRICE"
TIME_COL = "Month"
FORECAST_PERIOD = 5
TEST_SPLIT_RATIO = 0.2
MLFLOW_EXPERIMENT_NAME = "trial4-AutoMLExperiment-Resampled" # Updated name
RANDOM_SEED = 41

# --- Helper Function ---
def clean_col_names(df):
    """Cleans DataFrame column names for compatibility."""
    return df.rename(columns=lambda c: re.sub('[^A-Za-z0-9_]+', '_', c))

# Cell 2: Data loading and initial cleaning
print(f"Loading data from Delta table: '{DELTA_TABLE_PATH}'...")
df_spark = spark.read.format("delta").load(DELTA_TABLE_PATH).cache()

if LIMIT_ROWS:
    print(f"Limiting data to {LIMIT_ROWS} rows.")
    X_pd = df_spark.limit(LIMIT_ROWS).toPandas()
else:
    print("Converting full Spark DataFrame to Pandas...")
    X_pd = df_spark.toPandas()
    print(f"Loaded {X_pd.shape[0]} rows.")

df_spark.unpersist()

print("Cleaning column names...")
X_pd = clean_col_names(X_pd)
target_col = re.sub('[^A-Za-z0-9_]+', '_', TARGET_COL_RAW)
time_col = re.sub('[^A-Za-z0-9_]+', '_', TIME_COL)

print(f"Target column: '{target_col}'")
print(f"Time column: '{time_col}'")

if time_col not in X_pd.columns:
    raise KeyError(f"Time column '{time_col}' not found!")
if target_col not in X_pd.columns:
    raise KeyError(f"Target column '{target_col}' not found!")

# Cell 3: Preprocessing

print(f"Converting time column '{time_col}' to datetime...")
X_pd[time_col] = pd.to_datetime(X_pd[time_col], errors='coerce')

initial_rows = len(X_pd)
X_pd = X_pd.dropna(subset=[time_col, target_col])
if len(X_pd) < initial_rows:
     print(f"Dropped {initial_rows - len(X_pd)} rows with missing time or target values.")

print("Converting remaining data types...")
X_pd = X_pd.convert_dtypes()
X_pd = X_pd.dropna(axis=1, how='all')

print(f"Sorting data by time column '{time_col}'...")
X_pd = X_pd.sort_values(by=time_col).reset_index(drop=True)

# Column selection
print("Selecting columns...")
potential_feature_cols = X_pd.select_dtypes(include=['number', 'datetime', 'datetime64[ns]', 'category', 'bool', 'string']).columns.tolist()
cols_to_keep = list(dict.fromkeys(potential_feature_cols + [target_col, time_col]))
X_pd = X_pd[cols_to_keep]
print(f"Columns kept for processing: {X_pd.columns.tolist()}")

# Chronological train/test split (test set of about TARGET_TEST_SIZE rows)
print("\nSplitting data into train and test sets chronologically...")

TARGET_TEST_SIZE = 10000 # Desired approximate test size
test_size_multiple = FORECAST_PERIOD # e.g., 5

# Calculate the test size, ensuring it's at least the forecast period
# and adjusting to be a multiple of the forecast period (optional but good practice)
test_size_abs = max(test_size_multiple, TARGET_TEST_SIZE)

# Optional: Make it a multiple of FORECAST_PERIOD by rounding down
# If TARGET_TEST_SIZE is already a multiple, this does nothing.
# If not, it finds the largest multiple <= TARGET_TEST_SIZE.
test_size_abs = max(test_size_multiple, (test_size_abs // test_size_multiple) * test_size_multiple)

# Check if enough training data remains
min_train_size = test_size_multiple # Need at least one period for training context potentially
if len(X_pd) - test_size_abs < min_train_size:
     print(f"Warning: Target test size ({TARGET_TEST_SIZE} -> {test_size_abs}) leaves insufficient data ({len(X_pd) - test_size_abs}) for training. Adjusting test size.")
     # Reduce test size to leave minimum required for training
     test_size_abs = max(test_size_multiple, len(X_pd) - min_train_size)
     # Re-ensure it's a multiple of the period if possible after adjustment
     test_size_abs = max(test_size_multiple, (test_size_abs // test_size_multiple) * test_size_multiple)


print(f"Target test size ~{TARGET_TEST_SIZE}, using absolute size: {test_size_abs}")

X_train_raw = X_pd.iloc[:-test_size_abs, :].copy()
X_test_raw = X_pd.iloc[-test_size_abs:, :].copy() # Test set remains raw for now
print(f"Raw Train set shape: {X_train_raw.shape}, Raw Test set shape: {X_test_raw.shape}")

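# *** STEP 1: Remove Duplicates from Training Data ***
# CAUTION: the table has many store/item rows per month, so keeping only the first
# row per timestamp discards most training rows and keeps an arbitrary item's price.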
print(f"\n--- Removing Duplicates from Training Data based on '{time_col}' ---")
print(f"X_train shape before duplicate removal: {X_train_raw.shape}")
X_train_dedup = X_train_raw.drop_duplicates(subset=[time_col], keep='first').copy()
rows_removed = X_train_raw.shape[0] - X_train_dedup.shape[0]
if rows_removed > 0:
    print(f"Removed {rows_removed} duplicate rows from training data based on '{time_col}'.")
else:
    print("No duplicate timestamps found in training data.")
print(f"X_train shape after duplicate removal: {X_train_dedup.shape}")

# *** STEP 2: Resample Training Data to Regular Monthly Frequency ***
print(f"\n--- Resampling Training Data to ensure regular '{time_col}' frequency ---")
if X_train_dedup.empty:
    raise ValueError("Training data is empty after removing duplicates. Cannot proceed.")

# Set time column as index
X_train_dedup.set_index(time_col, inplace=True)

# Determine the full date range (use 'MS' for Month Start frequency)
min_date = X_train_dedup.index.min()
max_date = X_train_dedup.index.max()
print(f"Original training date range: {min_date} to {max_date}")
# Ensure we have a valid range
if pd.isna(min_date) or pd.isna(max_date):
     raise ValueError("Could not determine date range from training data after deduplication.")

full_range = pd.date_range(start=min_date, end=max_date, freq='MS')
print(f"Resampling to full range: {full_range.min()} to {full_range.max()} ({len(full_range)} periods)")

# Reindex to the full range (introduces NaNs for missing months)
X_train_resampled = X_train_dedup.reindex(full_range)
print(f"Shape after reindexing: {X_train_resampled.shape}")
print(f"NaNs introduced in target ('{target_col}'): {X_train_resampled[target_col].isnull().sum()}")

# *** STEP 3: Impute NaNs created by Resampling ***
# Use forward fill as a common strategy for time series.
# This fills missing values with the last known value.
# Apply to ALL columns, including the target.
print("Applying forward fill (ffill) to handle NaNs introduced by resampling...")
X_train_resampled = X_train_resampled.ffill()

# Check if any NaNs remain (e.g., at the very beginning if the first rows were NaN)
if X_train_resampled.isnull().any().any():
    print("NaNs still present after ffill (likely at the beginning). Applying backfill (bfill)...")
    X_train_resampled = X_train_resampled.bfill() # Fill initial NaNs

if X_train_resampled.isnull().any().any():
    print("Warning: NaNs still present after ffill and bfill. Consider alternative imputation.")
    # Fill remaining numeric NaNs with the median and other columns with the mode.
    # Assign the result back rather than calling fillna(..., inplace=True) on a column
    # selection, which may not update the DataFrame under pandas copy-on-write.
    for col in X_train_resampled.columns:
        if X_train_resampled[col].isnull().any():
            if pd.api.types.is_numeric_dtype(X_train_resampled[col]):
                fill_val = X_train_resampled[col].median()
                print(f"Filling remaining NaNs in numeric column '{col}' with median ({fill_val})")
                X_train_resampled[col] = X_train_resampled[col].fillna(fill_val)
            else:
                modes = X_train_resampled[col].mode()
                if modes.empty:
                    # mode() is empty when the column has no non-null values
                    print(f"Could not find mode for column '{col}'. Leaving NaNs.")
                else:
                    fill_val = modes.iloc[0]
                    print(f"Filling remaining NaNs in non-numeric column '{col}' with mode ({fill_val})")
                    X_train_resampled[col] = X_train_resampled[col].fillna(fill_val)


# Reset the index to bring the time column back
X_train_resampled.reset_index(inplace=True)
X_train_resampled.rename(columns={'index': time_col}, inplace=True) # Rename index col back to time_col
print(f"Shape after imputation and index reset: {X_train_resampled.shape}")


# STEP 4: Separate Target Variable from RESAMPLED data
print("\nSeparating target variable from resampled training data...")
if target_col not in X_train_resampled.columns:
    raise KeyError(f"Target column '{target_col}' not found in resampled X_train.")
y_train = X_train_resampled.pop(target_col)
X_train_final = X_train_resampled # Final features for training

# Prepare Test Set (just separate target)
print("Separating target variable from test data...")
if target_col not in X_test_raw.columns:
    raise KeyError(f"Target column '{target_col}' not found in raw X_test.")
y_test = X_test_raw.pop(target_col)
X_test_final = X_test_raw # Final features for testing

print(f"Final X_train shape: {X_train_final.shape}, y_train shape: {y_train.shape}")
print(f"Final X_test shape: {X_test_final.shape}, y_test shape: {y_test.shape}")

print("\nSample of final X_train (after resampling and imputation):")
display(X_train_final.head())


# Cell 4: MLflow and FLAML setup
print("\n--- MLflow & FLAML Setup ---")
mlflow.autolog(exclusive=False)
mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME) # Using updated name
print(f"MLflow Experiment set to: '{MLFLOW_EXPERIMENT_NAME}'")

settings = {
    "time_budget": 120,
    "metric": "rmse",
    "estimator_list": ['lgbm', 'xgboost', 'extra_tree', 'xgb_limitdepth', 'prophet'],
    "task": "ts_forecast",
    "log_file_name": "flaml_experiment_trial4_resampled.log", # Updated log name
    "max_iter": 10,
    "force_cancel": True,
    "seed": RANDOM_SEED,
    "use_spark": True,
    "n_concurrent_trials": 3,
    "verbose": 1,
    "featurization": "auto",
}

automl = AutoML(**settings)

# Cell 5: Run the FLAML fit
print(f"\nStarting FLAML fit with Auto-Featurization, time_col='{time_col}', period={FORECAST_PERIOD}...")
with mlflow.start_run(nested=True, run_name="trial4_automl_fit_autofeature_resampled") as run: # Updated run name
    automl.fit(
        X_train=X_train_final,    # Use the RESAMPLED training features
        y_train=y_train,          # Use the corresponding training target
        period=FORECAST_PERIOD,   # Forecast horizon
        time_col=time_col         # Pass the correct time column name
    )
    print("FLAML fit completed.")
    mlflow.log_param("forecast_period", FORECAST_PERIOD)
    mlflow.log_param("time_column", time_col)
    mlflow.log_param("featurization_mode", "auto")
    mlflow.log_param("resampling_applied", True) # Log that we resampled

print("\n--- Script Finished ---")

# Model registration and prediction follow in Steps 4 and 5.


FLAML version: 2.3.3.post3
Loading data from Delta table: 'Tables/dbo/SPICEdata csv'...
Limiting data to 100000 rows.
Cleaning column names...
Target column: 'PRICE'
Time column: 'Month'
Converting time column 'Month' to datetime...
Converting remaining data types...
Sorting data by time column 'Month'...
Selecting columns...
Columns kept for processing: ['Year', 'Month', 'Dept_Name', 'Section_Name', 'Family_name', 'Sub_Family_Name', 'Brand_Principle', 'Brand_Name', 'SupplierName', 'Item_Code', 'Item_Bar_Code', 'Item_Name', 'BRAND', 'TYPE', 'VARIANTS_2', 'CATEGORY', 'SUPER_CATEGORY', 'PACKAGING', 'PRICE', 'Storename', 'Qty']

Splitting data into train and test sets chronologically...
Target test size ~10000, using absolute size: 10000
Raw Train set shape: (90000, 21), Raw Test set shape: (10000, 21)

--- Removing Duplicates from Training Data based on 'Month' ---
X_train shape before duplicate removal: (90000, 21)
Removed 89960 duplicate rows from training data based on 'Month'.
X_trai



--- MLflow & FLAML Setup ---
MLflow Experiment set to: 'trial4-AutoMLExperiment-Resampled'

Starting FLAML fit with Auto-Featurization, time_col='Month', period=5...


INFO:flaml.automl.task.time_series_task:Couldn't import orbit, skipping
[I 2025-04-05 08:33:23,180] A new study created in memory with name: optuna


[I 2025-04-05 08:33:36,833] A new study created in memory with name: optuna





FLAML fit completed.



--- Script Finished ---

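
The output above shows that de-duplicating on `Month` removed 89,960 of the 90,000 training rows, leaving 40 rows, one per distinct month. Because the table holds many store and item rows for each month, `drop_duplicates(subset=[time_col], keep='first')` keeps whichever item happens to come first, so the model is trained on a handful of arbitrary prices rather than on the data as a whole. If a single monthly price series is the intended target, one option is to aggregate per month instead of de-duplicating, then regularize the frequency. The sketch below assumes the mean price per month is a meaningful target, which is a modeling choice rather than something the wizard decides; `monthly_series` is a hypothetical helper and the sample rows are made up. Forecasting per store or per item would instead call for a multi-series (panel) setup.

```python
import pandas as pd


def monthly_series(df: pd.DataFrame, time_col: str, target_col: str, how: str = "mean") -> pd.DataFrame:
    """Collapse many rows per month into one regular month-start series.

    Rows are aggregated per calendar month with `how` ("mean" or "median"; note that
    "sum" would turn empty months into 0 instead of NaN). Months with no rows are
    forward-filled, the same imputation strategy the notebook uses.
    """
    monthly = (
        df.set_index(time_col)[target_col]
        .resample("MS")   # month-start bins; empty months appear as NaN
        .agg(how)
        .ffill()
    )
    return monthly.rename_axis(time_col).reset_index()


# Hypothetical sample: two stores in January and February, nothing in March.
raw = pd.DataFrame({
    "Month": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-02-01", "2024-02-01", "2024-04-01"]),
    "PRICE": [100.0, 120.0, 110.0, 130.0, 140.0],
})
monthly = monthly_series(raw, "Month", "PRICE")
print(monthly)
```

In the notebook, something like `monthly_series(X_train_raw, time_col, target_col)` could replace the de-duplication and reindexing of the target; the other feature columns would need their own aggregation rules, and the test set should be aggregated the same way so training and evaluation use the same definition of the target.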

## Step 4: Save the final machine learning model

Upon completing the AutoML trial, you can now save the final, tuned model as an ML model in Fabric.

In [4]:
model_path = f"runs:/{automl.best_run_id}/model"

# Register the model to the MLflow registry
registered_model = mlflow.register_model(model_uri=model_path, name="trial4")

# Print the registered model's name and version
print(f"Model '{registered_model.name}' version {registered_model.version} registered successfully.")


Registered model 'trial4' already exists. Creating a new version of this model...
Created version '22' of model 'trial4'.


## Step 5: Generate predictions

1. Generate predictions.

In [5]:
# Cell: Model Registration, Prediction, and Saving

import mlflow
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, TimestampType, FloatType # Import necessary types
import traceback # For detailed error printing

# --- Configuration (Ensure these are defined or carried over) ---
REGISTERED_MODEL_NAME = "trial4" # As specified in your registration code
PREDICTIONS_TABLE_NAME = f"{REGISTERED_MODEL_NAME}_predictions" # Define a name for the predictions table
# Ensure 'time_col' variable is available from previous cells (e.g., time_col = "Month")
if 'time_col' not in locals():
    raise NameError("Variable 'time_col' is not defined. Please ensure it's carried over from previous cells.")
# Ensure spark session is available
if 'spark' not in locals():
    raise NameError("'spark' session not found.")
# Ensure test data is available
if 'X_test_final' not in locals():
     raise NameError("'X_test_final' not found.")

# --- Model Registration ---
# Note: Step 4 already registered the best run, so running this block again creates
# another version of the same model (compare the version numbers in the outputs).
print("\n--- Model Registration ---")
best_run_id = None
try:
    if 'automl' in locals() and getattr(automl, 'best_run_id', None):
        # Same attribute that Step 4 uses to build the model URI
        best_run_id = automl.best_run_id
        print(f"Using run_id from automl.best_run_id: {best_run_id}")
    elif 'run' in locals() and hasattr(run, 'info'):
        # Fallback: the parent run opened around automl.fit(). It may not hold a
        # 'model' artifact, in which case registration fails and is skipped below.
        best_run_id = run.info.run_id
        print(f"Using run_id from MLflow context manager: {best_run_id}")
    else:
        raise ValueError("Could not determine best run ID.")

    model_path = f"runs:/{best_run_id}/model"
    print(f"Attempting to register model '{REGISTERED_MODEL_NAME}' from URI: {model_path}")
    registered_model = mlflow.register_model(model_uri=model_path, name=REGISTERED_MODEL_NAME)
    print(f"Model '{registered_model.name}' version {registered_model.version} registered successfully.")

except Exception as e:
    print(f"Error during model registration: {e}")
    print("Skipping registration.")
    registered_model = None


# --- Prediction ---
print(f"\n--- Predicting on test data (shape: {X_test_final.shape}) ---")

try:
    # Ensure automl object is available for prediction
    if 'automl' not in locals():
        raise NameError("automl object not found. Cannot predict.")

    print("Generating predictions...")
    loaded_model_pred = automl.predict(X_test_final)

    # Display sample predictions
    if isinstance(loaded_model_pred, (np.ndarray, pd.Series)):
        print('Predicted values sample:', loaded_model_pred[:10])
        print(f"Number of predictions generated: {len(loaded_model_pred)}")
    else:
        print("Predictions generated (type not array/series).")


    # --- Prepare predictions for saving ---
    print("Formatting predictions for saving...")
    if isinstance(loaded_model_pred, np.ndarray):
        # Ensure index alignment if possible, otherwise reset both
        pred_series = pd.Series(loaded_model_pred, name="prediction", index=X_test_final.index)
    elif isinstance(loaded_model_pred, pd.Series):
        pred_series = loaded_model_pred.rename("prediction")
    else: # Convert list or other iterable
        # Use test index if lengths match, otherwise default range index
        if len(loaded_model_pred) == len(X_test_final):
             pred_series = pd.Series(loaded_model_pred, name="prediction", index=X_test_final.index)
        else:
             print(f"Warning: Prediction length ({len(loaded_model_pred)}) doesn't match test set index length ({len(X_test_final)}). Using default index.")
             pred_series = pd.Series(loaded_model_pred, name="prediction")


    # Combine predictions with test set time column - reset index for robust concatenation
    # Ensure the time column exists in the test set
    if time_col not in X_test_final.columns:
        raise KeyError(f"Time column '{time_col}' not found in X_test_final.")

    predictions_df_pd = pd.concat(
        [X_test_final[[time_col]].reset_index(drop=True),
         pred_series.reset_index(drop=True)],
        axis=1
    )

    # --- Convert to Spark DataFrame ---
    print("Converting predictions to Spark DataFrame...")
    # Define schema carefully
    schema = StructType([
        StructField(time_col, TimestampType(), True), # Assuming time_col is indeed timestamp
        StructField("prediction", FloatType(), True)  # Assuming predictions are numeric (float)
    ])

    # Check for NaNs before conversion, as createDataFrame can be strict
    if predictions_df_pd[time_col].isnull().any():
         print(f"Warning: NaNs found in time column '{time_col}' before Spark conversion. This might cause errors.")
    if predictions_df_pd['prediction'].isnull().any():
         print(f"Warning: NaNs found in 'prediction' column before Spark conversion.")

    predictions_spark_df = spark.createDataFrame(predictions_df_pd, schema=schema)
    print("Spark DataFrame created successfully.")


    # --- Save to Delta ---
    # Clean the table name for path safety
    saved_name = PREDICTIONS_TABLE_NAME.replace(".", "_").replace(" ", "_")
    save_path = f"Tables/{saved_name}"
    print(f"Saving predictions to Delta table: {save_path}")

    predictions_spark_df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(save_path)

    print(f"Predictions saved successfully to '{save_path}'.")

except Exception as e:
    print(f"An error occurred during prediction or saving: {e}")
    print("\n--- Traceback ---")
    traceback.print_exc() # Print detailed traceback

print("\nPrediction cell finished.")


Registered model 'trial4' already exists. Creating a new version of this model...


Model 'trial4' version 23 registered successfully.

--- Predicting on test data (shape: (10000, 20)) ---
Generating predictions...
Predicted values sample: 90000    100.546720
90001    103.104424
90002    154.055115
90003    143.622007
90004    132.781834
90005    159.708107
90006    103.104424
90007    154.055115
90008    132.361609
90009    121.521435
Name: PRICE, dtype: float64
Number of predictions generated: 10000
Formatting predictions for saving...
Converting predictions to Spark DataFrame...
Spark DataFrame created successfully.
Saving predictions to Delta table: Tables/trial4_predictions
Predictions saved successfully to 'Tables/trial4_predictions'.

Prediction cell finished.


2. Save the predictions to a table.

In [6]:
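from pyspark.sql.types import FloatType

# Save the raw prediction values as a single-column table. The previous cell already
# saved predictions together with the time column to Tables/trial4_predictions.
predictions = spark.createDataFrame(loaded_model_pred, FloatType())
# Replace '.' and spaces so the table name can be used in Spark SQL without quoting
saved_name = "SPICEdata csv_predictions".replace(".", "_").replace(" ", "_")
predictions.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(f"Tables/{saved_name}")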


In [7]:
# Query the prediction results from the Delta table
print(f"Querying the first 20 predictions from the saved Delta table 'Tables/trial4_predictions'...")

# Ensure spark session is available
if 'spark' not in locals():
    raise NameError("'spark' session not found. Please ensure it is initialized.")
# Ensure time_col is available
if 'time_col' not in locals():
     raise NameError("'time_col' (e.g., 'Month') not found.")


try:
    # Query the Delta table by path; the delta.`<path>` syntax requires the backticks
    predictions_results_df = spark.sql(f"""
        SELECT *
        FROM delta.`Tables/trial4_predictions`
        ORDER BY `{time_col}`  -- Order by your time column (Month)
        LIMIT 20
    """)

    print(f"\nDisplaying first 20 predictions (ordered by {time_col}):")
    display(predictions_results_df) # Use display() for rich output in Synapse/Databricks

    # Optional: Convert to Pandas for easier inspection if needed later
    # predictions_results_pd = predictions_results_df.toPandas()
    # print("\nSample as Pandas DataFrame:")
    # print(predictions_results_pd.head())


except Exception as e:
    print(f"\nAn error occurred while querying the Delta table: {e}")
    print("Please ensure the table 'Tables/trial4_predictions' exists and the path/name is correct.")


Querying the first 20 predictions from the saved Delta table 'Tables/trial4_predictions'...

Displaying first 20 predictions (ordered by Month):



## Join predictions with the test data for comparison
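
For reference, the comparison query in the next cell computes two per-row error columns. Writing $y_i$ for `Actual_PRICE` and $\hat{y}_i$ for `Predicted_PRICE`:

$$
\text{Prediction\_Error}_i = \hat{y}_i - y_i,
\qquad
\text{Absolute\_Percentage\_Error}_i = \left|\frac{\hat{y}_i - y_i}{y_i}\right| \times 100 \quad (y_i \neq 0)
$$

The query returns NULL for the percentage error when $y_i$ is 0 or missing. Averaged over the $n$ compared rows, these give the usual summary metrics, including RMSE, the metric FLAML optimized during the search (`"metric": "rmse"`); here $n'$ counts only the rows with a defined percentage error:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(\hat{y}_i - y_i\right)^2},
\qquad
\mathrm{MAPE} = \frac{1}{n'}\sum_{i:\,y_i \neq 0} \left|\frac{\hat{y}_i - y_i}{y_i}\right| \times 100
$$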

In [8]:
# Step 1: Prepare Data for Joining (Modified to add Row ID)

import pyspark.sql.functions as F

# Ensure required variables exist
if 'X_test_final' not in locals(): raise NameError("'X_test_final' is not defined.")
if 'y_test' not in locals(): raise NameError("'y_test' is not defined.")
if 'time_col' not in locals(): raise NameError("'time_col' (e.g., 'Month') not found.")
if 'target_col' not in locals(): raise NameError("'target_col' (e.g., 'PRICE') not found.")
if 'spark' not in locals(): raise NameError("'spark' session is not defined.")

print("Preparing test data with actual prices...")
X_test_with_actuals_pd = X_test_final.copy()
X_test_with_actuals_pd['Actual_PRICE'] = y_test.values

print(f"X_test_with_actuals_pd shape: {X_test_with_actuals_pd.shape}")

# --- Convert Test Data with Actuals to Spark DataFrame ---
print("\nConverting test data with actuals to Spark DataFrame...")
try:
    # Add a deterministic 0-based row identifier in pandas before converting.
    # The previous approach (row_number() over a window ordered by a constant column)
    # does not guarantee that Spark keeps the pandas row order, and it moves every
    # row into a single partition, so the ids could silently misalign with predictions.
    import numpy as np  # imported in earlier cells; repeated so this cell stands alone
    X_test_with_ids_pd = X_test_with_actuals_pd.reset_index(drop=True)
    X_test_with_ids_pd["row_id"] = np.arange(len(X_test_with_ids_pd))
    test_data_sdf = spark.createDataFrame(X_test_with_ids_pd)

    test_data_sdf.createOrReplaceTempView("test_data_view")
    print("Created temporary view: test_data_view (with row_id)")
    print("Schema for test_data_view:")
    test_data_sdf.printSchema()

except Exception as e:
    print(f"Error converting test data to Spark DataFrame or adding row_id: {e}")
    import traceback
    traceback.print_exc()
    raise

# --- Prepare Predictions Spark DataFrame ---
# Reading Tables/trial4_predictions back from Delta does not guarantee row order
# either, so attach the same row_id to the in-memory predictions_df_pd from the
# prediction cell, which is in the same order as X_test_final.
print("\nPreparing predictions Spark DataFrame...")
try:
    if 'predictions_df_pd' not in locals():
        raise NameError("'predictions_df_pd' not found. Run the prediction cell first.")
    predictions_pd = predictions_df_pd.rename(columns={"prediction": "Predicted_PRICE"})
    predictions_pd["row_id"] = np.arange(len(predictions_pd))
    predictions_sdf = spark.createDataFrame(predictions_pd[["row_id", "Predicted_PRICE"]])

    predictions_sdf.createOrReplaceTempView("predictions_view")
    print("Created temporary view: predictions_view (with row_id)")
    print("Schema for predictions_view:")
    predictions_sdf.printSchema()

except Exception as e:
    print(f"Error preparing predictions or adding row_id: {e}")
    import traceback
    traceback.print_exc()
    raise


# --- Step 2: Perform Join and Query in Spark using row_id ---

print("\nJoining test data with predictions using row_id and querying comparison:")

# Define key columns (needed for SELECT and ORDER BY, not join)
display_cols_test = ['Storename', 'Item_Name', 'Item_Code', 'Dept_Name', 'Actual_PRICE'] # Adjust as needed
select_cols_t1 = ", ".join([f"t1.`{col}`" for col in [time_col] + display_cols_test if col in test_data_sdf.columns])

try:
    # Join test_data_view (t1) with predictions_view (t2) using the generated row_id
    comparison_query = f"""
        SELECT
            {select_cols_t1},
            t2.Predicted_PRICE,
            (t2.Predicted_PRICE - t1.Actual_PRICE) AS Prediction_Error,
            CASE
                WHEN t1.Actual_PRICE IS NULL OR t1.Actual_PRICE = 0 THEN NULL -- Avoid division by zero/null
                ELSE ABS((t2.Predicted_PRICE - t1.Actual_PRICE) / t1.Actual_PRICE) * 100
            END AS Absolute_Percentage_Error
        FROM test_data_view t1
        INNER JOIN predictions_view t2 ON t1.row_id = t2.row_id -- *** JOIN ON row_id ***
        ORDER BY t1.`{time_col}`, t1.Storename, t1.Item_Name -- Order meaningfully using t1 columns
        LIMIT 20
    """

    print("\nExecuting Spark SQL query for comparison...")
    comparison_results_df = spark.sql(comparison_query)

    print(f"\nDisplaying first 20 comparison results (ordered by {time_col}, Storename, Item_Name):")
    display(comparison_results_df)

    # Check whether the join produced any rows. The LIMIT 20 query returns
    # 0 rows only if the underlying row_id join itself is empty.
    if comparison_results_df.count() == 0:
        print("\nWARNING: The join between test data and predictions using row_id produced 0 rows.")
        print("This suggests an issue with row_id generation or data loss.")

except Exception as e:
    print(f"\nAn error occurred while querying the comparison: {e}")
    import traceback
    traceback.print_exc()

StatementMeta(, 1cee74f3-bc71-48f6-a120-c1558dfe2d31, 15, Finished, Available, Finished)

Preparing test data with actual prices...
X_test_with_actuals_pd shape: (10000, 21)

Converting test data with actuals to Spark DataFrame...
Created temporary view: test_data_view (with row_id)
Schema for test_data_view:
root
 |-- Year: long (nullable = true)
 |-- Month: timestamp (nullable = true)
 |-- Dept_Name: string (nullable = true)
 |-- Section_Name: string (nullable = true)
 |-- Family_name: string (nullable = true)
 |-- Sub_Family_Name: string (nullable = true)
 |-- Brand_Principle: string (nullable = true)
 |-- Brand_Name: string (nullable = true)
 |-- SupplierName: string (nullable = true)
 |-- Item_Code: long (nullable = true)
 |-- Item_Bar_Code: long (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- VARIANTS_2: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- SUPER_CATEGORY: string (nullable = true)
 |-- PACKAGING: string (nullable = true)
 |-- Storename: string (nul

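The row_id join above is an inner join. Any row_id that appears on only one side is silently dropped, and a result of 0 rows means the IDs never matched. Below is a minimal pure-Python sketch of that logic. It assumes each side is a list of dicts with a `row_id` key. `join_on_row_id` and `ape` are hypothetical helpers for illustration, not part of FLAML or Spark. `ape` mirrors the SQL `CASE` expression by returning `None` when the actual value is missing or zero.

```python
from typing import Dict, List, Optional, Tuple


def ape(actual: Optional[float], predicted: float) -> Optional[float]:
    """Absolute percentage error, mirroring the SQL CASE expression:
    returns None when the actual value is missing or zero."""
    if actual is None or actual == 0:
        return None
    return abs((predicted - actual) / actual) * 100


def join_on_row_id(
    actuals: List[Dict], predictions: List[Dict]
) -> Tuple[List[Dict], List[int], List[int]]:
    """Inner join two lists of dicts on the (hypothetical) 'row_id' key.

    Also returns the row_ids that had no match on the other side,
    which is where silent data loss would show up.
    """
    preds_by_id = {p["row_id"]: p for p in predictions}
    actual_ids = {a["row_id"] for a in actuals}
    pred_ids = set(preds_by_id)
    joined = []
    for a in actuals:
        p = preds_by_id.get(a["row_id"])
        if p is None:
            continue  # no prediction for this row_id: dropped, as in an INNER JOIN
        joined.append({
            "row_id": a["row_id"],
            "Actual_PRICE": a["Actual_PRICE"],
            "Predicted_PRICE": p["Predicted_PRICE"],
            "Prediction_Error": p["Predicted_PRICE"] - a["Actual_PRICE"],
            "Absolute_Percentage_Error": ape(a["Actual_PRICE"], p["Predicted_PRICE"]),
        })
    return joined, sorted(actual_ids - pred_ids), sorted(pred_ids - actual_ids)


# Hypothetical toy data: row 1 has a zero actual, row 2 has no prediction,
# and row 9 has no actual.
actuals = [
    {"row_id": 0, "Actual_PRICE": 237.9972},
    {"row_id": 1, "Actual_PRICE": 0.0},
    {"row_id": 2, "Actual_PRICE": 81.00},
]
predictions = [
    {"row_id": 0, "Predicted_PRICE": 262.155},
    {"row_id": 1, "Predicted_PRICE": 10.0},
    {"row_id": 9, "Predicted_PRICE": 209.31},
]

joined, missing_preds, missing_actuals = join_on_row_id(actuals, predictions)
print(len(joined), missing_preds, missing_actuals)
```

Reporting the unmatched IDs on both sides, rather than only the joined count, makes it easier to tell a misaligned row_id from genuinely missing predictions.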

## Explanations

This output lets us evaluate the model's performance at a granular level. Let's break down one row as an example and then discuss the overall picture.

### Example Row Breakdown (Row 1)

- **2024-08-01 00:00:00**: The prediction is for the month of August 2024.
- **3110 - KRIV**: This likely represents the Storename (store 3110; KRIV may denote a region or store type).
- **KOL BLACK PEPPER GROUND 100G**: The Item_Name.
- **65429**: The Item_Code.
- **CGD**: The Dept_Name.
- **237.9972**: The Actual_PRICE for this item in this store during August 2024 (from your y_test data).
- **262.155**: The Predicted_PRICE generated by the FLAML model for this item/store/month combination.
- **24.1577...**: The Prediction_Error (Predicted - Actual). The model predicted about 24 units above the actual price.
- **10.1504...**: The Absolute_Percentage_Error (APE). The prediction was about 10.15% higher than the actual price.
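The two error columns follow directly from the SQL expressions in the query, `Predicted_PRICE - Actual_PRICE` and `ABS((Predicted_PRICE - Actual_PRICE) / Actual_PRICE) * 100`. Here they are worked through with the rounded values displayed for row 1:

$$
\text{Prediction Error} = 262.155 - 237.9972 \approx 24.158
$$

$$
\text{APE} = \left| \frac{262.155 - 237.9972}{237.9972} \right| \times 100 \approx 10.15\%
$$

The small gap from the displayed 24.1577... most likely reflects floating-point precision in the stored values rather than a different formula.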

### Interpreting the Results Overall

**Row-level detail**: You can now see the specific prediction for each unique combination of Month, Storename, and Item_Name/Item_Code.

**Performance varies**: The predictions and errors differ significantly across rows, even within the same store (3110 - KRIV) and month:

- For KOL BLACK PEPPER GROUND 100G, the prediction was about 10% too high.
- For KOL CARDAMON GROUND 100G, the prediction (262.155) was much lower than the actual (431.997), giving a large negative error and an APE of about 39%.
- For KOL CHILIES GROUND 50G, the prediction (209.31) was much higher than the actual (81.00), giving a large positive error and an APE of about 158%.
- For KOL CINNAMON GROUND 100G, the prediction (209.31) was very close to the actual (204.99), giving a small error and an APE of only about 2%.
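To compare items side by side, you can compute and rank the APE for each of the four sample rows. Below is a minimal sketch that uses only the Python standard library. `sample_rows` is a hypothetical, hand-copied list of the actual and predicted prices quoted above, not a variable defined in this notebook.

```python
# Actual and predicted prices for the four sample rows quoted above
# (store 3110 - KRIV, August 2024), copied by hand for illustration.
sample_rows = [
    ("KOL BLACK PEPPER GROUND 100G", 237.9972, 262.155),
    ("KOL CARDAMON GROUND 100G", 431.997, 262.155),
    ("KOL CHILIES GROUND 50G", 81.00, 209.31),
    ("KOL CINNAMON GROUND 100G", 204.99, 209.31),
]


def absolute_percentage_error(actual: float, predicted: float) -> float:
    """|predicted - actual| / actual * 100; assumes actual is non-zero."""
    return abs(predicted - actual) / actual * 100


# Rank items from worst (highest APE) to best (lowest APE).
ranked = sorted(
    ((name, absolute_percentage_error(actual, pred)) for name, actual, pred in sample_rows),
    key=lambda item: item[1],
    reverse=True,
)

for name, ape_pct in ranked:
    print(f"{name:<30} {ape_pct:7.2f}%")
```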

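### Model Strengths and Weaknesses

This kind of detailed view helps you identify:

- **Items/categories predicted well**: Items such as KOL CINNAMON GROUND 100G appear well predicted (low error).
- **Items/categories predicted poorly**: Items such as KOL CHILIES GROUND 50G and KOL CARDAMON GROUND 100G have large errors. This may indicate that the model struggles with these specific items, perhaps because of volatile pricing, sparse training data for them, or missing features that drive their price. Distinct items also receive identical predictions in this sample (262.155 for both black pepper and cardamon, 209.31 for both chilies and cinnamon), which hints that the current features may not distinguish these items well.
- **Systematic bias?** Look for patterns. Does the model consistently over-predict or under-predict for certain departments, brands, or price ranges? In this small sample the picture is mixed: three of the four rows are over-predicted and one is under-predicted.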
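One simple check for systematic bias is to group rows by a column such as Dept_Name or Brand_Name. For each group, compare the mean signed error with the share of over-predictions. The sketch below applies this to the four sample rows grouped by Storename; all four belong to 3110 - KRIV. `bias_by_group` is a hypothetical helper. On the real data, you would run the same aggregation over the full joined result rather than the 20-row preview.

```python
from collections import defaultdict
from statistics import mean

# (Storename, actual, predicted) for the four sample rows quoted above.
rows = [
    ("3110 - KRIV", 237.9972, 262.155),
    ("3110 - KRIV", 431.997, 262.155),
    ("3110 - KRIV", 81.00, 209.31),
    ("3110 - KRIV", 204.99, 209.31),
]


def bias_by_group(rows):
    """Return {group: (mean signed error, share of over-predictions)}.

    Signed error is predicted - actual, matching Prediction_Error, so a
    positive mean indicates over-prediction on average.
    """
    errors = defaultdict(list)
    for group, actual, predicted in rows:
        errors[group].append(predicted - actual)
    return {
        group: (mean(errs), sum(e > 0 for e in errs) / len(errs))
        for group, errs in errors.items()
    }


for group, (mean_error, over_share) in bias_by_group(rows).items():
    print(f"{group}: mean error {mean_error:+.2f}, over-predicted {over_share:.0%}")
```

Here the mean signed error is close to zero (about -3.26), even though three of the four rows are over-predicted. One large under-prediction (cardamon) offsets the over-predictions. This is why looking at both numbers is more informative than the mean alone.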

**Actionable insights**: Based on this, you might decide that:

- The model is acceptable overall if the average error across all items (for example, the mean APE) is low enough for your use case.
- Specific items or categories need further investigation, or perhaps separate modeling approaches.
- More feature engineering is needed to capture the factors that influence the poorly predicted items.
- The model should be retrained, perhaps excluding certain volatile items or changing the evaluation metric (e.g., optimizing MAE instead of RMSE, because RMSE penalizes large errors more heavily and is therefore more sensitive to outliers).
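To see why the choice of metric matters, the sketch below computes MAE, RMSE, and mean APE (MAPE) over the four sample rows using only the standard library. RMSE squares each error before averaging, so the two large errors (cardamon and chilies) dominate it. MAE weights all errors linearly. In FLAML, you select the metric to optimize through the `metric` argument of `AutoML.fit` (for example, `metric="mae"`). The sketch itself does not call FLAML, and the four rows are only a small illustrative sample.

```python
import math

# (actual, predicted) for the four sample rows quoted above.
pairs = [
    (237.9972, 262.155),  # KOL BLACK PEPPER GROUND 100G
    (431.997, 262.155),   # KOL CARDAMON GROUND 100G
    (81.00, 209.31),      # KOL CHILIES GROUND 50G
    (204.99, 209.31),     # KOL CINNAMON GROUND 100G
]

# Signed errors, matching Prediction_Error (predicted - actual).
errors = [pred - actual for actual, pred in pairs]

mae = sum(abs(e) for e in errors) / len(errors)
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
mape = sum(abs(e) / actual * 100 for e, (actual, _) in zip(errors, pairs)) / len(pairs)

print(f"MAE:  {mae:.2f}")
print(f"RMSE: {rmse:.2f}")
print(f"MAPE: {mape:.1f}%")
```

On these rows, RMSE (about 107) is noticeably higher than MAE (about 82) because squaring amplifies the two large errors. The MAPE of roughly 52% is driven mostly by the chilies row. Judge whether the model is acceptable on these aggregates over the full test set, not on the 20-row preview.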

In conclusion, this detailed comparison view is crucial. It shows your model is generating predictions at the intended item/store/month level, but its accuracy varies significantly across different items. You now have the information needed to perform a much deeper evaluation of the model's quality.
