In [1]:
from typing import List, Optional, Tuple

import lightgbm as lgb
import numpy as np
import polars as pl
from mlforecast import MLForecast
from mlforecast.lag_transforms import RollingMean

In [None]:
# SRMIST: Print statements are to be replaced by logging, using whichever logging framework you are using.
# SRMIST: Functions are to be placed in the relevant files. You can decide on the layout.

data_api_type = 'csv' # Data source selector read by GetTrainingData / GetTestData; only 'csv' is handled there.

# Maps each logical role to the list of DataFrame column names that play that role.
# The pipeline reads the first entry of "ID", "Date" and "Target" as the series id,
# timestamp and target columns respectively.
data_column_mapping = {
    "ID": ["Store"],
    "Date": ["Date"],
    "Target": ["Weekly_Sales"],
    "Regressor": ["Holiday_Flag", "Temperature", "Fuel_Price", "CPI", "Unemployment"] # SRMIST: Ideally we should be able to pass an empty list and this should still work. 
}
# Maps a date column name to a one-element list holding the strptime format used to parse it.
data_date_format_mapping = {
    # Alternative day-first format, currently disabled:
    # "Date": ["%d-%m-%Y"],
    "Date": ["%Y-%m-%d"],
}

In [3]:
def GetTrainingData():
    """
    Loads the training data from the source selected by `data_api_type`.

    Returns:
        pl.DataFrame: The contents of 'train.csv' when `data_api_type` is 'csv'.

    Raises:
        ValueError: If `data_api_type` names a source that is not supported.
    """

    if data_api_type == 'csv':      # SRMIST: This is using a CSV. We must create options to connect to DB too based on input.
        return pl.read_csv('train.csv') # SRMIST: ensure actuals for the specified date is picked.

    # Fail fast with a clear error. Falling through to a bare `return df` here
    # would raise UnboundLocalError because no DataFrame was ever loaded.
    message = f'Undefined data_api_type: {data_api_type}. Please provide valid data_api_type.'
    print(message)
    raise ValueError(message)

def GetTestData():
    """
    Loads the test (future exogenous) data from the source selected by `data_api_type`.

    Returns:
        pl.DataFrame: The contents of 'test.csv' when `data_api_type` is 'csv'.

    Raises:
        ValueError: If `data_api_type` names a source that is not supported.
    """

    if data_api_type == 'csv':
        return pl.read_csv('test.csv') # ensure actuals for the specified date is picked.

    # Fail fast with a clear error. Falling through to a bare `return df` here
    # would raise UnboundLocalError because no DataFrame was ever loaded.
    message = f'Undefined data_api_type: {data_api_type}. Please provide valid data_api_type.'
    print(message)
    raise ValueError(message)

In [4]:

