<a href="https://www.kaggle.com/code/nicholas33/drw-crypto-market-prediction-nb153?scriptVersionId=248237309" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# DRW Crypto Market Prediction Competition Pipeline
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import os # Import os for path checking
import gc # Import garbage collector
import math # For ceil in data generator

# Memory optimization and data processing
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error

# Deep learning and modeling
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (LSTM, ConvLSTM2D, Dense, Dropout,
                                     BatchNormalization, Input, Conv1D, MaxPooling1D,
                                     Flatten, Reshape, TimeDistributed)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence # Import Keras Sequence for data generation

# Tree-based models for ensemble
import lightgbm as lgb
from scipy.stats import pearsonr

# Set memory growth for GPU (if available) - still good practice.
# With memory growth enabled TensorFlow is asked to allocate GPU memory on
# demand rather than up front (per the TensorFlow docs - not verifiable here).
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Apply the same setting to every visible GPU.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Best-effort: report the problem and continue with default settings.
        # NOTE(review): presumably raised when the devices were already
        # initialized before this cell ran - confirm against the TF docs.
        print(e)


class TimeSeriesSequence(Sequence):
    """
    Keras Sequence that streams sliding-window batches from Parquet files.

    Every sample is a window of `sequence_length` consecutive feature rows and
    its target is the label stored on the window's last row. "Consecutive" is
    positional within `original_indices`, so gaps in the index values (rows
    dropped by cleaning or by the train/validation split) do not break the
    windowing.

    The Parquet files must contain the unnamed pandas index as a column
    (`to_parquet(..., index=True)`), because rows are located by filtering on
    the `__index_level_0__` column.

    Args:
        X_filepath: Parquet file holding the feature columns.
        y_filepath: Parquet file holding a single `label` column.
        sequence_length: Number of rows per window (>= 1).
        batch_size: Maximum number of windows per batch (>= 1).
        feature_cols: Names of the feature columns to read.
        total_samples: Accepted for backward compatibility; only stored, the
            usable sample count is derived from `original_indices`.
        original_indices: Index values (pandas Index or array-like) of the
            rows in the Parquet files, in the order they should be windowed.
        **kwargs: Forwarded to the Keras base-class initializer.

    Raises:
        ValueError: If `sequence_length` or `batch_size` is smaller than 1.
    """

    def __init__(self, X_filepath, y_filepath, sequence_length, batch_size,
                 feature_cols, total_samples, original_indices, **kwargs):
        # Newer Keras versions expect the base initializer to run so that the
        # dataset's worker/queue settings are defined.
        super().__init__(**kwargs)

        if sequence_length < 1:
            raise ValueError("sequence_length must be >= 1")
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")

        self.X_filepath = X_filepath
        self.y_filepath = y_filepath
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.feature_cols = feature_cols  # List of feature column names
        self.total_samples = total_samples
        # The actual index values used when the data was saved to Parquet.
        self.original_indices = pd.Index(original_indices)

        # A window ending at position `i` needs rows `i - sequence_length + 1 .. i`,
        # so the first predictable label sits at position `sequence_length - 1`.
        self.num_predictable_samples = max(0, len(self.original_indices) - self.sequence_length + 1)

        print(f"TimeSeriesSequence initialized for {X_filepath}: "
              f"Total rows (from original_indices): {len(self.original_indices)}, "
              f"Predictable samples: {self.num_predictable_samples}, "
              f"Batch size: {self.batch_size}, Sequence length: {self.sequence_length}")

    def __len__(self):
        """Denotes the number of batches per epoch."""
        return math.ceil(self.num_predictable_samples / self.batch_size)

    def _empty_batch(self):
        """Return a correctly shaped, zero-length (X, y) batch."""
        return (np.empty((0, self.sequence_length, len(self.feature_cols)), dtype=np.float32),
                np.empty((0,), dtype=np.float32))

    @staticmethod
    def _read_rows(filepath, columns, index_values):
        """Load exactly the rows named by `index_values`, in that order."""
        # The min/max filter limits how much is read from disk; `.loc` then
        # drops rows that merely fall inside the range and fixes the order.
        frame = pd.read_parquet(
            filepath,
            columns=columns,
            engine='pyarrow',
            filters=[('__index_level_0__', '>=', index_values.min()),
                     ('__index_level_0__', '<=', index_values.max())]
        )
        return frame.loc[index_values]

    @staticmethod
    def _build_windows(features, sequence_length):
        """Turn a 2-D (rows, features) array into (windows, sequence_length, features)."""
        n_windows = features.shape[0] - sequence_length + 1
        if n_windows <= 0:
            return np.empty((0, sequence_length, features.shape[1]), dtype=np.float32)
        # sliding_window_view puts the window axis last: (windows, features, steps).
        view = np.lib.stride_tricks.sliding_window_view(features, sequence_length, axis=0)
        return np.ascontiguousarray(view.transpose(0, 2, 1), dtype=np.float32)

    def __getitem__(self, idx):
        """Generates one batch of data as `(X, y)` float32 arrays.

        `X` has shape (n, sequence_length, n_features) and `y` has shape (n,).
        An out-of-range `idx` yields a zero-length batch (with a warning)
        instead of raising.
        """
        # Positions (not index values) inside `self.original_indices`.
        first_row = idx * self.batch_size
        label_start = first_row + self.sequence_length - 1
        label_stop = min(label_start + self.batch_size, len(self.original_indices))  # exclusive

        if idx < 0 or label_start >= label_stop:
            print(f"Warning: Batch {idx} - calculated label range is empty. Returning empty.")
            return self._empty_batch()

        label_index = self.original_indices[label_start:label_stop]
        # The feature chunk starts `sequence_length - 1` rows before the first
        # label so that the first window is complete.
        feature_index = self.original_indices[first_row:label_stop]

        features = self._read_rows(self.X_filepath, self.feature_cols, feature_index).values.astype(np.float32)
        labels = self._read_rows(self.y_filepath, ['label'], label_index).values.flatten().astype(np.float32)

        # The chunk is already ordered positionally, so window `i` is simply
        # rows `i .. i + sequence_length - 1` and it ends on label `i`.
        # (Deriving the offset from index *values* breaks as soon as the index
        # has gaps.)
        X_sequences = self._build_windows(features, self.sequence_length)

        usable = min(len(X_sequences), len(labels))
        if usable < len(labels):
            # Should not happen with correct slicing, but keep X and y aligned.
            print(f"Warning: Not enough feature data in chunk for sequence {usable} in batch {idx}. Breaking.")
        if usable == 0:
            print(f"Warning: Batch {idx} resulted in 0 sequences. Returning empty.")
            return self._empty_batch()

        return X_sequences[:usable], labels[:usable]


class CryptoMarketPredictor:
    def __init__(self, sequence_length=30, top_features=100, top_X_features_to_preselect=30):
        """Store hyper-parameters, create empty fitted-state slots and set checkpoint paths.

        Args:
            sequence_length: Number of time steps per sample for the sequence models.
            top_features: Number of features kept by the final feature selection.
            top_X_features_to_preselect: Number of raw `X*` columns taken from the
                input before feature engineering.
        """
        # --- Hyper-parameters supplied by the caller ---
        self.sequence_length = sequence_length
        self.top_X_features_to_preselect = top_X_features_to_preselect
        self.top_features = top_features

        # --- State that is filled in while fitting ---
        self.models = {}
        self.selected_features = None  # Final selected feature names
        self.feature_selector = None
        self.scaler = RobustScaler()

        # --- On-disk checkpoints ---
        # Engineered data, written once feature engineering has finished.
        self._engineered_data_checkpoint_path = './engineered_train_data_checkpoint.parquet'
        # Scaled, feature-selected train/validation splits read by the batch generator.
        self._scaled_X_path = './scaled_train_X.parquet'
        self._scaled_y_path = './scaled_train_y.parquet'
        self._scaled_val_X_path = './scaled_val_X.parquet'
        self._scaled_val_y_path = './scaled_val_y.parquet'


    def optimize_memory(self, df):
        """
        Optimize Pandas DataFrame memory usage and clean data.
        """
        print(f"Memory usage before optimization: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

        # Clean data first
        df = self.clean_data(df)

        # Optimize numeric columns (Pandas directly)
        for col in df.select_dtypes(include=[np.number]).columns:
            if col == 'timestamp' or col == 'ID' or col == 'label':
                continue # Don't downcast timestamp, ID, or label
            df[col] = pd.to_numeric(df[col], downcast='float')

        print(f"Memory usage after optimization: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        return df

    def clean_data(self, df):
        """
        Clean Pandas DataFrame by handling inf, -inf, and extreme values.
        """
        print("Cleaning data...")

        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        numeric_cols = [col for col in numeric_cols if col not in ['timestamp', 'ID', 'label']]

        # Replace inf and -inf with NaN first
        for col in numeric_cols:
            df[col] = df[col].replace([np.inf, -np.inf], np.nan)

        # Fill NaN values with forward fill, then backward fill, then 0
        for col in numeric_cols:
            df[col] = df[col].ffill().bfill().fillna(0)

        # Handle extreme outliers (values beyond 3*IQR)
        for col in df.select_dtypes(include=[np.float32, np.float64]).columns:
            if col in ['timestamp', 'ID', 'label']: continue
            if df[col].nunique() > 1:
                q25 = df[col].quantile(0.25)
                q75 = df[col].quantile(0.75)
                iqr = q75 - q25

                if iqr != 0 and not pd.isna(iqr):
                    lower_bound = q25 - 3 * iqr
                    upper_bound = q75 + 3 * iqr
                    df[col] = df[col].clip(lower_bound, upper_bound)
        print("Data cleaning applied.")
        return df

    def create_time_features(self, df):
        """
        Create time-based features with robust calculations using Pandas.
        Significantly reduced complexity for faster execution.
        """
        print("Creating time-based features...")

        # Basic market features
        df['mid_price'] = (df['bid_qty'] + df['ask_qty']) / 2
        df['spread'] = df['ask_qty'] - df['bid_qty']

        # Safe division for imbalance
        denominator = df['bid_qty'] + df['ask_qty'] + 1e-10
        df['imbalance'] = (df['bid_qty'] - df['ask_qty']) / denominator

        # Safe division for buy/sell ratio
        df['buy_sell_ratio'] = df['buy_qty'] / (df['sell_qty'] + 1e-10)

        # Rolling statistics - significantly reduced windows for speed
        windows = [10, 30]
        base_cols_for_rolling = ['volume', 'mid_price', 'buy_qty', 'sell_qty', 'imbalance']

        for col in base_cols_for_rolling:
            for window in windows:
                df[f'{col}_ma_{window}'] = df[col].rolling(window, min_periods=1).mean()
                df[f'{col}_std_{window}'] = df[col].rolling(window, min_periods=1).std().fillna(0)

        # Lagged features - significantly reduced lags for speed
        lags = [1, 5]
        base_cols_for_lag = ['mid_price', 'imbalance']

        for col in base_cols_for_lag:
            for lag in lags:
                df[f'{col}_lag_{lag}'] = df[col].shift(lag)

        # Technical indicators - reduced
        df['rsi_proxy'] = self.calculate_rsi_proxy(df['mid_price'], window=10)
        df['momentum'] = df['mid_price'] - df['mid_price'].shift(5)

        # Final check for any inf/nan values that might have been introduced
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        numeric_cols = [col for col in numeric_cols if col not in ['timestamp', 'ID', 'label']]

        for col in numeric_cols:
            df[col] = df[col].replace([np.inf, -np.inf], np.nan)
            df[col] = df[col].ffill().bfill().fillna(0)

        print(f"Time-based features created. Current shape: {df.shape[0]} rows, {df.shape[1]} columns")
        return df

    def calculate_rsi_proxy(self, prices_series, window=14):
        """Calculate RSI-like indicator with safe operations for Pandas Series."""
        delta = prices_series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window, min_periods=1).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window, min_periods=1).mean()

        rs = gain / (loss + 1e-10)
        rsi = 100 - (100 / (1 + rs))

        rsi = rsi.replace([np.inf, -np.inf], np.nan).fillna(50)

        return rsi

    def select_features(self, X_df, y_df, method='mutual_info'):
        """
        Select the `self.top_features` best features with scikit-learn's SelectKBest.

        Rows containing inf/NaN are dropped from both `X_df` and `y_df`, and
        constant columns (std < 1e-8) are removed before scoring, unless every
        column is constant, in which case all are kept.

        Args:
            X_df: Numeric feature DataFrame.
            y_df: Target aligned with `X_df` (must accept the same boolean row mask).
            method: `'mutual_info'` scores with `mutual_info_regression`
                (seeded, so the selection is reproducible between runs); any
                other value scores with `f_regression`.

        Returns:
            Tuple `(X_selected, index)`: the selected features as a float32
            NumPy array and the Pandas Index of the rows that were kept, so the
            caller can re-align the target.

        Side effects:
            Sets `self.feature_selector` and `self.selected_features`.
        """
        # Local import keeps this method self-contained.
        from functools import partial

        print(f"Selecting top {self.top_features} features from {X_df.shape[1]} features...")

        print("Validating data before final feature selection...")

        # Check for any remaining inf/nan values
        inf_mask = np.isinf(X_df).any(axis=1)
        nan_mask = np.isnan(X_df).any(axis=1)
        invalid_mask = inf_mask | nan_mask

        if invalid_mask.any():
            print(f"Removing {invalid_mask.sum()} rows with invalid values before final selection.")
            X_df = X_df[~invalid_mask]
            y_df = y_df[~invalid_mask]

        # Constant or near-constant features carry no signal and can upset the selectors.
        constant_features_mask = X_df.std() < 1e-8

        if constant_features_mask.all():
            print("Warning: All features are constant. Cannot perform feature selection.")
        else:
            if constant_features_mask.any():
                print(f"Removing {constant_features_mask.sum()} constant features.")
            X_df = X_df[X_df.columns[~constant_features_mask].tolist()]

        print(f"Final data shape for feature selection: {X_df.shape}")

        n_features_to_select = min(self.top_features, X_df.shape[1])

        if method == 'mutual_info':
            # mutual_info_regression is randomized; pin the seed so the same
            # features are chosen on every run. `partial` keeps the selector
            # picklable (a lambda would not be).
            score_func = partial(mutual_info_regression, random_state=42)
        else:
            score_func = f_regression
        selector = SelectKBest(score_func=score_func, k=n_features_to_select)

        X_selected = selector.fit_transform(X_df, y_df)
        self.feature_selector = selector
        self.selected_features = X_df.columns[selector.get_support()].tolist()

        print(f"\n--- Selected Features ({len(self.selected_features)}) ---")
        for feature in self.selected_features:
            print(f"- {feature}")
        print("---------------------------------------\n")

        # Return as NumPy array and Pandas Index to maintain alignment with y
        return X_selected.astype(np.float32), X_df.index

    def build_convlstm_model(self, input_shape):
        """Build and compile a ConvLSTM regressor for spatial-temporal patterns.

        Args:
            input_shape: `(time_steps, num_features)` of one sample. The time
                dimension is taken from here; `self.sequence_length` is used
                only when it is given as `None`.

        Returns:
            A compiled Keras `Sequential` model (Adam, lr 0.001, MAE loss and
            metric) with a single linear output.
        """
        # Derive the Reshape target from the argument so the model always
        # matches the data it is built for.
        time_steps = input_shape[0]
        if time_steps is None:
            time_steps = self.sequence_length
        num_features = input_shape[1]

        model = Sequential([
            # ConvLSTM2D expects (samples, time_steps, rows, cols, channels):
            # treat the feature vector as a 1 x num_features single-channel image.
            Reshape((time_steps, 1, num_features, 1), input_shape=input_shape),

            ConvLSTM2D(filters=64, kernel_size=(1, 3),
                       activation='tanh', recurrent_activation='sigmoid',
                       return_sequences=True, dropout=0.2, padding='same'),
            BatchNormalization(),

            ConvLSTM2D(filters=32, kernel_size=(1, 3),
                       activation='tanh', recurrent_activation='sigmoid',
                       return_sequences=False, dropout=0.2, padding='same'),
            BatchNormalization(),

            Flatten(),
            Dense(50, activation='relu'),
            Dropout(0.3),
            Dense(1, activation='linear')
        ])

        model.compile(optimizer=Adam(learning_rate=0.001),
                      loss='mae', metrics=['mae'])
        return model

    def build_lstm_model(self, input_shape):
        """Assemble and compile a stacked two-layer LSTM regressor.

        `input_shape` is the shape of one sample and is passed straight to the
        first LSTM layer. The compiled model (Adam, lr 0.001, MAE loss and
        metric) has a single linear output.
        """
        net = Sequential()

        # Recurrent stack: the first layer emits the full sequence for the second.
        net.add(LSTM(100, return_sequences=True, input_shape=input_shape, dropout=0.2))
        net.add(BatchNormalization())
        net.add(LSTM(50, return_sequences=False, dropout=0.2))
        net.add(BatchNormalization())

        # Regression head.
        net.add(Dense(50, activation='relu'))
        net.add(Dropout(0.3))
        net.add(Dense(1, activation='linear'))

        optimizer = Adam(learning_rate=0.001)
        net.compile(optimizer=optimizer, loss='mae', metrics=['mae'])
        return net

    def build_cnn_lstm_model(self, input_shape):
        print("CNN-LSTM model building skipped for current speed optimization, but can be reinstated later.")
        return None

    def train_lightgbm(self, X_train, y_train, X_val, y_val):
        """Fit a LightGBM regressor with early stopping on the validation set.

        `X_train` / `X_val` must expose `.values` (e.g. DataFrames); `y_train`
        / `y_val` are array-likes. NaN is replaced by 0 and +/-inf by +/-1e6
        in all four inputs before training. Returns the trained booster.
        """
        print("Training LightGBM model...")

        def _finite(values):
            # LightGBM must not see NaN/inf here: map them to fixed finite values.
            return np.nan_to_num(values, nan=0.0, posinf=1e6, neginf=-1e6)

        train_set = lgb.Dataset(_finite(X_train.values), label=_finite(y_train))
        valid_set = lgb.Dataset(_finite(X_val.values), label=_finite(y_val), reference=train_set)

        boosting_rounds = 1000
        params = {
            'objective': 'regression',
            'metric': 'mae',
            'boosting_type': 'gbdt',
            'num_leaves': 31,
            'learning_rate': 0.05,
            'feature_fraction': 0.9,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            'verbose': -1,
            'random_state': 42,
            'n_estimators': boosting_rounds
        }

        # Stop after 50 rounds without validation improvement; silence per-round logs.
        stop_early = lgb.early_stopping(50)
        quiet = lgb.log_evaluation(0)

        booster = lgb.train(
            params,
            train_set,
            valid_sets=[valid_set],
            num_boost_round=params['n_estimators'],
            callbacks=[stop_early, quiet]
        )
        return booster

    def evaluate_model(self, y_true, y_pred, model_name):
        """Print MAE and Pearson correlation for a model and return the correlation.

        NaN is replaced by 0 and +/-inf by +/-1e6 in both arrays first. The
        correlation stays 0.0 when either array has a single distinct value
        or when `pearsonr` raises ValueError.
        """
        truth, preds = (
            np.nan_to_num(values, nan=0.0, posinf=1e6, neginf=-1e6)
            for values in (y_true, y_pred)
        )

        mae = mean_absolute_error(truth, preds)

        # Pearson correlation is only meaningful when both arrays actually vary.
        correlation = 0.0
        both_vary = np.unique(truth).size > 1 and np.unique(preds).size > 1
        if both_vary:
            try:
                correlation, _ = pearsonr(truth, preds)
            except ValueError:
                # Can happen if inputs have 0 variance after cleaning
                correlation = 0.0

        print(f"{model_name} - MAE: {mae:.4f}, Pearson Correlation: {correlation:.4f}")
        return correlation

    def fit(self, train_data_raw_initial_load):
        """Main training pipeline with robust error handling"""
        print("Starting training pipeline...")

        # Determine raw X_n columns from the initial load structure
        X_n_cols_raw = [col for col in train_data_raw_initial_load.columns if col.startswith('X') and col != 'label']
        basic_features_cols = ['timestamp', 'bid_qty', 'ask_qty', 'buy_qty', 'sell_qty', 'volume']

        preselected_X_n_features = X_n_cols_raw[:min(self.top_X_features_to_preselect, len(X_n_cols_raw))]
        print(f"Initially selected {len(preselected_X_n_features)} X_n features for direct Pandas load.")

        columns_to_process_raw_for_fit = basic_features_cols + preselected_X_n_features + ['label']

        train_df = None
        if os.path.exists(self._engineered_data_checkpoint_path):
            print(f"Checkpoint found. Loading processed data from {self._engineered_data_checkpoint_path}...")
            train_df = pd.read_parquet(self._engineered_data_checkpoint_path)
            if 'timestamp' in train_df.columns:
                train_df['timestamp'] = pd.to_datetime(train_df['timestamp'])
        else:
            print(f"Selecting columns from initial raw training data...")
            train_df = train_data_raw_initial_load[columns_to_process_raw_for_fit].copy()
            if 'timestamp' not in train_df.columns and train_df.index.name == 'timestamp':
                train_df = train_df.reset_index(names=['timestamp'])
            train_df['timestamp'] = pd.to_datetime(train_df['timestamp'])
            gc.collect()
            train_df = self.optimize_memory(train_df)
            train_df = self.create_time_features(train_df)
            print(f"Saving engineered data to checkpoint: {self._engineered_data_checkpoint_path}...")
            train_df.to_parquet(self._engineered_data_checkpoint_path, index=False)
            print("Engineered data checkpoint saved.")

        print(f"Data shape after feature engineering: {train_df.shape[0]} rows, {train_df.shape[1]} columns")
        feature_cols_final = [col for col in train_df.columns
                              if col not in ['timestamp', 'label']]
        X_df = train_df[feature_cols_final]
        y_df = train_df['label']
        print(f"Features shape before final selection: {X_df.shape[0]} rows, {X_df.shape[1]} columns")
        print(f"Target shape: {y_df.shape[0]} rows")

        X_selected, valid_idx = self.select_features(X_df, y_df)
        y_for_training = y_df.loc[valid_idx].astype(np.float32)

        del X_df, y_df
        gc.collect()

        X_scaled = self.scaler.fit_transform(X_selected)
        print("Final data validation (after scaling)...")
        if np.any(np.isnan(X_scaled)) or np.any(np.isinf(X_scaled)):
            print("ERROR: Still have invalid values after preprocessing! Applying emergency cleanup.")
            X_scaled = np.nan_to_num(X_scaled, nan=0.0, posinf=1e6, neginf=-1e6)
        print(f"Final training data shape: {X_scaled.shape}")

        del X_selected
        gc.collect()

        # Split data into training and validation sets
        # IMPORTANT: Use the index from the `train_df` (which is already processed and filtered by `select_features`' `valid_idx`)
        # to ensure alignment with X_scaled and y_for_training.
        # train_data_raw_initial_load is too large and might have different index after cleanup if it was not reset.

        # Determine actual indices for train/val split based on timestamp column of engineered data
        # We need to re-create the full dataframe from X_scaled and y_for_training with its original indices
        full_processed_data_for_split = pd.DataFrame(X_scaled, index=valid_idx, columns=self.selected_features)
        full_processed_data_for_split['label'] = y_for_training

        # Merge with timestamps from the engineered checkpoint data to retain alignment
        # For simplicity, load the 'timestamp' column from the *engineered* checkpoint
        # to align with its filtered/cleaned data indices.
        # This assumes _engineered_data_checkpoint_path parquet file includes the index, or is sequential.
        # If train_df was saved with index=False, this becomes problematic.
        # Let's ensure _engineered_data_checkpoint_path always saves index for join.

        # If train_df was saved without index, we need to load it fully just for timestamp and then merge.
        # However, the previous approach of using original_train_indices based on `train_data_raw_initial_load`
        # and then intersecting with `valid_idx` is robust for ensuring alignment.
        # Let's stick with that for now, assuming `train_data_raw_initial_load`'s timestamp indices are sequential.

        temp_original_timestamps_df = train_data_raw_initial_load[['timestamp']].copy()
        # Ensure timestamp is datetime and index is sequential (if it was from original load)
        if 'timestamp' not in temp_original_timestamps_df.columns and temp_original_timestamps_df.index.name == 'timestamp':
            temp_original_timestamps_df = temp_original_timestamps_df.reset_index(names=['timestamp'])
        temp_original_timestamps_df['timestamp'] = pd.to_datetime(temp_original_timestamps_df['timestamp'])

        VALIDATION_SPLIT_DATE = '2024-01-01'
        original_train_indices_based_on_timestamp = temp_original_timestamps_df[temp_original_timestamps_df['timestamp'] < VALIDATION_SPLIT_DATE].index
        original_val_indices_based_on_timestamp = temp_original_timestamps_df[temp_original_timestamps_df['timestamp'] >= VALIDATION_SPLIT_DATE].index

        del temp_original_timestamps_df # Free up memory
        gc.collect()

        # Intersect indices to ensure correct slicing after any data cleaning/filtering in select_features
        # These are the *actual row indices* from the original full dataset for train/val splits
        actual_train_indices = original_train_indices_based_on_timestamp.intersection(full_processed_data_for_split.index)
        actual_val_indices = original_val_indices_based_on_timestamp.intersection(full_processed_data_for_split.index)

        # Separate X and Y for train and validation using .loc to maintain alignment
        X_train_df = full_processed_data_for_split.loc[actual_train_indices, self.selected_features]
        y_train_series = full_processed_data_for_split.loc[actual_train_indices, 'label']
        X_val_df = full_processed_data_for_split.loc[actual_val_indices, self.selected_features]
        y_val_series = full_processed_data_for_split.loc[actual_val_indices, 'label']

        # Save these as Parquet files for the generator to read. Important to save index=True.
        X_train_df.to_parquet(self._scaled_X_path, index=True)
        y_train_series.to_frame(name='label').to_parquet(self._scaled_y_path, index=True) # Save as DataFrame with 'label' column
        X_val_df.to_parquet(self._scaled_val_X_path, index=True)
        y_val_series.to_frame(name='label').to_parquet(self._scaled_val_y_path, index=True) # Save as DataFrame with 'label' column

        print("Scaled training and validation data saved to separate Parquet files with original indices.")

        # Delete large in-memory objects after saving them to disk
        del full_processed_data_for_split, X_train_df, y_train_series, X_val_df, y_val_series
        gc.collect()

        # Create Data Generators
        train_generator = TimeSeriesSequence(
            X_filepath=self._scaled_X_path,
            y_filepath=self._scaled_y_path,
            sequence_length=self.sequence_length,
            batch_size=32,
            feature_cols=self.selected_features,
            total_samples=len(actual_train_indices), # Total samples for this generator's data
            original_indices=actual_train_indices # Pass the actual Pandas Index
        )
        val_generator = TimeSeriesSequence(
            X_filepath=self._scaled_val_X_path,
            y_filepath=self._scaled_val_y_path,
            sequence_length=self.sequence_length,
            batch_size=32,
            feature_cols=self.selected_features,
            total_samples=len(actual_val_indices), # Total samples for this generator's data
            original_indices=actual_val_indices # Pass the actual Pandas Index
        )
        print("Keras Data Generators created.")

        try:
            # For LightGBM, we still need to load the full train/val sets into memory
            # as it does not inherently support Keras-style generators.
            # These are now loaded from the *scaled* parquet files.
            lgb_X_train_mem = pd.read_parquet(self._scaled_X_path)
            lgb_y_train_mem = pd.read_parquet(self._scaled_y_path)['label'].values
            lgb_X_val_mem = pd.read_parquet(self._scaled_val_X_path)
            lgb_y_val_mem = pd.read_parquet(self._scaled_val_y_path)['label'].values

            lgb_model = self.train_lightgbm(
                lgb_X_train_mem, lgb_y_train_mem, lgb_X_val_mem, lgb_y_val_mem
            )
            lgb_pred = lgb_model.predict(lgb_X_val_mem) # Predict on the already loaded val data
            lgb_score = self.evaluate_model(lgb_y_val_mem, lgb_pred, "LightGBM")
            self.models['lightgbm'] = lgb_model

            del lgb_X_train_mem, lgb_y_train_mem, lgb_X_val_mem, lgb_y_val_mem, lgb_pred # Clear LGBM-specific data
            gc.collect()

        except Exception as e:
            print(f"LightGBM training or prediction failed: {e}")
            lgb_score = 0

        # The y_val for evaluation of DL models needs to be derived from the actual labels
        # that the generator will produce. For simplicity, we can load the full y_val from its checkpoint
        # and then slice it by sequence_length for evaluation.
        full_y_val_for_dl_eval = pd.read_parquet(self._scaled_val_y_path)['label'].values
        # Ensure we only evaluate on the part where sequences can be formed (matching generator output)
        y_val_dl_eval_aligned = full_y_val_for_dl_eval[self.sequence_length - 1:]

        print(f"Full Y_val for DL evaluation shape: {y_val_dl_eval_aligned.shape}")


        try:
            # Check if generators are not empty before training DL models
            if len(train_generator) > 0:
                print(f"Proceeding with Deep Learning models. Training generator has {len(train_generator)} batches.")
                callbacks = [
                    EarlyStopping(patience=10, restore_best_weights=True, monitor='val_mae'),
                    ReduceLROnPlateau(patience=5, factor=0.5, min_lr=1e-6, monitor='val_mae')
                ]
                convlstm_score = 0
                try:
                    print("Training ConvLSTM model...")
                    # Get input shape from the generator's first item to ensure correct model build
                    dummy_X, _ = train_generator.__getitem__(0)
                    input_shape_convlstm = dummy_X.shape[1:] # (sequence_length, num_features)
                    del dummy_X
                    gc.collect()

                    convlstm_model = self.build_convlstm_model(input_shape_convlstm)
                    if convlstm_model:
                        convlstm_model.fit(
                            train_generator,
                            validation_data=val_generator,
                            epochs=5, # Initial conservative
                            callbacks=callbacks, verbose=1,
                            # workers=os.cpu_count(), use_multiprocessing=True # Enable for faster I/O if needed
                        )
                        convlstm_pred = convlstm_model.predict(val_generator).flatten()
                        convlstm_score = self.evaluate_model(y_val_dl_eval_aligned, convlstm_pred, "ConvLSTM")
                        self.models['convlstm'] = convlstm_model
                except Exception as e:
                    print(f"ConvLSTM training or prediction failed: {e}")
                    convlstm_score = 0
                lstm_score = 0
                try:
                    print("Training LSTM model...")
                    dummy_X, _ = train_generator.__getitem__(0)
                    input_shape_lstm = dummy_X.shape[1:]
                    del dummy_X
                    gc.collect()

                    lstm_model = self.build_lstm_model(input_shape_lstm)
                    if lstm_model:
                        lstm_model.fit(
                            train_generator,
                            validation_data=val_generator,
                            epochs=5, # Initial conservative
                            callbacks=callbacks, verbose=1,
                            # workers=os.cpu_count(), use_multiprocessing=True
                        )
                        lstm_pred = lstm_model.predict(val_generator).flatten()
                        lstm_score = self.evaluate_model(y_val_dl_eval_aligned, lstm_pred, "LSTM")
                        self.models['lstm'] = lstm_model
                except Exception as e:
                    print(f"LSTM training or prediction failed: {e}")
                    lstm_score = 0

                if len(self.models) > 1:
                    # Align LGBM predictions for the ensemble.
                    # LGBM was trained on X_val, y_val. DL models were trained on sequences from X_val, y_val.
                    # The DL predictions start from `sequence_length - 1` onwards in the original validation set.
                    # lgb_pred is the full LGBM prediction on X_val (not sequence-aligned).
                    # We need to ensure lgb_pred is defined before trying to slice it.
                    if 'lgb_pred' in locals():
                        lgb_pred_aligned = lgb_pred[self.sequence_length - 1:]
                    else:
                        print("LightGBM prediction not available for ensemble alignment.")
                        lgb_pred_aligned = np.array([]) # Empty array if LGBM failed

                    if 'convlstm' in self.models and convlstm_score > 0:
                        dl_predictions_list.append(convlstm_pred)
                        dl_weights.append(0.35)
                    if 'lstm' in self.models and lstm_score > 0:
                        dl_predictions_list.append(lstm_pred)
                        dl_weights.append(0.25)

                    if dl_predictions_list and lgb_pred_aligned.size > 0 and len(dl_predictions_list[0]) == len(lgb_pred_aligned):
                        dl_ensemble_weighted = np.average(dl_predictions_list, axis=0, weights=dl_weights)
                        ensemble_pred = (lgb_pred_aligned * 0.4) + (dl_ensemble_weighted * 0.6)
                        ensemble_score = self.evaluate_model(y_val_dl_eval_aligned, ensemble_pred, "Ensemble")
                        print(f"\nEnsemble score: {ensemble_score:.4f}")
                    else:
                        print("Not enough successful models or prediction lengths mismatch for ensemble.")
                        ensemble_score = lgb_score
                else:
                    print("Only LightGBM model was successfully trained or no DL models could be built. Skipping ensemble of DL models.")
                    ensemble_score = lgb_score
                print(f"\nBest individual model score: {max(lgb_score, convlstm_score, lstm_score) if 'convlstm' in self.models else lgb_score:.4f}")
                print(f"Final overall ensemble score: {ensemble_score:.4f}")
            else:
                print("Skipping deep learning models as training generator is empty (e.g., due to insufficient data for sequence_length).")
                ensemble_score = lgb_score
        except Exception as e:
            print(f"Deep learning training failed during overall process (likely model build or generator issue): {e}")
            ensemble_score = lgb_score

        # Clean up temporary scaled data files
        for path in [self._scaled_X_path, self._scaled_y_path, self._scaled_val_X_path, self._scaled_val_y_path]:
            if os.path.exists(path):
                os.remove(path)
                print(f"Cleaned up temporary file: {path}")
        gc.collect()
        return self

    def predict(self, test_data_raw_initial_load):
        """Generate predictions for test data with robust error handling"""
        print("Generating predictions...")

        temp_df_for_id_map = test_data_raw_initial_load.copy()
        if 'ID' not in temp_df_for_id_map.columns and temp_df_for_id_map.index.name == 'ID':
            temp_df_for_id_map = temp_df_for_id_map.reset_index(names=['ID'])
        elif 'ID' not in temp_df_for_id_map.columns and temp_df_for_id_map.index is not None and temp_df_for_id_map.index.name is None and len(temp_df_for_id_map.index) == len(temp_df_for_id_map):
            temp_df_for_id_map = temp_df_for_id_map.reset_index()
            temp_df_for_id_map.rename(columns={'index': 'ID'}, inplace=True)

        id_to_original_index_map = pd.Series(temp_df_for_id_map.index.values, index=temp_df_for_id_map['ID'])
        original_test_ids = temp_df_for_id_map['ID'].copy()

        del temp_df_for_id_map
        gc.collect()

        basic_features_cols_test = ['timestamp', 'bid_qty', 'ask_qty', 'buy_qty', 'sell_qty', 'volume', 'ID']
        X_n_cols_raw_test = [col for col in test_data_raw_initial_load.columns if col.startswith('X') and col != 'label']
        preselected_X_n_features_test = X_n_cols_raw_test[:min(self.top_X_features_to_preselect, len(X_n_cols_raw_test))]

        columns_to_process_raw_for_predict = basic_features_cols_test + preselected_X_n_features_test

        missing_columns = [col for col in columns_to_process_raw_for_predict if col not in test_data_raw_initial_load.columns]
        if missing_columns:
            raise KeyError(f"The following required columns are missing from the test data: {missing_columns}. Please ensure your test.parquet file contains these columns.")

        print(f"Selecting columns from initial raw test data (only {len(columns_to_process_raw_for_predict)} columns)...")
        test_df = test_data_raw_initial_load[columns_to_process_raw_for_predict].copy()

        del test_data_raw_initial_load
        gc.collect()

        if 'ID' not in test_df.columns and test_df.index.name == 'ID':
            test_df = test_df.reset_index()

        if 'timestamp' in test_df.columns:
            test_df['timestamp'] = pd.to_datetime(test_df['timestamp'])

        test_df = self.optimize_memory(test_df)
        test_df = self.create_time_features(test_df)

        print(f"Test data shape after feature engineering: {test_df.shape[0]} rows, {test_df.shape[1]} columns")

        if self.selected_features is None:
            raise ValueError("Model must be fitted before making predictions (selected_features is None)")

        X_test_df_final = test_df[self.selected_features]

        X_test_df_final = X_test_df_final.replace([np.inf, -np.inf], np.nan)
        X_test_df_final = X_test_df_final.ffill().bfill().fillna(0)

        X_test_scaled = self.scaler.transform(X_test_df_final)

        if np.any(np.isnan(X_test_scaled)) or np.any(np.isinf(X_test_scaled)):
            print("WARNING: Invalid values in test data after scaling, applying emergency cleanup.")
            X_test_scaled = np.nan_to_num(X_test_scaled, nan=0.0, posinf=1e6, neginf=-1e6)

        X_test_scaled_df = pd.DataFrame(X_test_scaled, index=X_test_df_final.index, columns=self.selected_features)

        del X_test_df_final, test_df
        gc.collect()

        predictions = np.zeros(len(original_test_ids), dtype=np.float32)
        indexed_predictions_by_internal_idx = {}

        if 'lightgbm' in self.models:
            try:
                lgb_pred_full = self.models['lightgbm'].predict(X_test_scaled_df)
                for i, idx in enumerate(X_test_scaled_df.index):
                    indexed_predictions_by_internal_idx[idx] = lgb_pred_full[i]
                print("LightGBM predictions generated.")
            except Exception as e:
                print(f"LightGBM prediction failed: {e}")

        if ('convlstm' in self.models and self.models['convlstm'] is not None) or \
           ('lstm' in self.models and self.models['lstm'] is not None):
            try:
                # For prediction, if test data is too large, you might need a generator here too.
                # For now, we'll assume X_test_scaled can fit in memory for sequence prep for prediction.
                # If this is still an OOM, we'd create a TimeSeriesSequence for prediction as well.
                # We need a temporary full data array for prepare_sequences here.
                X_test_seq_for_predict = self._prepare_sequences_for_inference(X_test_scaled)

                if X_test_seq_for_predict is not None and X_test_seq_for_predict.size > 0:
                    dl_predictions_list_test = []
                    dl_weights_test = []

                    if 'convlstm' in self.models and self.models['convlstm'] is not None:
                        try:
                            convlstm_pred = self.models['convlstm'].predict(X_test_seq_for_predict).flatten()
                            dl_predictions_list_test.append(convlstm_pred)
                            dl_weights_test.append(0.6)
                        except Exception as e:
                            print(f"ConvLSTM prediction during test failed: {e}")

                    if 'lstm' in self.models and self.models['lstm'] is not None:
                        try:
                            lstm_pred = self.models['lstm'].predict(X_test_seq_for_predict).flatten()
                            dl_predictions_list_test.append(lstm_pred)
                            dl_weights_test.append(0.4)
                        except Exception as e:
                            print(f"LSTM prediction during test failed: {e}")

                    if dl_predictions_list_test:
                        dl_ensemble_weighted = np.average(dl_predictions_list_test, axis=0, weights=dl_weights_test)

                        # Indices for DL predictions in the test set
                        # These are derived from the X_test_scaled_df's index, offset by sequence_length-1
                        sequence_covered_indices_in_X_scaled_df = X_test_scaled_df.index[self.sequence_length - 1:].tolist()


                        for i, internal_idx in enumerate(sequence_covered_indices_in_X_scaled_df):
                            lgbm_part = indexed_predictions_by_internal_idx.get(internal_idx, 0.0)
                            indexed_predictions_by_internal_idx[internal_idx] = (lgbm_part * 0.4) + (dl_ensemble_weighted[i] * 0.6)

                        print("Deep learning models included in predictions.")
                    else:
                        print("No successful deep learning models to include in test predictions.")
                else:
                    print("Test data too short for deep learning sequences. Using LightGBM only.")
            except Exception as e:
                print(f"Deep learning prediction setup failed: {e}")
                print("Falling back to LightGBM predictions only for test set.")

        del X_test_scaled_df # Clear prediction-specific data
        if 'X_test_seq_for_predict' in locals() and X_test_seq_for_predict is not None:
            del X_test_seq_for_predict
        gc.collect()

        for i, current_id in enumerate(original_test_ids):
            original_idx_in_raw_df = id_to_original_index_map.get(current_id)

            if original_idx_in_raw_df is not None and original_idx_in_raw_df in indexed_predictions_by_internal_idx:
                predictions[i] = indexed_predictions_by_internal_idx[original_idx_in_raw_df]
            else:
                predictions[i] = 0.0

        if np.any(np.isnan(predictions)) or np.any(np.isinf(predictions)):
            print("WARNING: Invalid predictions detected (NaN/Inf), cleaning...")
            predictions = np.nan_to_num(predictions, nan=0.0, posinf=1.0, neginf=-1.0)

        print(f"Generated {len(predictions)} predictions")
        print(f"Prediction range: [{predictions.min():.4f}, {predictions.max():.4f}]")

        return predictions

    def _prepare_sequences_for_inference(self, data):
        """Helper to prepare sequences for inference, similar to original prepare_sequences."""
        sequences = []
        data_np = np.asarray(data)

        if len(data_np) < self.sequence_length:
            print(f"Warning: Not enough data ({len(data_np)} rows) to create sequences of length {self.sequence_length} for inference.")
            if data_np.shape and len(data_np.shape) > 1:
                return np.array([]).astype(np.float32).reshape(0, self.sequence_length, data_np.shape[-1])
            else: # Handle cases where data_np might be 1D or empty, resulting in 0 features
                return np.array([]).astype(np.float32).reshape(0, self.sequence_length, 0)


        # The loop range for inference is different because there are no explicit 'targets'
        # We want to create all possible sequences.
        # A sequence for `data_np[i]` means data from `data_np[i - sequence_length + 1 : i + 1]`.
        # So, the first sequence ends at `sequence_length - 1`.
        for i in range(self.sequence_length - 1, len(data_np)):
            sequences.append(data_np[i - (self.sequence_length - 1) : i + 1])

        return np.array(sequences).astype(np.float32)


# Main execution function
def _promote_index_to_column(frame, column, frame_name):
    """Return *frame* with *column* present, taking it from the index if needed.

    If the index is named *column* it is reset into that column. Otherwise the
    index is reset and, when that yields a column called ``'index'``, that
    column is renamed to *column*. *frame_name* is only used in log messages.
    """
    if column in frame.columns:
        return frame

    if frame.index.name == column:
        frame = frame.reset_index(names=[column])
        print(f"DEBUG: Resetting index '{column}' for {frame_name}. New columns: {frame.columns.tolist()}")
        return frame

    # reset_index() already returned a fresh frame, so renaming it in place
    # neither touches the caller's object nor copies the data a second time.
    frame = frame.reset_index()
    if 'index' in frame.columns and column not in frame.columns:
        frame.rename(columns={'index': column}, inplace=True)
        print(f"DEBUG: Renamed 'index' to '{column}' for {frame_name}. New columns: {frame.columns.tolist()}")
    return frame


def _parse_timestamp_column(frame, frame_name):
    """Convert ``frame['timestamp']`` to datetime in place, or warn if it is absent."""
    if 'timestamp' in frame.columns:
        frame['timestamp'] = pd.to_datetime(frame['timestamp'])
        print(f"DEBUG: 'timestamp' column found and converted to datetime in {frame_name}.")
    else:
        print(f"CRITICAL WARNING: 'timestamp' column still not found in {frame_name} after all attempts. This will likely cause issues.")
        print(f"DEBUG: {frame_name} columns are: {frame.columns.tolist()}")
    return frame


def run_competition_pipeline(
    train_path='/kaggle/input/drw-crypto-market-prediction/train.parquet',
    test_path='/kaggle/input/drw-crypto-market-prediction/test.parquet',
    submission_path='/kaggle/working/submission.csv',
):
    """Run the complete competition pipeline: load, fit, predict, write CSV.

    Args:
        train_path: Parquet file with the training data.
        test_path: Parquet file with the test data.
        submission_path: Destination of the submission CSV.

    Returns:
        The submission DataFrame with columns ``ID`` and ``Prediction``, one
        row per test row.
    """
    print("Loading data...")
    train_full_raw = pd.read_parquet(train_path)
    train_full_raw = _promote_index_to_column(train_full_raw, 'timestamp', 'train_full_raw')
    train_full_raw = _parse_timestamp_column(train_full_raw, 'train_full_raw')

    test_full_raw = pd.read_parquet(test_path)
    test_full_raw = _promote_index_to_column(test_full_raw, 'ID', 'test_full_raw')
    # NOTE: if the test frame carries no timestamp at all, whatever its
    # unnamed index holds at this point becomes the 'timestamp' column and is
    # then parsed by pd.to_datetime -- those values are placeholders, not
    # real times.
    test_full_raw = _promote_index_to_column(test_full_raw, 'timestamp', 'test_full_raw')
    test_full_raw = _parse_timestamp_column(test_full_raw, 'test_full_raw')

    print(f"\nTrain shape: {train_full_raw.shape}")
    print(f"Test shape: {test_full_raw.shape}")

    # Initialize and train model
    predictor = CryptoMarketPredictor(
        sequence_length=30,
        top_features=100,
        top_X_features_to_preselect=30
    )
    predictor.fit(train_full_raw)

    predictions = predictor.predict(test_full_raw)

    # Create and save the submission
    submission = pd.DataFrame({
        'ID': test_full_raw['ID'],
        'Prediction': predictions
    })
    submission.to_csv(submission_path, index=False)
    print(f"Submission saved with {len(submission)} predictions")
    print(f"Prediction statistics - Mean: {predictions.mean():.4f}, Std: {predictions.std():.4f}")

    return submission

# Script entry point: run the full pipeline only when this file is executed
# directly (not on import). The returned submission DataFrame is kept in the
# module-level name `submission`.
if __name__ == "__main__":
    submission = run_competition_pipeline()
