## **MITSUI Commodity Prediction - Inference**
This notebook performs inference using pre-trained models and artifacts.

**Data Provenance 1:** The models and scaler used in this notebook are sourced from the following private Kaggle Dataset, which was manually created from the output of the training notebook.

**Dataset Name 1:** models-scaler-lstm-mi-rf-allhist

**Link to Training Code:** The artifacts were generated by Version 25 of the "AAY_Mitsui_LSTM_training_MI_RF" notebook, available here:  https://www.kaggle.com/code/aliaydnyldz/aay-mitsui-lstm-training-mi-rf/notebook?scriptVersionId=266096138

*****************************

**Data Provenance 2:** The feature selection dictionary used in this notebook is sourced from the following private Kaggle Dataset, which was manually created from the output of the training notebook.

**Dataset Name 2:**  mi-rf-feature-selection

**Link to Training Code:** The artifacts were generated by Version 12 of the "AAY_Mitsui_LSTM_training_MI_RF" notebook, available here:  https://www.kaggle.com/code/aliaydnyldz/aay-mitsui-lstm-training-mi-rf/notebook?scriptVersionId=265588771

# Initial Setup

In [1]:
# ------------------ Setup ------------------
import os
import gc
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import pandas as pd
import polars as pl
from tqdm.auto import tqdm
import random
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.feature_selection import mutual_info_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Input, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model
from tqdm import tqdm
import kaggle_evaluation.mitsui_inference_server
import joblib
import json
import pickle
import psutil
import time
import traceback
import concurrent.futures

# Global variables
timesteps = 5            # LSTM lookback window: rows per input sequence
n_future = 4             # number of future steps each model predicts
emw_lookback = 26        # slow-EMA span for MACD; also sizes the rolling history kept by predict()
prediction_call_counter = 0

# ------------------ Data Loading ------------------
COMPETITION_DATA_DIR = '/kaggle/input/mitsui-commodity-prediction-challenge'
train_df = pl.read_csv(f'{COMPETITION_DATA_DIR}/train.csv').to_pandas()
train_labels_df = pl.read_csv(f'{COMPETITION_DATA_DIR}/train_labels.csv').to_pandas()
target_pairs_df = pl.read_csv(f'{COMPETITION_DATA_DIR}/target_pairs.csv').to_pandas()
test_df = pl.read_csv(f'{COMPETITION_DATA_DIR}/test.csv').to_pandas()

# Split off the tail of the training history: the first TRAIN_CUTOFF rows stay in
# train_df, everything from TRAIN_CUTOFF onward is held out. (How many rows that is
# depends on the length of train.csv.) .copy() yields independent frames, so later
# in-place edits of train_df are not performed on a slice of the full frame.
TRAIN_CUTOFF = 1870
train_df_test = train_df[TRAIN_CUTOFF:].copy()
train_df = train_df[:TRAIN_CUTOFF].copy()

train_labels_train = train_labels_df[:TRAIN_CUTOFF].copy()
train_labels_test = train_labels_df[TRAIN_CUTOFF:].copy()

# Paths to the private datasets that hold the trained artifacts (see provenance notes above).
FEATURES_PATH = '/kaggle/input/mi-rf-feature-selection'
MODEL_SCALER_DATASET_PATH = '/kaggle/input/models-scaler-lstm-mi-rf-allhist'


# --- Initialize Global Caches and History ---
# These persist across all calls to the predict() function.
models_cache = {}
scalers_cache = {}
feature_dict_cache = {}
history_df = None
all_features_cache = None
scaled_features_cache = None

2025-10-06 23:50:13.563989: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1759794613.979805      19 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1759794614.098164      19 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Data pre-processing

In [2]:
# Drop the US_Stock_GOLD_* columns: too few data points to be useful (and nothing to test).
GOLD_COLUMNS = [
    'US_Stock_GOLD_adj_open',
    'US_Stock_GOLD_adj_high',
    'US_Stock_GOLD_adj_low',
    'US_Stock_GOLD_adj_close',
    'US_Stock_GOLD_adj_volume',
]
train_df = train_df.drop(columns=GOLD_COLUMNS)