def PreProcessDataType(df):
    """
    Coerces the columns named in `data_column_mapping` to the dtypes the pipeline expects.

    - "Date" columns are converted to `pl.Date`. Datetime columns are cast directly;
      anything else is stringified and parsed with the format registered in
      `data_date_format_mapping` (default "%Y-%m-%d").
    - "ID" columns are cast to `pl.String`.
    - "Target" and "Regressor" columns are cast to `pl.Float64` only when they are
      not already numeric, so correctly typed integer columns keep their dtype.

    Columns listed in the mapping but absent from `df` are skipped.

    Args:
        df (pl.DataFrame): The raw input DataFrame.

    Returns:
        pl.DataFrame: A DataFrame with the converted columns.
    """

    print("\n--- Pre-processing Data Types ---")

    # Ensure Date columns are of type Date
    for col in data_column_mapping.get("Date", []):
        if col not in df.columns:
            continue

        current_dtype = df[col].dtype
        if current_dtype == pl.Date:
            print(f"  Column '{col}' is already a Date, skipping conversion.")
            continue

        if current_dtype == pl.Datetime:
            # Already temporal (e.g. loaded from a database): `.str.strptime`
            # would fail on a non-string column, so cast to the calendar date.
            date_expr = pl.col(col).cast(pl.Date)
        else:
            date_format = data_date_format_mapping.get(col, ["%Y-%m-%d"])[0]
            # Casting to String first lets non-text columns (e.g. integers such
            # as 20240131) be parsed too; it is a no-op for text columns.
            # strict=False is kept from the original best-effort behaviour.
            date_expr = (
                pl.col(col)
                .cast(pl.String)
                .str.strptime(pl.Date, strict=False, format=date_format)
            )

        df = df.with_columns(date_expr.alias(col))
        print(f"  Converted '{col}' column to Date.")

    # Ensure ID columns are of type String
    for col in data_column_mapping.get("ID", []):
        if col in df.columns:
            df = df.with_columns(pl.col(col).cast(pl.String).alias(col))
            print(f"  Converted '{col}' to String.")

    # Ensure Target and Regressor columns are numeric
    numerical_cols = [
        *data_column_mapping.get("Target", []),
        *data_column_mapping.get("Regressor", []),
    ]

    for col in numerical_cols:
        if col not in df.columns:
            continue
        # Only cast when the column is not numeric yet; this avoids turning
        # correctly typed integer columns into floats.
        if not df[col].dtype.is_numeric():
            df = df.with_columns(pl.col(col).cast(pl.Float64).alias(col))
            print(f"  Converted non-numeric '{col}' to Float64.")
        else:
            print(f"  '{col}' is already numeric (type: {df[col].dtype}). No conversion needed.")

    print("\nDataFrame Schema after Pre-processing:")
    print(df.schema)

    return df


In [5]:

    
def InferDataFrequency(df: pl.DataFrame, date_col: str, id_col: str) -> Optional[float]:
    """
    Infers the most common gap (in days) between consecutive dates of each ID.

    Args:
        df (pl.DataFrame): The input DataFrame.
        date_col (str): The name of the date column.
        id_col (str): The name of the identifier column to group by.

    Returns:
        Optional[float]: The most common gap in days. In practice a value is
        always returned; the failure case raises instead of returning None.

    Raises:
        ValueError: If no gap can be computed (for example, every ID has fewer
            than two rows).
    """

    # Per ID, compute the day difference between consecutive dates, then
    # flatten to one row per gap. The first row of each ID has no predecessor
    # and yields a null, which is dropped.
    gaps_df = (
        df.select([pl.col(id_col), pl.col(date_col)])
        .sort(id_col, date_col)
        .group_by(id_col)
        .agg(pl.col(date_col).diff().dt.total_days())
        .explode(date_col)
        .drop_nulls()
    )

    if gaps_df.is_empty():
        print(" Could not determine data frequency.")
        raise ValueError("Could not determine data frequency.")

    # Read the gaps through `date_col` rather than a hard-coded 'Date' so any
    # column name works. `mode()` may return several values on a tie; sorting
    # makes the choice deterministic (smallest gap wins).
    frequency = gaps_df[date_col].mode().sort()[0]
    print(f"  Inferred data frequency: {frequency} days.")

    return frequency

def CreateMonthCyclicalEncoding(df: pl.DataFrame, date_col: str) -> pl.DataFrame:
    """
    Adds sine/cosine encodings of the calendar month taken from `date_col`.

    Args:
        df (pl.DataFrame): The input DataFrame.
        date_col (str): The name of the date column.

    Returns:
        pl.DataFrame: `df` plus the columns "month_sin" and "month_cos".
    """

    # Angle on a 12-step circle, built from a scratch "month_of_year" column
    # that is dropped again before returning.
    month_angle = np.pi * 2 * pl.col("month_of_year") / 12

    return (
        df.with_columns(pl.col(date_col).dt.month().alias("month_of_year"))
        .with_columns(
            month_angle.sin().alias("month_sin"),
            month_angle.cos().alias("month_cos"),
        )
        .drop("month_of_year")
    )

def CreateWeekCyclicalEncoding(df: pl.DataFrame, date_col: str) -> pl.DataFrame:
    """
    Adds sine/cosine encodings of the week number taken from `date_col`.

    Args:
        df (pl.DataFrame): The input DataFrame.
        date_col (str): The name of the date column.

    Returns:
        pl.DataFrame: `df` plus the columns "week_sin" and "week_cos".
    """

    # Angle on a 52-step circle, built from a scratch "week_of_year" column
    # that is dropped again before returning.
    # NOTE(review): if `dt.week()` can yield 53, that week lands just past a
    # full turn (next to week 1) - confirm this is acceptable.
    week_angle = np.pi * 2 * pl.col("week_of_year") / 52

    return (
        df.with_columns(pl.col(date_col).dt.week().alias("week_of_year"))
        .with_columns(
            week_angle.sin().alias("week_sin"),
            week_angle.cos().alias("week_cos"),
        )
        .drop("week_of_year")
    )

def CreateDayCyclicalEncoding(df: pl.DataFrame, date_col: str) -> pl.DataFrame:
    """
    Adds sine/cosine encodings of the day of the year taken from `date_col`.

    Args:
        df (pl.DataFrame): The input DataFrame.
        date_col (str): The name of the date column.

    Returns:
        pl.DataFrame: `df` plus the columns "day_sin" and "day_cos".
    """

    # Angle on a 365.25-step circle, built from a scratch "day_of_year" column
    # (the ordinal day) that is dropped again before returning.
    day_angle = np.pi * 2 * pl.col("day_of_year") / 365.25

    return (
        df.with_columns(pl.col(date_col).dt.ordinal_day().alias("day_of_year"))
        .with_columns(
            day_angle.sin().alias("day_sin"),
            day_angle.cos().alias("day_cos"),
        )
        .drop("day_of_year")
    )

def create_time_features(df: pl.DataFrame, date_col: str, id_col: str) -> Tuple[pl.DataFrame, float]:
    """
    Creates cyclical calendar features appropriate for the inferred data frequency.

    The frequency is the most common gap between observations, in days, as
    returned by `InferDataFrequency`. A feature family is added only when the
    data is at least that fine-grained:

    - gap <= 31 days: "month_sin" / "month_cos"
    - gap <= 7 days:  "week_sin" / "week_cos"
    - gap <= 1 day:   "day_sin" / "day_cos"

    Args:
        df (pl.DataFrame): The input DataFrame.
        date_col (str): The name of the date column.
        id_col (str): The name of the identifier column.

    Returns:
        Tuple[pl.DataFrame, float]: The DataFrame with the added features and
        the inferred frequency in days.

    Raises:
        ValueError: Propagated from `InferDataFrequency` when the frequency
            cannot be determined.
    """
    print("\n--- Creating Time-wise Features ---")

    frequency = InferDataFrequency(df, date_col, id_col)

    # NOTE: "frequency" is a gap in days, so a larger value means coarser data.

    # Month of Year
    if frequency <= 31:
        df = CreateMonthCyclicalEncoding(df, date_col)
        print(" Created 'month_sin' and 'month_cos' features.")
    else:
        print(" Skipping 'month' features due to higher frequency.")

    # Week of Year
    if frequency <= 7:
        df = CreateWeekCyclicalEncoding(df, date_col)
        print(" Created 'week_sin' and 'week_cos' features.")
    else:
        print(" Skipping 'week' features due to higher frequency.")

    # Day of Year. The messages name the columns that are actually created
    # ('day_sin' / 'day_cos'), not the 'day_of_year_*' names used before.
    if frequency <= 1:
        df = CreateDayCyclicalEncoding(df, date_col)
        print(" Created 'day_sin' and 'day_cos' features.")
    else:
        print(" Skipping 'day' features due to higher frequency.")

    return df, frequency

In [6]:

def FeatureEngineering(df: pl.DataFrame) -> Tuple[pl.DataFrame, float]:
    """
    Main function to orchestrate the feature engineering process.

    Resolves the ID, Date and Target column names from `data_column_mapping`,
    validates that they exist in `df`, sorts the data by ID and date, and adds
    the time-based features.

    Args:
        df (pl.DataFrame): Preprocessed DataFrame.

    Returns:
        Tuple[pl.DataFrame, float]: The DataFrame with all new features and the
        inferred data frequency in days.

    Raises:
        ValueError: If the Target, Date or ID column is not configured or is
            missing from `df`, or if the data frequency cannot be determined.
    """
    print("\n--- Starting Feature Engineering ---")

    def _first_mapped_column(role):
        # A missing key or an empty list both resolve to None, so the
        # validation below raises ValueError rather than an IndexError.
        return (data_column_mapping.get(role) or [None])[0]

    id_col = _first_mapped_column("ID")
    date_col = _first_mapped_column("Date")
    target_col = _first_mapped_column("Target")

    if not target_col or target_col not in df.columns:
        raise ValueError("Target column is not defined or does not exist in the DataFrame.")
    if not date_col or date_col not in df.columns:
        raise ValueError("Date column is not defined or does not exist in the DataFrame.")
    if not id_col or id_col not in df.columns:
        raise ValueError("ID column is not defined or does not exist in the DataFrame.")

    # Sorting is crucial for time-series features. The checks above already
    # guarantee both columns exist, so no further guard is needed.
    df = df.sort(id_col, date_col)
    print(f" DataFrame sorted by '{id_col}' and '{date_col}'.")

    # Call feature creation functions
    df, frequency = create_time_features(df, date_col, id_col)

    print("\n--- Feature Engineering Complete ---")
    print("DataFrame Schema after Feature Engineering:")
    print(df.schema)

    return df, frequency

In [7]:
def _polars_freq_from_days(frequency, default='1w'):
    """Translates an inferred gap in days into a polars-style offset string for MLForecast."""
    if frequency is None:
        return default

    days = int(frequency)
    if days <= 0:
        return default
    if days % 7 == 0:
        # 7 -> '1w', 14 -> '2w', 28 -> '4w', ...
        return f'{days // 7}w'
    if 29 <= days <= 31:
        # Heuristic: calendar-monthly data has gaps of 28-31 days, so step by
        # month rather than by a fixed number of days.
        return '1mo'
    if days in (365, 366):
        return '1y'
    return f'{days}d'


def train_model(df, frequency):
    """
    Fits an MLForecast model (LightGBM regressor) on the feature-engineered data.

    Args:
        df (pl.DataFrame): Training data containing the ID, Date and Target
            columns named in `data_column_mapping`, plus any feature columns.
        frequency: Gap between consecutive observations in days, as inferred by
            the feature-engineering step. It sets the forecasting step; `None`
            falls back to weekly ('1w'), the previously hard-coded value.

    Returns:
        MLForecast: The fitted forecaster.
    """

    id_col = data_column_mapping.get("ID", [None])[0]
    date_col = data_column_mapping.get("Date", [None])[0]
    target_col = data_column_mapping.get("Target", [None])[0]

    lgb_params = {
        'verbosity': -1,
        'num_leaves': 512,
    }

    sales_model = MLForecast(
        models={
            'pred': lgb.LGBMRegressor(**lgb_params)
        },
        # Derived from the data rather than hard-coded, so non-weekly series
        # are no longer forecast on a weekly grid. Weekly data still gets '1w'.
        freq=_polars_freq_from_days(frequency),
        # lags=[1,2,3,4],
        lag_transforms={
            1: [RollingMean(window_size=4)],
        },
        # target_transforms=[Differences([24])],
    )

    # static_features=[] declares that no column is static.
    sales_model.fit(df,
                    id_col=id_col,
                    time_col=date_col,
                    target_col=target_col,
                    static_features=[])

    return sales_model

def save_model(sales_model, model_path = 'sales_forecast_model.pkl'):
    """
    Persists a trained forecaster by delegating to its own `save` method.

    Args:
        sales_model (MLForecast): The trained MLForecast model.
        model_path (str): Destination path handed to `sales_model.save`.
    """
    destination = model_path
    sales_model.save(destination)
    print(f"Model saved to {model_path}")

def load_model(model_path = 'sales_forecast_model.pkl'):
    """
    Loads a previously saved MLForecast model.

    Args:
        model_path (str): The path from which the model should be loaded.

    Returns:
        MLForecast: The object returned by `MLForecast.load(model_path)`.
    """
    return MLForecast.load(model_path)

