In [None]:
import os
import glob
import numpy as np
import polars as pl
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import random
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler
from torch import amp
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import r2_score
import logging

# Setup Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Reproducibility Setup
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.benchmark = True

set_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

# Constants from best Optuna trial
HIDDEN_DIM = 512
DROPOUT_RATE = 0.2537
NOISE_STD = 0.0017
LEARNING_RATE = 4.16276e-05
BATCH_SIZE = 512
EPOCHS = 75
N_FOLDS = 5
GAP_SIZE = 30
VAL_SIZE = 100
TRAIN_SIZE = 300

# Paths
PARTITIONED_DIR = "/home/jupyter/data/partitioned/"
all_part_files = sorted(glob.glob(os.path.join(PARTITIONED_DIR, "*.parquet")))
logger.info(f"Found {len(all_part_files)} partitioned files.")

# Exclude columns
EXCLUDE_COLS = ["date_id", "time_id", "symbol_id", "weight", "partition_id"]
df_tmp = pl.read_parquet(all_part_files[0])
EXCLUDE_COLS += [col for col in df_tmp.columns if col.startswith("responder_")]
del df_tmp

class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
        
    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

class FinancialDataset(Dataset):
    def __init__(self, features, target, weights):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.target = torch.tensor(target, dtype=torch.float32)
        self.weights = torch.tensor(weights, dtype=torch.float32)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.target[idx], self.weights[idx]

class GaussianNoise(nn.Module):
    def __init__(self, std=0.05):
        super().__init__()
        self.std = std

    def forward(self, x):
        if self.training and self.std > 0:
            return x + torch.randn_like(x) * self.std
        return x

class AutoencoderWithMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim=512, dropout_rate=0.3, noise_std=0.05):
        super().__init__()
        self.noise = GaussianNoise(std=noise_std)

        # Encoder (more complex like in Code B)
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(dropout_rate),
            
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(dropout_rate),
            
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.Dropout(dropout_rate)
        )
        
        self.bottleneck = nn.Linear(hidden_dim // 2, hidden_dim // 4)

        # Decoder (enhanced)
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim // 4, hidden_dim // 2),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.Linear(hidden_dim // 2, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Linear(hidden_dim, input_dim)
        )

        # MLP (more complex like in Code B)
        self.mlp = nn.Sequential(
            nn.Linear(input_dim + hidden_dim // 4, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(dropout_rate),
            
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(dropout_rate),
            
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.Dropout(dropout_rate),
            
            nn.Linear(hidden_dim // 2, 1)
        )

    def forward(self, x):
        x_noised = self.noise(x)
        encoded = self.encoder(x_noised)
        latent = self.bottleneck(encoded)
        reconstructed = self.decoder(latent)
        mlp_input = torch.cat([x, latent], dim=1)
        mlp_output = self.mlp(mlp_input)
        return reconstructed, mlp_output, latent

def initialize_weights(model):
    """Initialize model weights using Kaiming He initialization."""
    for m in model.modules():
        if isinstance(m, nn.Linear):
            nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
            if m.bias is not None:
                nn.init.zeros_(m.bias)

def load_and_scale_data(parquet_file, feature_scaler, exclude_cols, target_col="responder_6", weight_col="weight"):
    df = pl.read_parquet(parquet_file)
    numeric_cols = [col for col in df.columns if col not in exclude_cols]
    features = df.select(numeric_cols).to_numpy().astype(np.float32)
    targets = df[target_col].to_numpy().astype(np.float32)
    weights = df[weight_col].to_numpy().astype(np.float32)
    features = feature_scaler.transform(features)
    return features, targets, weights

def get_unique_dates(files):
    all_dates = set()
    for file in files:
        df = pl.read_parquet(file).select("date_id")
        unique_dates = df.unique().to_series().to_list()
        all_dates.update(unique_dates)
    return sorted(all_dates)

def create_rolling_folds(dates_list, n_folds, gap_size, val_size, train_size):
    folds = []
    idx_start = 0
    total_dates = len(dates_list)

    for _ in range(n_folds):
        train_end = idx_start + train_size
        gap_end = train_end + gap_size
        val_end = gap_end + val_size

        if val_end > total_dates:
            break

        train_dates = dates_list[idx_start:train_end]
        val_dates = dates_list[gap_end:val_end]
        
        folds.append((train_dates, val_dates))
        idx_start = val_end

    return folds

def train_model_with_cv(folds, files, feature_scaler):
    for fold_idx, (train_dates, val_dates) in enumerate(folds, start=1):
        logger.info(f"Training Fold {fold_idx}/{len(folds)}...")

        model = AutoencoderWithMLP(
            input_dim=GLOBAL_INPUT_DIM,
            hidden_dim=HIDDEN_DIM,
            dropout_rate=DROPOUT_RATE,
            noise_std=NOISE_STD
        )
        initialize_weights(model)
        model.to(device)

        optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
        recon_criterion = nn.MSELoss()
        supervised_criterion = nn.MSELoss()
        scaler = amp.GradScaler()
        early_stopping = EarlyStopping(patience=5)

        best_val_loss = float('inf')
        
        for epoch in range(1, EPOCHS + 1):
            model.train()
            train_loss = 0.0
            train_steps = 0
            
            for file in files:
                features, targets, weights = load_and_scale_data(file, feature_scaler, EXCLUDE_COLS)
                train_dataset = FinancialDataset(features, targets, weights)
                train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=8, pin_memory=True)
                
                for batch_x, batch_y, batch_w in train_loader:
                    batch_x, batch_y, batch_w = batch_x.to(device), batch_y.to(device), batch_w.to(device)
                    
                    optimizer.zero_grad(set_to_none=True)
                    
                    with amp.autocast(device_type=device.type):
                        reconstructed, mlp_output, _ = model(batch_x)
                        loss_recon = recon_criterion(reconstructed, batch_x)
                        loss_sup = supervised_criterion(mlp_output.squeeze(), batch_y)
                        loss_sup = (loss_sup * batch_w).mean()
                        loss = loss_recon + loss_sup
                    
                    scaler.scale(loss).backward()
                    scaler.unscale_(optimizer)
                    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                    scaler.step(optimizer)
                    scaler.update()
                    
                    train_loss += loss.item()
                    train_steps += 1

            # Validation
            model.eval()
            val_loss = 0.0
            val_steps = 0
            all_targets, all_predictions = [], []

            with torch.no_grad():
                for file in files:
                    features, targets, weights = load_and_scale_data(file, feature_scaler, EXCLUDE_COLS)
                    val_dataset = FinancialDataset(features, targets, weights)
                    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=8, pin_memory=True)

                    for batch_x, batch_y, batch_w in val_loader:
                        batch_x, batch_y, batch_w = batch_x.to(device), batch_y.to(device), batch_w.to(device)
                        
                        with amp.autocast(device_type=device.type):
                            reconstructed, mlp_output, _ = model(batch_x)
                            loss_recon = recon_criterion(reconstructed, batch_x)
                            loss_sup = supervised_criterion(mlp_output.squeeze(), batch_y)
                            loss_sup = (loss_sup * batch_w).mean()
                            loss = loss_recon + loss_sup
                            
                        val_loss += loss.item()
                        val_steps += 1
                        
                        all_targets.extend(batch_y.cpu().numpy())
                        all_predictions.extend(mlp_output.squeeze().cpu().numpy())

            avg_train_loss = train_loss / train_steps
            avg_val_loss = val_loss / val_steps
            r2 = r2_score(all_targets, all_predictions)
            
            logger.info(f"[Fold {fold_idx} | Epoch {epoch}] Train Loss: {avg_train_loss:.5f} | Val Loss: {avg_val_loss:.5f} | R²: {r2:.5f}")
            
            scheduler.step(avg_val_loss)
            
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), f"best_model_fold_{fold_idx}.pt")
                
            early_stopping(avg_val_loss)
            if early_stopping.early_stop:
                logger.info("Early stopping triggered")
                break

# Load or build feature scaler
if os.path.exists("feature_scaler.pkl"):
    with open("feature_scaler.pkl", "rb") as f:
        feature_scaler = pickle.load(f)
    logger.info("Loaded existing feature scaler")
else:
    df_tmp = pl.read_parquet(all_part_files[0])
    numeric_cols = [col for col in df_tmp.columns if col not in EXCLUDE_COLS]
    GLOBAL_INPUT_DIM = len(numeric_cols)
    features = df_tmp.select(numeric_cols).to_numpy()
    feature_scaler = StandardScaler().fit(features)
    with open("feature_scaler.pkl", "wb") as f:
        pickle.dump(feature_scaler, f)
    logger.info("Built and saved new feature scaler")
    del df_tmp

# Get GLOBAL_INPUT_DIM
GLOBAL_INPUT_DIM = feature_scaler.n_features_in_

# Create folds
sorted_dates = get_unique_dates(all_part_files)
folds = create_rolling_folds(sorted_dates, N_FOLDS, GAP_SIZE, VAL_SIZE, TRAIN_SIZE)
logger.info(f"Created {len(folds)} folds")

# Train model
train_model_with_cv(folds, all_part_files, feature_scaler)

2025-01-12 14:13:24,589 [INFO] Using device: cuda
2025-01-12 14:13:24,593 [INFO] Found 1 partitioned files.
2025-01-12 14:14:45,560 [INFO] ✅ Loaded existing feature scaler from disk.
2025-01-12 14:14:53,077 [INFO] ✅ Created 3 rolling folds for cross-validation.
2025-01-12 14:14:53,080 [INFO] Training Fold 1/3...
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()


In [1]:
import polars as pl
import os

def partition_by_chunk_size(input_file, output_dir, chunk_size):
    """
    Reads a Parquet file and saves it as partitioned Parquet files based on a fixed row chunk size.

    Parameters:
    - input_file (str): Path to the input Parquet file.
    - output_dir (str): Path to save the partitioned Parquet files.
    - chunk_size (int): Number of rows per partitioned file.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    
    # Load the full Parquet dataset (Polars loads data lazily)
    df = pl.read_parquet(input_file)
    print(f"Loaded Parquet file with {df.shape[0]} rows and {df.shape[1]} columns.")
    
    # Total number of rows and how many partitions will be needed
    total_rows = df.shape[0]
    num_partitions = (total_rows + chunk_size - 1) // chunk_size  # Ceiling division

    # Split and save each chunk
    for i in range(num_partitions):
        start_idx = i * chunk_size
        end_idx = min(start_idx + chunk_size, total_rows)
        partition_df = df.slice(start_idx, end_idx - start_idx)  # Slice operation in Polars
        partition_file_path = os.path.join(output_dir, f"chunk_{i+1}.parquet")
        
        # Save the chunk as a separate Parquet file
        partition_df.write_parquet(partition_file_path)
        print(f"Saved chunk {i+1}: Rows {start_idx} to {end_idx} ({partition_df.shape[0]} rows)")

    print(f"Partitioning complete! {num_partitions} files saved in: {output_dir}")

# Example usage
input_file = "/home/jupyter/data/int32Features.parquet"  # Specify your input Parquet file path
output_dir = "/home/jupyter/data/partitioned/"  # Specify your output directory
chunk_size = 5500000  # Number of rows per partition

partition_by_chunk_size(input_file, output_dir, chunk_size)


Loaded Parquet file with 46334372 rows and 180 columns.
Saved chunk 1: Rows 0 to 5500000 (5500000 rows)
Saved chunk 2: Rows 5500000 to 11000000 (5500000 rows)
Saved chunk 3: Rows 11000000 to 16500000 (5500000 rows)
Saved chunk 4: Rows 16500000 to 22000000 (5500000 rows)
Saved chunk 5: Rows 22000000 to 27500000 (5500000 rows)
Saved chunk 6: Rows 27500000 to 33000000 (5500000 rows)
Saved chunk 7: Rows 33000000 to 38500000 (5500000 rows)
Saved chunk 8: Rows 38500000 to 44000000 (5500000 rows)
Saved chunk 9: Rows 44000000 to 46334372 (2334372 rows)
Partitioning complete! 9 files saved in: /home/jupyter/data/partitioned/


In [None]:
#/home/jupyter/data/int32Features.parquet

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import polars as pl
import optuna
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
import os

# Ensure Polars compatibility with multiprocessing
os.environ["POLARS_ALLOW_FORKING_THREAD"] = "1"

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [2]:
# Load dataset and enforce positive weights
train_data = pl.read_parquet("/home/jupyter/data/int32Features.parquet").with_columns(pl.col("weight").abs())

# Convert to NumPy arrays (Ensure compatibility)
responder_6_target = train_data['responder_6'].to_numpy().astype(np.float32)
weights = train_data['weight'].to_numpy().astype(np.float32)

# Select numerical columns and convert to NumPy
exclude_columns = ['date_id', 'time_id', 'symbol_id', 'weight', 'partition_id'] + \
                  [col for col in train_data.columns if col.startswith('responder_')]
numerical_columns = [col for col in train_data.columns if col not in exclude_columns]
scaled_features = train_data.select(numerical_columns).to_numpy().astype(np.float32)

print(f"Loaded {scaled_features.shape[0]} samples with {scaled_features.shape[1]} features.")


Loaded 46334372 samples with 158 features.


In [3]:
class FinancialDataset(Dataset):
    def __init__(self, features, target, weights):
        self.features = np.array(features, dtype=np.float32)
        self.target = np.array(target, dtype=np.float32).reshape(-1)  
        self.weights = np.array(weights, dtype=np.float32).reshape(-1)  

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return (
            torch.tensor(self.features[idx], dtype=torch.float32),
            torch.tensor(self.target[idx], dtype=torch.float32),
            torch.tensor(self.weights[idx], dtype=torch.float32)
        )

In [4]:
class SupervisedAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, dropout_rate=0.2):
        super(SupervisedAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim // 2)
        )
        self.bottleneck = nn.Linear(hidden_dim // 2, hidden_dim // 4)
        self.supervised_output = nn.Linear(hidden_dim // 4, 1)  # No sigmoid for BCEWithLogitsLoss compatibility
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim // 4, hidden_dim // 2),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.Linear(hidden_dim // 2, hidden_dim),
            nn.SiLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Linear(hidden_dim, input_dim)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        latent = self.bottleneck(encoded)
        supervised_output = self.supervised_output(latent)  
        reconstructed = self.decoder(latent)
        return reconstructed, supervised_output, latent

In [5]:
def initialize_weights(model):
    for m in model.modules():
        if isinstance(m, nn.Linear):
            nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
            if m.bias is not None:
                nn.init.zeros_(m.bias)

In [6]:
def train_autoencoder(model, train_loader, val_loader, epochs=10, learning_rate=0.001, save_model_path=None, save_features_path=None):
    model = model.to(device)
    criterion_reconstruction = nn.HuberLoss()
    criterion_supervised = nn.BCEWithLogitsLoss(reduction='none')
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch_features, batch_labels, batch_weights in train_loader:
            batch_features, batch_labels, batch_weights = (
                batch_features.to(device),
                batch_labels.to(device),
                batch_weights.to(device)
            )

            optimizer.zero_grad()
            reconstructed, supervised_output, _ = model(batch_features)

            # Proper weighted loss handling
            reconstruction_loss = criterion_reconstruction(reconstructed, batch_features)
            supervised_loss = criterion_supervised(supervised_output.squeeze(), batch_labels)
            supervised_loss = (supervised_loss * torch.clamp(batch_weights, min=0)).mean()

            # Total loss and backward pass
            loss = reconstruction_loss + supervised_loss
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {total_loss / len(train_loader):.4f}")

    # Save Final Model and Features Once
    if save_model_path:
        torch.save(model.state_dict(), save_model_path)

    if save_features_path:
        model.eval()
        latent_features = []
        with torch.no_grad():
            for batch_features, _, _ in train_loader:
                batch_features = batch_features.to(device)
                _, _, latent = model(batch_features)
                latent_features.append(latent.cpu().numpy())

        latent_features = np.vstack(latent_features)
        df_latent = pl.DataFrame(latent_features)
        df_latent.write_parquet(save_features_path)

In [7]:
def objective(trial):
    hidden_dim = trial.suggest_int("hidden_dim", 128, 256)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)
    epochs = 10

    model = SupervisedAutoencoder(input_dim=scaled_features.shape[1], hidden_dim=hidden_dim, dropout_rate=dropout_rate).to(device)
    model.apply(initialize_weights)

    tscv = TimeSeriesSplit(n_splits=5)
    total_val_loss = 0

    for train_idx, val_idx in tscv.split(scaled_features):
        train_dataset = FinancialDataset(scaled_features[train_idx], responder_6_target[train_idx], weights[train_idx])
        val_dataset = FinancialDataset(scaled_features[val_idx], responder_6_target[val_idx], weights[val_idx])

        train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=True, num_workers=0)
        val_loader = DataLoader(val_dataset, batch_size=1024, shuffle=False, num_workers=0)

        train_autoencoder(model, train_loader, val_loader, epochs, learning_rate)

    return total_val_loss / 5

In [8]:
# Running Optuna Hyperparameter Tuning
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=20)
print("Best Hyperparameters:", study.best_params)

# Train Final Model with the Best Hyperparameters and Save Results
best_model = SupervisedAutoencoder(
    input_dim=train_data.shape[1],
    hidden_dim=study.best_params['hidden_dim'],
    dropout_rate=study.best_params['dropout_rate']
).to(device)

best_model.apply(initialize_weights)

train_autoencoder(
    best_model, 
    train_loader, 
    train_loader, 
    epochs=20, 
    learning_rate=study.best_params['learning_rate'],
    save_model_path="/home/jupyter/data/best_supervised_autoencoder_with_weights.pth",
    save_features_path="/home/jupyter/data/train_latent_features_with_weights.parquet"
)

print("Training complete. Final model and latent features saved.")

[I 2025-01-08 17:16:55,347] A new study created in memory with name: no-name-5ae98a5e-65d1-45d2-a097-c9e27b27047d


Epoch 1/10, Train Loss: -2.4606


[W 2025-01-08 17:22:17,583] Trial 0 failed with parameters: {'hidden_dim': 166, 'dropout_rate': 0.35954558730731856, 'learning_rate': 7.185737766279661e-05} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/var/tmp/ipykernel_6198/1380901555.py", line 20, in objective
    train_autoencoder(model, train_loader, val_loader, epochs, learning_rate)
  File "/var/tmp/ipykernel_6198/517933719.py", line 10, in train_autoencoder
    for batch_features, batch_labels, batch_weights in train_loader:
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 701, in __next__
    data = self._next_data()
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 757, in _next_data
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
  File "/opt/cond

KeyboardInterrupt: 