# Fill gaps in each time series: carry the most recent value forward, back-fill the
# leading gap, and zero whatever is still missing (columns that are entirely empty).
train_df = train_df.ffill().bfill().fillna(0)
# Bug fix: the final zero-fill used to target train_df a second time, leaving any
# all-NaN columns of test_df as NaN.
test_df = test_df.ffill().bfill().fillna(0)

# Log Memory Usage

In [3]:
def log_memory_usage(stage_name=""):
    """Print this process's resident memory (RSS), in GB, labelled with ``stage_name``."""
    bytes_per_gb = 1024 ** 3
    rss_gb = psutil.Process(os.getpid()).memory_info().rss / bytes_per_gb
    print(f"--- RAM USAGE at '{stage_name}': {rss_gb:.3f} GB ---")

# Return Functions

In [4]:
# ------------------ Utility Functions ------------------
def generate_log_returns_forward(data, lag):
    """Forward log return over ``lag`` steps, starting one step ahead.

    For each position ``t`` (positional, not label-based) the value is
    ``log(data[t + 1 + lag] / data[t + 1])``. Positions whose look-ahead runs past
    the end of the series are NaN, i.e. the last ``lag + 1`` entries.

    Args:
        data: numeric pandas Series in time order.
        lag: number of steps between the two prices.

    Returns:
        Unnamed float64 Series aligned with ``data.index``.
    """
    # shift(-k) moves row t+k to position t and pads the tail with NaN. This replaces a
    # per-row iloc loop whose blanket ``except Exception`` produced those trailing NaNs
    # but also silently turned any genuine error into NaN.
    later = data.shift(-(lag + 1)).to_numpy()
    start = data.shift(-1).to_numpy()
    # log is taken in the input precision, then widened, matching the old scalar maths.
    log_returns = np.log(later / start).astype('float64')
    return pd.Series(log_returns, index=data.index)


def generate_targets(column_a: pd.Series, lag: int, column_b: pd.Series = None) -> pd.Series:
    """Forward log-return target for one series, or the spread of two.

    With only ``column_a`` the result is its forward log return over ``lag`` steps.
    When ``column_b`` is given the result is ``returns(a) - returns(b)``.
    """
    returns_a = generate_log_returns_forward(column_a, lag)
    if column_b is None:
        return returns_a
    returns_b = generate_log_returns_forward(column_b, lag)
    return returns_a - returns_b

def generate_log_returns_backward(data, lag):
    """Backward log return over ``lag`` steps: ``log(data[t] / data[t - lag])``.

    Positional, not label-based. The first ``lag`` entries have no predecessor and are NaN.

    Args:
        data: numeric pandas Series in time order.
        lag: number of steps to look back.

    Returns:
        Unnamed float64 Series aligned with ``data.index``.
    """
    # Vectorised replacement for a per-row iloc loop that also swallowed every exception.
    current = data.to_numpy()
    previous = data.shift(lag).to_numpy()
    # log in the input precision, then widen — same values as the old scalar computation.
    log_returns = np.log(current / previous).astype('float64')
    return pd.Series(log_returns, index=data.index)

def generate_differences_backward(data, lag):
    """Backward difference over ``lag`` steps: ``data[t] - data[t - lag]``.

    Positional, not label-based. The first ``lag`` entries are NaN.

    Args:
        data: numeric pandas Series in time order.
        lag: number of steps to look back.

    Returns:
        Unnamed float64 Series aligned with ``data.index``.
    """
    # Vectorised replacement for a per-row iloc loop with a blanket exception handler.
    # shift() turns integer input into float64 before subtracting, so narrow ints can
    # no longer wrap around.
    current = data.to_numpy()
    previous = data.shift(lag).to_numpy()
    return pd.Series((current - previous).astype('float64'), index=data.index)

# Seed everything

In [5]:
import random

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and, when it can be
    imported, TensorFlow's global seed. ``PYTHONHASHSEED`` is exported for child
    processes only; it cannot change hash randomisation of the running interpreter.
    """
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    try:
        # The notebook already imports TensorFlow; the local import keeps this
        # helper usable where TensorFlow is absent.
        import tensorflow as tf
    except ImportError:
        return
    tf.random.set_seed(seed)


seed_everything(42)

# Feature Engineering functions

In [6]:
def _ffill_bfill(series: pd.Series) -> pd.Series:
    """Return ``series`` forward-filled, then back-filled, as a new Series."""
    return series.ffill().bfill()


def _strip_case_suffix(col: str, token: str) -> str:
    """Remove ``_<token>`` from ``col``, or ``_<Token>`` when the lowercase token is absent.

    e.g. ``'X_adj_close' -> 'X_adj'`` and ``'LME_AH_Close' -> 'LME_AH'``. Every
    occurrence is removed (plain ``str.replace``).
    """
    if token in col:
        return col.replace(f'_{token}', '')
    return col.replace(f'_{token.capitalize()}', '')


def generate_features_returns(feature_df: pd.DataFrame) -> pd.DataFrame:
    """Turn the raw market frame into the engineered feature matrix used by the models.

    Each raw column is routed by name. The first matching rule wins; columns matching
    none are ignored:

      1. 'volume'/'Volume'  -> kept as a level; also collected per instrument for
                               volume-adjusted momentum
      2. 'open_interest'    -> 1-step difference
      3. 'close'/'Close'    -> 1-step log return (first value 0); level kept for indicators
      4. 'FX_'              -> 1-step log return (first value 0); level kept for indicators
      5. 'open'/'Open', 'low'/'Low', 'high'/'High' -> levels for price-structure features

    Price-structure features and technical indicators are then added. Everything is
    concatenated, float64 columns are downcast to float32, gaps are forward/back-filled,
    and all-NaN columns are dropped.

    Args:
        feature_df: raw frame; ``date_id`` and ``is_scored`` are ignored if present.
            The caller's frame is not modified.

    Returns:
        Feature DataFrame with the same index as ``feature_df``. Column order matters —
        downstream code selects and scales features by column name.
    """
    log_memory_usage("Before feature engineering")

    # Work on a copy: the downcasts and drops below used to mutate the caller's frame.
    feature_df = feature_df.copy()

    # Downcast up front to save memory.
    for col in feature_df.select_dtypes(include=['float64']).columns:
        feature_df[col] = feature_df[col].astype('float32')
    for col in feature_df.select_dtypes(include=['int64']).columns:
        feature_df[col] = feature_df[col].astype('int32')

    print("generating feature returns: \n\n")

    feature_df = feature_df.drop(columns=['date_id', 'is_scored'], errors='ignore')

    return_feature_df = pd.DataFrame(index=feature_df.index).astype('float32')
    fx_return_feature_df = pd.DataFrame(index=feature_df.index).astype('float32')
    volume_feature_df = pd.DataFrame(index=feature_df.index).astype('float32')
    oi_feature_df = pd.DataFrame(index=feature_df.index).astype('float32')
    open_price_df = pd.DataFrame(index=feature_df.index).astype('float32')
    close_price_df = pd.DataFrame(index=feature_df.index).astype('float32')
    low_price_df = pd.DataFrame(index=feature_df.index).astype('float32')
    high_price_df = pd.DataFrame(index=feature_df.index).astype('float32')
    volume_df = pd.DataFrame(index=feature_df.index).astype('float32')
    fx_rate_df = pd.DataFrame(index=feature_df.index).astype('float32')

    # Filled series are assigned back explicitly. The previous chained
    # ``df[col].fillna(..., inplace=True)`` silently stops updating ``df`` under
    # pandas Copy-on-Write. Rule order matters: 'open_interest' must be tested before
    # the generic 'open' rule.
    for col in tqdm(feature_df, total=len(feature_df.columns)):
        if ('volume' in col) or ('Volume' in col):
            volume_feature_df[col] = _ffill_bfill(feature_df[col])
            volume_df[_strip_case_suffix(col, 'volume')] = _ffill_bfill(feature_df[col])

        elif 'open_interest' in col:
            oi_feature_df[col] = _ffill_bfill(generate_differences_backward(feature_df[col], 1))

        elif ('close' in col) or ('Close' in col):  # price data
            return_feature_df[col] = generate_log_returns_backward(feature_df[col], 1).fillna(0)
            close_price_df[_strip_case_suffix(col, 'close')] = _ffill_bfill(feature_df[col])

        elif 'FX_' in col:  # FX rate data
            fx_return_feature_df[col] = generate_log_returns_backward(feature_df[col], 1).fillna(0)
            fx_rate_df[col] = _ffill_bfill(feature_df[col])

        elif ('open' in col) or ('Open' in col):
            open_price_df[_strip_case_suffix(col, 'open')] = _ffill_bfill(feature_df[col])

        elif ('low' in col) or ('Low' in col):
            low_price_df[_strip_case_suffix(col, 'low')] = _ffill_bfill(feature_df[col])

        elif ('high' in col) or ('High' in col):
            high_price_df[_strip_case_suffix(col, 'high')] = _ffill_bfill(feature_df[col])

    log_memory_usage("Before price structure features")

    print ("generating price structure and volatility features", "\n")

    # Frame arithmetic aligns on instrument name, so instruments missing one of the
    # price kinds end up as all-NaN columns (removed at the end).
    price_range = high_price_df - low_price_df

    dailyPriceRange = price_range.astype('float32').add_suffix('_dailyPriceRange')
    # NOTE: open - close is a price difference, not a return, despite the name. It is
    # kept as-is because the feature names/values must match what the models were trained on.
    intradayReturn = (open_price_df - close_price_df).astype('float32').add_suffix('_intradayReturn')
    # NOTE(review): where high == low these two ratios divide by zero and yield ±inf
    # (NaN for 0/0). NaN is filled below, inf is not. If the pickled scaler is a
    # scikit-learn scaler, transform() rejects inf — confirm training handled this.
    positionInRange = (((close_price_df - low_price_df) / price_range)
                       .astype('float32').add_suffix('_positionInRange'))
    bodyToShadowRatio = (((close_price_df - open_price_df) / price_range)
                         .astype('float32').add_suffix('_bodyToShadowRatio'))

    log_memory_usage("Before creating momentum indicators")

    print ("creating momentum indicators", '\n')

    momentumIndicators = create_technical_indicator_features(close_price_df, volume_df).astype('float32')
    fx_momentumIndicators = create_technical_indicator_features(fx_rate_df).astype('float32')

    log_memory_usage("After momentum indicators")

    print ("momentum indicators completed", '\n')

    # Concatenate one group at a time, releasing each piece once merged and logging RAM
    # after every step (peak memory is the concern here, not speed).
    all_feature_df = return_feature_df.copy()
    del return_feature_df
    gc.collect()
    log_memory_usage("After adding return_feature_df")

    pending = [
        ('fx_return_feature_df', fx_return_feature_df),
        ('volume_feature_df', volume_feature_df),
        ('oi_feature_df', oi_feature_df),
        ('dailyPriceRange', dailyPriceRange),
        ('intradayReturn', intradayReturn),
        ('positionInRange', positionInRange),
        ('bodyToShadowRatio', bodyToShadowRatio),
        ('momentumIndicators', momentumIndicators),
        ('fx_momentumIndicators', fx_momentumIndicators),
    ]
    # Drop the named references so each piece is held only by `pending`.
    del (fx_return_feature_df, volume_feature_df, oi_feature_df, dailyPriceRange,
         intradayReturn, positionInRange, bodyToShadowRatio, momentumIndicators,
         fx_momentumIndicators)
    while pending:
        label, piece = pending.pop(0)
        all_feature_df = pd.concat([all_feature_df, piece], axis=1)
        del piece
        gc.collect()
        log_memory_usage(f"After adding {label}" if pending else "After final concatenation")

    print("All features combined successfully.")

    for col in all_feature_df.columns:
        if all_feature_df[col].dtype == 'float64':
            all_feature_df[col] = all_feature_df[col].astype('float32')

    all_feature_df = all_feature_df.ffill().bfill()

    # Remove columns that are entirely NaN (e.g. instruments missing a price kind).
    all_feature_df = all_feature_df.dropna(axis=1, how='all')

    return all_feature_df

In [7]:
def create_technical_indicator_features(close_price_df, volume_df=None, slow_span=None):
    """Build MACD-signal, RSI and (optionally) volume-adjusted momentum features.

    Args:
        close_price_df: DataFrame of price levels, one column per instrument, rows in
            time order.
        volume_df: optional DataFrame of volumes. Columns are matched to
            ``close_price_df`` by name through frame arithmetic, so a name present in
            only one of the two frames yields an all-NaN column.
        slow_span: span of the slow EMA in the MACD. ``None`` (default) uses the
            module-level ``emw_lookback``.

    Returns:
        DataFrame with ``<col>_macd`` then ``<col>_rsi`` columns, followed by
        ``<col>_momentum_x_volume`` columns when ``volume_df`` is given.
    """
    if slow_span is None:
        slow_span = emw_lookback

    # 1. MACD: EMA(12) - EMA(slow_span), then smoothed with a 9-span EMA. The '_macd'
    #    columns therefore hold what is usually called the MACD *signal line*. The
    #    name is kept because downstream feature selection looks columns up by name.
    ema_fast = close_price_df.ewm(span=12, adjust=False).mean().astype('float32')
    ema_slow = close_price_df.ewm(span=slow_span, adjust=False).mean().astype('float32')
    df_macd = ((ema_fast - ema_slow).ewm(span=9, adjust=False).mean()
               .astype('float32').add_suffix('_macd'))

    # 2. RSI over 14 rows, using simple rolling means of gains and losses.
    delta = close_price_df.diff().astype('float32')
    avg_gain = delta.where(delta > 0, 0).rolling(window=14).mean().astype('float32')
    avg_loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean().astype('float32')
    df_rsi = (100 - (100 / (1 + avg_gain / avg_loss))).add_suffix('_rsi')

    # 3. Volume-adjusted momentum: 10-row rate of change scaled by volume relative to
    #    its 20-row mean. An empty frame keeps the concat below uniform without volumes.
    df_momentum_x_volume = pd.DataFrame(index=close_price_df.index)
    if volume_df is not None:
        momentum_10d = close_price_df.pct_change(periods=10).astype('float32')
        volume_ma_20d = volume_df.rolling(window=20).mean().astype('float32')
        df_momentum_x_volume = (momentum_10d * (volume_df / volume_ma_20d)).add_suffix('_momentum_x_volume')

    return pd.concat([df_macd, df_rsi, df_momentum_x_volume], axis=1)

# Multistep Data Set for LSTM

In [8]:
def create_multi_step_dataset(feature_series,timesteps, n_future ,target_series=None) -> tuple[np.ndarray, np.ndarray] | np.ndarray:
    """
    Creates a dataset for multi-step forecasting.

    Args:
        feature_series: Series of input features.
        target_series: Series of target values, or None.
        timesteps: The lookback window size.
        n_future: The number of future steps to predict.
    """
    print("generating multi-step dataset: \n\n")
    print("shape of the input features_series: ", feature_series.shape)

    if target_series is not None:
        target_name = target_series.name
         #find the 'lag' column for the row where 'target_name' == target_name on the target_pairs dataframe
        lag = target_pairs_df[target_pairs_df['target'] == target_name]['lag'].iloc[0]
         #find the 'pair' column for the row where 'target_name' == target_name on the target_pairs dataframe
        pair = target_pairs_df[target_pairs_df['target'] == target_name]['pair'].iloc[0]

        if ' - ' in pair:
          a, b = pair.split(' - ')
          target_series_replicated = generate_targets(train_df[a],1,train_df[b])

        else:
          target_series_replicated=generate_targets(train_df[pair],1)

        target_series_replicated.fillna(0, inplace=True)

    X = []
    y = []


    if target_series is None: # this is only for inference / prediction
        for i in range(len(feature_series)-1, len(feature_series)):
            # Input sequence (past data)
            X.append(feature_series[i-timesteps:i])
        X_array = np.array(X)
        return X_array
    else:
        # Adjust the loop to ensure there's enough data for the future sequence
        for i in range(timesteps, len(feature_series) - n_future):
            # Input sequence (past data)
            X.append(feature_series[i-timesteps:i])    
            # Output sequence (future data)
            y.append(target_series_replicated[i-timesteps : i-timesteps+n_future].to_numpy())
        X_array = np.array(X)
        y_array = np.array(y)
        return X_array, y_array
    
        

# Predict function

In [9]:
def _load_models_into_cache(models_dir):
    """Load every ``<target>_model.keras`` file in ``models_dir`` into ``models_cache``.

    Files with any other name are skipped. Models are collected first and added to the
    cache only after all of them loaded, so a failure leaves the cache empty and the next
    call retries.

    Raises:
        FileNotFoundError: if ``models_dir`` is missing or contains no model files.
    """
    print(f"Eagerly loading all models from: {models_dir}")
    if not os.path.isdir(models_dir):
        raise FileNotFoundError(f"FATAL ERROR: Models directory not found at {models_dir}")
    start_time = time.time()
    loaded = {}
    for file_name in os.listdir(models_dir):
        if file_name.endswith('_model.keras'):
            target_name = file_name.removesuffix('_model.keras')
            loaded[target_name] = load_model(os.path.join(models_dir, file_name))
    if not loaded:
        raise FileNotFoundError(f"FATAL ERROR: no '*_model.keras' files found in {models_dir}")
    models_cache.update(loaded)
    print(f"Successfully loaded {len(models_cache)} models in {time.time() - start_time:.2f} seconds.")


def predict(test: pl.DataFrame,
    label_lags_1_batch: pl.DataFrame,
    label_lags_2_batch: pl.DataFrame,
    label_lags_3_batch: pl.DataFrame,
    label_lags_4_batch: pl.DataFrame,
    ) -> pl.DataFrame | pd.DataFrame:
    """Predict all targets for the single date contained in ``test``.

    Presumably called once per date by the inference server. The feature-selection
    dictionary, scaler and per-target Keras models are loaded on the first call and kept
    in module-level caches. A short rolling ``history_df`` is extended with each ``test``
    batch so that lookback features can be computed.

    Args:
        test: the current date's raw market data; must contain ``date_id`` (and normally
            ``is_scored``).
        label_lags_1_batch .. label_lags_4_batch: lagged labels from the gateway; only
            their shapes are logged, they are not used for prediction.

    Returns:
        One row with one column per target. Scored dates get a float64 pandas DataFrame
        indexed by ``date_id``. Unscored dates get a polars frame of zeros. Any exception
        inside the pipeline is caught and turned into an all-zero pandas frame.
    """
    # --- Tell the function we intend to modify the global variables ---
    global models_cache, scalers_cache, feature_dict_cache, history_df, all_features_cache, scaled_features_cache, prediction_call_counter

    test_df_new = test.to_pandas()
    current_date_id = test_df_new['date_id'].iloc[0]

    print ('test_df shape from server is:', test.shape, '\n' )
    print ('test df head:', test.head(), '\n\n' )
    print ('test df tail:', test.tail(), '\n\n' )
    print ('label_lags_1_batch shape:', label_lags_1_batch.shape, '\n\n' )
    print ('label_lags_2_batch shape:', label_lags_2_batch.shape, '\n\n' )
    print ('label_lags_3_batch shape:', label_lags_3_batch.shape, '\n\n' )
    print ('label_lags_4_batch shape:', label_lags_4_batch.shape, '\n\n' )

    prediction_call_counter += 1
    print(f"--- Prediction Call #{prediction_call_counter} | Processing Date ID: {current_date_id} ---")

    try:
        # --- 1. Load static artifacts (first call only). ---
        if not feature_dict_cache:
            with open(f'{FEATURES_PATH}/featureSetDict_MI_RF.json', 'r') as f:
                feature_dict_cache['main'] = json.load(f)
        if not scalers_cache:
            # pickle.load can execute arbitrary code; acceptable only because this file
            # comes from the author's own private dataset.
            with open(f'{MODEL_SCALER_DATASET_PATH}/scaler_fit_train.pkl', 'rb') as f:
                scalers_cache['main'] = pickle.load(f)

        featureSetDict = feature_dict_cache['main']
        scaler_train = scalers_cache['main']

        if not models_cache:
            _load_models_into_cache(os.path.join(MODEL_SCALER_DATASET_PATH, 'models'))

        # --- 2. Maintain a short rolling history that ends at the current date. ---
        if history_df is None:
            print ('no time series history, attempt to generate one ')
            # .copy() so adding 'is_scored' below writes to a real frame, not a slice of train_df.
            history_df = train_df[train_df['date_id'] < current_date_id].copy()
            print ('size of history_df before current prediction date: ',history_df.shape)
            if 'is_scored' not in history_df.columns:
                print("Adding 'is_scored' column to train_df.")
                history_df['is_scored'] = False

        history_df = pd.concat([history_df, test_df_new], ignore_index=True)
        print ('size of history_df before truncating: ',history_df.shape)

        # Keep only what the lookback features need: emw_lookback rows plus a small buffer.
        history_df = history_df.iloc[-(emw_lookback + 5):, :]
        print ('emw_lookback: ',emw_lookback)
        print ('size of history_df after truncating: ',history_df.shape)

        # --- 3. Gap-filled frame for this call, without the dropped GOLD columns. ---
        test_df_all = history_df.ffill().bfill().fillna(0)
        gold_columns = [col for col in test_df_all.columns if col.startswith("US_Stock_GOLD_adj_")]
        test_df_all = test_df_all.drop(columns=gold_columns)

        print ('test_df_all head():\n',test_df_all.head(), '\n' )
        print ('is scored column:\n ', test_df_all['is_scored'])

        if not test_df_all['is_scored'].iloc[-1]:
            # Unscored date: skip the models and return zeros immediately.
            print("SKIP Predictions: The date is not scored.")
            target_columns = target_pairs_df['target'].tolist()
            return pl.DataFrame({col: 0.0 for col in target_columns})

        # --- 4. Features for the current window. ---
        # Bug fix: features used to be computed once and then reused from
        # scaled_features_cache on every later call, so every date after the first
        # scored one was predicted from stale features. history_df changes on each call,
        # so the features are recomputed every time.
        allFeatureReturns_test = generate_features_returns(test_df_all)
        # timesteps + 1 rows is enough for create_multi_step_dataset's window.
        allFeatureReturns_test_cut = allFeatureReturns_test[-(timesteps + 1):]
        scaledFeatures_test = scaler_train.transform(allFeatureReturns_test_cut)
        scaledFeatures_test_df = pd.DataFrame(scaledFeatures_test, columns=allFeatureReturns_test.columns)
        print('shape of scaledFeatures_test_df :/n/n')
        print(scaledFeatures_test_df.shape)
        # Kept only for inspection/debugging; never read back as an input.
        scaled_features_cache = scaledFeatures_test_df.copy()

        # --- 5. One model per target. ---
        t_it = time.time()

        def predict_for_single_target(target_info, features_df, feature_map):
            """Predict one target: sum the model's first `lag` 1-step returns."""
            target_name, lag, pair = target_info
            scaledFeatureSubset_test = features_df[feature_map[target_name]]
            print('shape of scaledFeatureSubset_test into multi-step:/n/n')
            print(scaledFeatureSubset_test.shape)
            X_lstm = create_multi_step_dataset(scaledFeatureSubset_test, timesteps=timesteps, n_future=n_future)
            print ('predictor shape: ',X_lstm.shape)
            model = models_cache[target_name]
            pred_returns = model.predict(X_lstm)
            print ('prediction shape: ',pred_returns.shape)
            # The lag-k target is the sum of the first k predicted 1-step returns.
            pred_return = 0
            for j in range(0, lag):
                pred_return += pred_returns[-1, j]
            return target_name, pred_return

        predictions = {}
        target_info_list = [(row['target'], row['lag'], row['pair']) for _, row in target_pairs_df.iterrows()]

        # Thread pool with 4 workers; a failing target falls back to 0.0 on its own.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            future_to_target = {
                executor.submit(predict_for_single_target, info, scaledFeatures_test_df, featureSetDict): info
                for info in target_info_list
            }
            for future in concurrent.futures.as_completed(future_to_target):
                try:
                    target_name, pred_return = future.result()
                    predictions[target_name] = pred_return
                except Exception as exc:
                    failed_target_info = future_to_target[future]
                    print(f"Target '{failed_target_info[0]}' generated an exception: {exc}")
                    predictions[failed_target_info[0]] = 0.0

        # --- 6. One-row frame, columns ordered target_0, target_1, ... ---
        print("Step 7.0: Constructing final ordered prediction DataFrame...")

        sorted_target_names = sorted(predictions, key=lambda name: int(name.split('_')[1]))
        final_predictions_df = pd.DataFrame(
            {name: [predictions[name]] for name in sorted_target_names}
        ).astype('float64')
        final_predictions_df.index = [int(current_date_id)]
        final_predictions_df.index.name = 'date_id'

        print("Step 7.1: Final DataFrame reconstruction complete.")
        print(f"--> Final DF shape: {final_predictions_df.shape}")
        print(f"--> Final DF columns: {final_predictions_df.columns.tolist()}")
        print(f"--> Final DF index: {final_predictions_df.index}")

        assert isinstance(final_predictions_df, (pd.DataFrame, pl.DataFrame))
        assert len(final_predictions_df) == 1
        print(">>> Prediction function finished successfully.")
        t_it_end = time.time()
        print ('TOTAL TIME TAKEN for predictions' , t_it_end-t_it)
        return final_predictions_df

    except Exception:
        # --- Safety net: log the full error, then return zeros rather than crash the server. ---
        print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("!!! A FATAL ERROR WAS CAUGHT BY THE SAFETY NET !!!")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
        traceback.print_exc()

        print(">>> Returning a default DataFrame to prevent a hard crash.")

        target_columns = target_pairs_df['target'].tolist()
        default_df = pd.DataFrame({col: [0.0] for col in target_columns}).astype('float64')

        try:
            default_df.index = [int(current_date_id)]
            default_df.index.name = 'date_id'
        except Exception:
            print(">>> Could not get date_id for error return. Using default index.")

        return default_df
        
