In [1]:
import polars as pl
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
from xgboost import XGBRegressor
import gc
import time
import numpy as np
import joblib


In [2]:
# Step 1: Read Data with Lags
def read_data_with_lags(current_file_path, previous_file_path, columns_needed, lag_column="responder_6", lag_name="responder_6_lag_1", length = 5000):
    """Read the current file and add lagged data from the previous file."""
    # Read current data
    current_file = pl.scan_parquet(current_file_path).select(columns_needed).head(length).collect()
    
    # If there is a previous file, add lagged data
    if previous_file_path:
        previous_file = pl.scan_parquet(previous_file_path).select(["time_id", lag_column]).head(length).collect()
        lagged_data = previous_file.rename({lag_column: lag_name})
        current_file = current_file.join(lagged_data, on="time_id", how="left")
    
    return current_file

# Step 2: Clean Data
def clean_data(df):
    """Clean the data by handling missing values."""
    if isinstance(df, pl.DataFrame):
        df = df.fill_null(strategy="backward").fill_null(strategy="forward").fill_null(0)
    else:
        raise TypeError("The input is not a Polars DataFrame.")
    return df

# Step 3: Preprocess Data
def preprocess_data(df, target_column='responder_6', n_components=0.95):
    """Apply PCA to reduce dimensions."""
    # Drop the target column and convert to numpy
    X = df.drop(target_column).to_numpy()
    y = df[target_column].to_numpy()

    # Apply PCA
    pca = PCA(n_components=n_components)
    X_pca = pca.fit_transform(X)
    return X_pca, y

# Step 4: Anonymize
def anonymize(file): 
    return file

In [3]:
from xgboost import XGBRegressor

def select_features_by_importance(df, target_column='responder_6', threshold=0.01):
 
    X = df.drop(target_column).to_pandas()
    y = df[target_column].to_pandas()
    
    # Train a temporary XGBoost model to get feature importance
    model = XGBRegressor(objective='reg:squarederror', n_estimators=50, random_state=42)
    model.fit(X, y)
    
    # Get feature importance
    feature_importances = model.feature_importances_
    important_features = X.columns[feature_importances >= threshold]
    
    # Filter the original DataFrame for these features
    filtered_df = df.select(list(important_features) + [target_column])
    return filtered_df


Model Functions

In [4]:
# Step 5: Train Model
from sklearn.model_selection import cross_val_score


def train_XGB(X, y):
    """Train the model using XGBoost."""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = XGBRegressor(objective='reg:squarederror', n_estimators=100, random_state=42)
    cv_scores = cross_val_score(model, X, y, cv=5, scoring='neg_mean_absolute_error')
    mean_mae = np.mean(cv_scores)
    print(f"Cross-Validated Mean Absolute Error (MAE): {mean_mae}")
    
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Mean Absolute Error (MAE): {mae}")
    return model

In [5]:
from sklearn.svm import SVR
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler

def train_SVM(X, y, model_type='SVR'):
    """Train the model using Support Vector Machine (SVM) """
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

    if model_type == 'SVR':
        model = SVR(kernel='rbf', C=1.0, gamma="scale")
    else:
        model = SGDRegressor(max_iter=1000, tol=1e-3, penalty='l2', alpha=0.0001, random_state=42)
    
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Mean Absolute Error SVM (MAE): {mae}")
    return model

In [6]:
def pipeline(current_file_path, previous_file_path, columns_needed ,model, counter):

    start_time = time.time()
    df = read_data_with_lags(current_file_path, previous_file_path, columns_needed)
    df = clean_data(df)
    df = anonymize(df)
    df = select_features_by_importance(df, target_column='responder_6', threshold=0.001)

    X, y = preprocess_data(df, target_column='responder_6', n_components=0.95)
    
    if model =='XGB':
        model = train_XGB(X, y)
    elif model == 'SVM': 
        model = train_SVM(X,y, 'SGD')
    else: 
        raise NameError('Model name not found')

    # Clear memory
    del df, X, y
    gc.collect()  # Force garbage collection
    
    end_time = time.time()
    print(f"Finished processing file {counter+1}/{len(file_paths)} in {end_time - start_time:.2f} seconds\n")
    
    # Update previous file path
    previous_file_path = current_file_path

    return previous_file_path, model


In [7]:
columns_needed = [
    'date_id', 'time_id', 'symbol_id', 'weight', 'feature_00', 'feature_01',
    'feature_02', 'feature_03', 'feature_04', 'feature_05', 'feature_06',
    'feature_07', 'feature_08', 'feature_09', 'feature_10', 'feature_11',
    'feature_12', 'feature_13', 'feature_14', 'feature_15', 'feature_16',
    'feature_17', 'feature_18', 'feature_19', 'feature_20', 'feature_21',
    'feature_22', 'feature_23', 'feature_24', 'feature_25', 'feature_26',
    'feature_27', 'feature_28', 'feature_29', 'feature_30', 'feature_31',
    'feature_32', 'feature_33', 'feature_34', 'feature_35', 'feature_36',
    'feature_37', 'feature_38', 'feature_39', 'feature_40', 'feature_41',
    'feature_42', 'feature_43', 'feature_44', 'feature_45', 'feature_46',
    'feature_47', 'feature_48', 'feature_49', 'feature_50', 'feature_51',
    'feature_52', 'feature_53', 'feature_54', 'feature_55', 'feature_56',
    'feature_57', 'feature_58', 'feature_59', 'feature_60', 'feature_61',
    'feature_62', 'feature_63', 'feature_64', 'feature_65', 'feature_66',
    'feature_67', 'feature_68', 'feature_69', 'feature_70', 'feature_71',
    'feature_72', 'feature_73', 'feature_74', 'feature_75', 'feature_76',
    'feature_77', 'feature_78', 'responder_6'
]

Call Functions 

In [None]:

# Jupyter Notebook - File Paths 
file_paths = [
    f"/Users/nipace/Downloads/datas/train.parquet/partition_id={i}/part-0.parquet" for i in range(10)]

# Train XGB with PCA via pipeline without storing all in memory
previous_file_path = None
for i, current_file_path in enumerate(file_paths):
    previous_file_path, model = pipeline(current_file_path, previous_file_path, columns_needed, 'XGB', i)

# Train SVM with PCA via pipeline without storing all in memory
previous_file_path = None
for i, current_file_path in enumerate(file_paths):
    previous_file_path, model = pipeline(current_file_path, previous_file_path, columns_needed, 'SVM', i)

FileNotFoundError: No such file or directory (os error 2): /Users/nipace/Downloads/datas/train.parquet/partition_id=0/part-0.parquet

This error occurred with the following context stack:
	[1] 'parquet scan'
	[2] 'select'
	[3] 'slice'


In [None]:
# git add .
# git commit -m "commit message"
# git push origin main