In [None]:
def train_forecast_model():
    """
    Runs the end-to-end training pipeline.

    Loads the training data, normalises column dtypes, adds the engineered
    features, fits the forecasting model and saves it to the default path
    used by `save_model`.
    """
    # Load -> type coercion -> feature creation
    raw_df = GetTrainingData()
    typed_df = PreProcessDataType(raw_df)
    features_df, frequency = FeatureEngineering(typed_df)

    # Train and save the model
    save_model(train_model(features_df, frequency))

    print("\n--- Forecast Model Training & Saving Complete ---")

def predict_using_forecast_model(h):
    """
    Generates forecasts with the saved model and writes them to 'predictions.csv'.

    The test data is loaded and passed through the same dtype pre-processing
    and feature engineering as the training data. The target column is then
    dropped and the remainder is supplied to the model as `X_df`.

    Args:
        h: Forecast horizon passed to `MLForecast.predict`.
    """
    # get future weeks exogenous data, then apply the training-time transforms
    features_df, _ = FeatureEngineering(PreProcessDataType(GetTestData()))

    target_col = data_column_mapping.get("Target", [None])[0]
    exog_df = features_df.drop(target_col)

    forecaster = load_model()

    # SRMIST: h is the number of weeks ahead that we are forecasting.
    forecast = forecaster.predict(h=h, X_df=exog_df)

    # SRMIST: This should be written into the DB.
    forecast.write_csv('predictions.csv')

    print("\n--- Predictions done and saved. ---")

In [None]:
# SRMIST: This call will have to be scheduled, and we should also have an option to run it on request.
# SRMIST: It trains the forecasting model and saves it to disk.
train_forecast_model()


--- Pre-processing Data Types ---
  Converted 'Date' column to Date.
  Converted 'Store' to String.
  'Weekly_Sales' is already numeric (type: Float64). No conversion needed.
  'Holiday_Flag' is already numeric (type: Int64). No conversion needed.
  'Temperature' is already numeric (type: Float64). No conversion needed.
  'Fuel_Price' is already numeric (type: Float64). No conversion needed.
  'CPI' is already numeric (type: Float64). No conversion needed.
  'Unemployment' is already numeric (type: Float64). No conversion needed.

DataFrame Schema after Pre-processing:
Schema({'Store': String, 'Date': Date, 'Weekly_Sales': Float64, 'Holiday_Flag': Int64, 'Temperature': Float64, 'Fuel_Price': Float64, 'CPI': Float64, 'Unemployment': Float64})

--- Starting Feature Engineering ---
 DataFrame sorted by 'Store' and 'Date'.

--- Creating Time-wise Features ---
  Inferred data frequency: 7 days.
 Created 'month_sin' and 'month_cos' features.
 Created 'week_sin' and 'week_cos' features.
 Ski

In [None]:
# SRMIST: This call loads the saved model, generates the predictions and writes them to 'predictions.csv'.
# SRMIST: h is the number of weeks ahead that we want to forecast.
h = 4
predict_using_forecast_model(h)


--- Pre-processing Data Types ---
  Converted 'Date' column to Date.
  Converted 'Store' to String.
  'Weekly_Sales' is already numeric (type: Float64). No conversion needed.
  'Holiday_Flag' is already numeric (type: Int64). No conversion needed.
  'Temperature' is already numeric (type: Float64). No conversion needed.
  'Fuel_Price' is already numeric (type: Float64). No conversion needed.
  'CPI' is already numeric (type: Float64). No conversion needed.
  'Unemployment' is already numeric (type: Float64). No conversion needed.

DataFrame Schema after Pre-processing:
Schema({'Store': String, 'Date': Date, 'Weekly_Sales': Float64, 'Holiday_Flag': Int64, 'Temperature': Float64, 'Fuel_Price': Float64, 'CPI': Float64, 'Unemployment': Float64})

--- Starting Feature Engineering ---
 DataFrame sorted by 'Store' and 'Date'.

--- Creating Time-wise Features ---
  Inferred data frequency: 7 days.
 Created 'month_sin' and 'month_cos' features.
 Created 'week_sin' and 'week_cos' features.
 Ski