# Show which Kaggle run mode this is; the branch below depends on these variables.
print("--- ENVIRONMENT VARIABLE CHECK ---")
print(f"Value of KAGGLE_IS_COMPETITION_RERUN: {os.getenv('KAGGLE_IS_COMPETITION_RERUN')}")
print(f"Value of KAGGLE_KERNEL_RUN_TYPE:    {os.getenv('KAGGLE_KERNEL_RUN_TYPE')}")
print("------------------------------------")

# Build the inference server once, wrapping predict() (it was previously constructed twice).
server = kaggle_evaluation.mitsui_inference_server.MitsuiInferenceServer(predict)

# Final submission rerun (highest priority). Accept any non-empty value, as the standard
# Kaggle evaluation template does, so a flag spelled other than exactly 'true' cannot
# silently skip serving and fall through to the dummy-file branch.
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    print(">>> EXECUTING FINAL SUBMISSION RUN (server.serve())")
    server.serve()

# Interactive session: exercise predict() through the local gateway.
elif os.getenv('KAGGLE_KERNEL_RUN_TYPE') == 'Interactive':
    print(">>> EXECUTING INTERACTIVE RUN (server.run_local_gateway())")
    server.run_local_gateway(('/kaggle/input/mitsui-commodity-prediction-challenge',))

# Otherwise a 'Batch' run ("Save & Commit"): write a placeholder submission without predicting.
else:
    print(">>> EXECUTING COMMIT RUN ('Batch'). Creating dummy file.")
    target_columns = target_pairs_df['target'].tolist()
    dummy_df = pd.DataFrame({col: [0.0] for col in target_columns})
    dummy_df.to_parquet('submission.parquet', index=False)


--- ENVIRONMENT VARIABLE CHECK ---
Value of KAGGLE_IS_COMPETITION_RERUN: None
Value of KAGGLE_KERNEL_RUN_TYPE:    Batch
------------------------------------
>>> EXECUTING COMMIT RUN ('Batch'). Creating dummy file.
