# Kaggle Competition 

In [1]:
from streaming import StreamingDataset
# from streaming.base.util import clean_stale_shared_memory
import streaming
from shutil import rmtree
import pytorch_lightning as pl
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import polars

In [2]:
feature_cols = ["symbol_id", "time_id"] +  [f"feature_{idx:02d}" for idx in range(79)]+ [f"responder_{idx}_lag_1" for idx in range(9)]

#select target values
target_cols = ["responder_6"]

# select the weight values
weight_cols = ["weight"]

In [3]:
class Customdataset_train(Dataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        # Returb the length of the dataset
        return len(self.data)

    def __getitem__(self, idx):
        # return the data at a given index
        # x = all features defined in the feature_cols
        # y = the target values that needed to be predicted definded in the target_cols
        # w = the weight values for calculating the loss defined in the weight_cols
        x = self.data[idx]["x"]
        y = self.data[idx]["y"]
        w = self.data[idx]["w"]
        return x, y, w

In [4]:
class CustomDataset_val(Dataset):
    def __init__(self, dataframe, accerlerator):
        # Store each part of the dataframe that needed as a tensor
        self.features = torch.FloatTensor(dataframe.select(feature_cols).to_numpy()).to(accerlerator)
        self.labels = torch.FloatTensor(dataframe.select(target_cols).to_numpy()).to(accerlerator)
        self.weights = torch.FloatTensor(dataframe.select(weight_cols).to_numpy()).to(accerlerator)


    def __len__(self):
        # Returb the length of the dataset
        return len(self.labels)

    def __getitem__(self, idx):
        # return the data at a given index
        # x = all features defined in the feature_cols
        # y = the target values that needed to be predicted definded in the target_cols
        # w = the weight values for calculating the loss defined in the weight_cols
        x = self.features[idx]
        y = self.labels[idx]
        w = self.weights[idx]
        return x, y, w

In [5]:
class MyModel(pl.LightningModule):
    def __init__(self, input_dim, hidden_dims, dropouts, lr, weight_decay):
        super().__init__()
        self.save_hyperparameters()
        layers = []
        in_dim = input_dim
        for i, hidden_dim in enumerate(hidden_dims):
            layers.append(nn.BatchNorm1d(in_dim))
            if i > 0:
                layers.append(nn.SiLU())
            if i < len(dropouts):
                layers.append(nn.Dropout(dropouts[i]))
            layers.append(nn.Linear(in_dim, hidden_dim))
            # layers.append(nn.ReLU())
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, 1))  # 输出层
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)
        self.lr = lr
        self.weight_decay = weight_decay
        self.validation_step_outputs = []
    
    def forward(self, x):
        return 5 * self.model(x).squeeze(-1)
    
    def training_step(self, batch):
        x, y, w = batch
        y_hat = self(x)
        y = y.view(-1)
        loss = F.mse_loss(y_hat, y, reduction='none') * w  
        loss = loss.mean()
        self.log('train_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        return loss

    def validation_step(self, batch):
        x, y, w = batch
        y_hat = self(x)
        y = y.view(-1)
        loss = F.mse_loss(y_hat, y, reduction='none') * w
        loss = loss.mean()
        self.log('val_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        self.validation_step_outputs.append((y_hat, y, w))
        return loss

    # def on_validation_epoch_end(self):
    #     """Calculate validation WRMSE at the end of the epoch."""
    #     y = torch.cat([x[1] for x in self.validation_step_outputs]).cpu().numpy()
    #     if self.trainer.sanity_checking:
    #         prob = torch.cat([x[0] for x in self.validation_step_outputs]).cpu().numpy()
    #     else:
    #         prob = torch.cat([x[0] for x in self.validation_step_outputs]).cpu().numpy()
    #         weights = torch.cat([x[2] for x in self.validation_step_outputs]).cpu().numpy()
    #         # Ensure the shapes match by specifying the axis
    #         val_r_square = r2_val(y, prob, weights, axis=0)
    #         self.log("val_r_square", val_r_square, prog_bar=True, on_step=False, on_epoch=True)
    #     self.validation_step_outputs.clear()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5,
                                                               verbose=True)
        return {
            'optimizer': optimizer,
            'lr_scheduler': {
                'scheduler': scheduler,
                'monitor': 'val_loss',
            }
        }

    def on_train_epoch_end(self):
        if self.trainer.sanity_checking:
            return
        epoch = self.trainer.current_epoch
        metrics = {k: v.item() if isinstance(v, torch.Tensor) else v for k, v in self.trainer.logged_metrics.items()}
        formatted_metrics = {k: f"{v:.5f}" for k, v in metrics.items()}
        print(f"Epoch {epoch}: {formatted_metrics}")

In [6]:
# finish
#have to test with train_data loader and val_dataloader
# the __init and setup seem to work fine
class DataModule(pl.LightningDataModule):
    def __init__(self, local,remote, batch_size, valid_df=None, accelerator='cuda'):
        super().__init__()
        self.local = local
        self.remote = remote
        self.batch_size = batch_size
        self.accelerator = accelerator
        self.train_dataset = None
        self.valid_df = valid_df
        self.val_dataset = None


    def train_dataloader(self, Pw=False, n_workers=0):
        dataset =  StreamingDataset(local=self.local, remote=self.remote,allow_unsafe_types=True, batch_size= self.batch_size)
        self.train_dataset = Customdataset_train(dataset)
        return DataLoader(self.train_dataset, batch_size=self.batch_size, persistent_workers=Pw, shuffle=False, num_workers=n_workers)

    def val_dataloader(self, Pw=False, n_workers=0):
        self.val_dataset = CustomDataset_val(self.valid_df, self.accelerator)
        return DataLoader(self.val_dataset, batch_size=self.batch_size, persistent_workers=Pw, shuffle=False, num_workers=n_workers)

In [7]:
n_hidden = [512, 512, 256]
dropout = [0.1, 0.1]
lr = 1e-3
max_epochs = 100
weight_decay = 5e-4
patience = 25
# try to increase batch size 
# 512   4.40
# 1024  3.30  have some bottle neck during dataloader # try to fix it 
# 8192 is fine
batch_size = 1024
pw = False # set to false when numloader = 0
num_loader = 0
if num_loader > 0: pw = True # set to false when numloader > 0
input_dim = 90

In [8]:
model = MyModel(
    input_dim= input_dim,
    hidden_dims= n_hidden,
    dropouts = dropout,
    lr = lr,
    weight_decay = weight_decay
)   

In [9]:
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, Timer

In [10]:
early_stopping = EarlyStopping('val_loss', patience=patience, mode='min', verbose=False)
checkpoint_callback = ModelCheckpoint(monitor='val_loss', mode='min', save_top_k=1, verbose=False, filename=f"./models/nn.model") 
timer = Timer()

In [11]:
trainer = pl.Trainer(
    max_epochs= max_epochs,
    accelerator="gpu",
    devices = 1,
    logger = None,
    callbacks=[early_stopping, checkpoint_callback, timer],
    enable_progress_bar=True
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [12]:
remote = 'test_output_dir'

# Local working dir where dataset is cached during operation
local = 'work'

In [13]:
valid_df = polars.read_parquet("data/validate_data.parquet")

In [14]:
data_module =  DataModule(local, remote,batch_size, valid_df= valid_df)

In [15]:
trainer.fit(model, data_module.train_dataloader(n_workers = num_loader), data_module.val_dataloader(n_workers = num_loader))

You are using a CUDA device ('NVIDIA GeForce RTX 4050 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type       | Params | Mode 
---------------------------------------------
0 | model | Sequential | 443 K  | train
---------------------------------------------
443 K     Trainable params
0         Non-trainable params
443 K     Total params
1.772     Total estimated model params size (MB)
13        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Users\Asus\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
c:\Users\Asus\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
c:\Users\Asus\anaconda3\Lib\site-packages\pytorch_lightning\loops\fit_loop.py:310: The number of training batches (1) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 0: {'val_loss': '46.02374', 'train_loss': '5.55317'}


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 1: {'val_loss': '44.09077', 'train_loss': '15.01641'}


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 2: {'val_loss': '42.43167', 'train_loss': '11.88064'}


Validation: |          | 0/? [00:00<?, ?it/s]


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined