In [2]:
import os
import gc
from configparser import ConfigParser
import datetime

import polars as pl

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import pytorch_lightning as L
from pytorch_lightning.loggers.neptune import NeptuneLogger
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint
from torchmetrics import F1Score, Accuracy

import sys
sys.path.append('..')

from models.code import CRNN, CNN, CBiRNN, CTransformer

In [None]:
# Neptune credentials are kept in a separate ini file; they are read further
# down via config['NEPTUNE'] when a logger is created.
config = ConfigParser()
config.read(['../configs/neptune_secret.ini'], encoding='utf-8')

# Single source of truth for the run. Keys are read by the cells below.
cfg = {
    'debug': True,
    'seed': 123,
    'device': 'gpu' if torch.cuda.is_available() else 'cpu',
    'n_devices': torch.cuda.device_count() if torch.cuda.is_available() else 1,
    'companies': ['CHMF', 'GAZP', 'GMKN', 'LKOH', 'MOEX', 'NLMK', 'NVTK', 'PLZL', 'ROSN', 'SBER', 'SNGS', 'T', 'TATN'],

    'batch_size': 32,
    'epochs': 100,
    # Key spelling is kept as-is because the Trainer call looks it up by this name.
    'accamulate_grad_batches': 1,
    'lr': 3e-4,
    'mixed': True,
    'patience': 10,
    'save_top_k': 5,

    'weight_decay': 1e-2,
    'max_grad_norm': 1e6,

    'sentiment': True,
    'model_name': 'CNN',
    'emb_type': 'vk',
    # NOTE(review): machine-specific absolute path; consider making it relative/configurable.
    'out_dir': 'D:/Jora/Аспирантура/Stock_forecast_with_investors_sentiment/models/weights/'
}

# Registry used to resolve cfg['model_name'] to a backbone class.
models = {
    'CNN': CNN,
    'CRNN': CRNN,
    'CBiRNN': CBiRNN,
    'CTransformer': CTransformer
}

# Apply the configured seed before any weights are initialised; previously
# cfg['seed'] was declared but never used.
L.seed_everything(cfg['seed'], workers=True)

base_model = models[cfg['model_name']]()

if cfg['debug']:
    # Tiny smoke-test run: two batches per stage, two epochs, no sanity check.
    kwargs = {'limit_train_batches': 2,
              'limit_val_batches': 2,
              'num_sanity_val_steps': 0}
    cfg['batch_size'] = 2
    cfg['epochs'] = 2
else:
    kwargs = {}

# Mixed precision is only requested when a GPU is used. The CPU guard used to
# live in the debug branch only, so a non-debug CPU run still asked for 16-mixed.
cfg['mixed'] = bool(cfg['mixed']) and cfg['device'] == 'gpu'

os.makedirs(os.path.join(cfg['out_dir'], cfg['model_name']), exist_ok=True)

In [None]:
class PulseSent(L.LightningModule):
    """Lightning module that fits a regression backbone with an MSE loss.

    The loss is logged as ``100 * sqrt(MSE)``. During validation and testing,
    binary F1 and accuracy are additionally tracked on the sign of the output
    versus the sign of the target (``> 0`` is the positive class).

    Args:
        model: backbone, called as ``model(x)``.
        lr: learning rate handed to AdamW.
        epochs: stored on the instance; not read inside this class.
        out_dir: stored on the instance; not read inside this class.
    """

    def __init__(self, model, lr, epochs, out_dir=cfg['out_dir']):
        super().__init__()

        self.model = model
        self.lr = lr
        self.epochs = epochs
        self.loss = nn.MSELoss()
        # Registered as submodules so they are moved together with the module,
        # instead of being pinned to a device chosen from the global config.
        self.metrics = nn.ModuleDict({
            'f1': F1Score(task='binary'),
            'acc': Accuracy(task='binary'),
        })
        self.out_dir = out_dir

    def forward(self, x):
        """Run the wrapped backbone on ``x``."""
        return self.model(x)

    def all_gather_reshape(self, data_lst):
        """Gather tensors from all processes and merge the two leading dimensions.

        Each gathered tensor is reshaped with ``reshape(-1, *shape[2:])``, i.e. the
        first two axes of the gathered result are flattened into one.
        """
        gathered = list(self.all_gather(data_lst))
        return [t.reshape(-1, *t.shape[2:]) for t in gathered]

    def training_step(self, batch):
        """Compute the MSE loss for one batch and log it as ``train_loss``."""
        x, y = batch
        out = self(x)
        loss = self.loss(out, y)
        self.log("train_loss", 100*loss.sqrt(), on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)
        return loss

    def validation_step(self, batch):
        """Accumulate validation loss and metrics for one batch."""
        self._shared_step(batch, 'val')

    def test_step(self, batch):
        """Accumulate test loss and metrics for one batch."""
        self._shared_step(batch, 'test')

    def _shared_step(self, batch, name):
        """Log ``{name}_loss`` and feed the sign-based metrics with this batch."""
        x, y = batch
        out = self(x)
        step_loss = self.loss(out, y)
        if cfg['n_devices'] > 1:
            # NOTE(review): torchmetrics may also synchronise state on compute();
            # the gather is kept from the original implementation - confirm it is still needed.
            out, y = self.all_gather_reshape((out, y))
        preds = (out > 0).type(torch.int)
        target = (y > 0).type(torch.int)
        # Only accumulate here; the epoch-level value is logged once in
        # _log_epoch_metrics so it is not an average of running values.
        for metric in self.metrics.values():
            metric.update(preds, target)
        self.log(f"{name}_loss", 100*step_loss.sqrt(), on_step=False, on_epoch=True, logger=True, prog_bar=True, sync_dist=True)

    def _log_epoch_metrics(self, name):
        """Log every metric as ``{name}_{metric}`` and clear its accumulated state."""
        for key, metric in self.metrics.items():
            self.log(f"{name}_{key}", metric.compute(), logger=True, prog_bar=True, sync_dist=True)
            # Without a reset the state would leak into the next epoch and
            # from validation into testing (the same objects serve both stages).
            metric.reset()

    def on_validation_epoch_end(self):
        """Publish validation metrics for the finished epoch."""
        self._log_epoch_metrics('val')

    def on_test_epoch_end(self):
        """Publish test metrics for the finished epoch."""
        self._log_epoch_metrics('test')

    def predict_step(self, x):
        """Return model outputs for a batch.

        Accepts either a bare feature tensor or a ``(features, target)`` pair;
        in the latter case only the features are passed to the model.
        """
        if isinstance(x, (tuple, list)):
            x = x[0]
        return self(x)

    def configure_optimizers(self):
        """Build AdamW over all parameters with the configured weight decay."""
        return torch.optim.AdamW(self.parameters(), lr=self.lr, weight_decay=cfg['weight_decay'])

In [None]:
class SentDataset(Dataset):
    """Windowed dataset built from lagged columns of a polars frame.

    ``X`` has shape ``(rows, n_lags, features)``; position ``j`` on the second axis
    holds the values shifted by ``j + 1`` rows. ``y`` holds the un-lagged columns
    whose name contains ``target_word``.

    Args:
        df_train: frame the lags are computed on (used alone to build a training set).
        n_lags: number of lagged steps per sample, must be >= 1.
        df_val: optional frame appended after ``df_train``; when given, only rows
            whose ``date`` occurs in ``df_val`` are kept, so ``df_train`` merely
            supplies the history for the first windows.
        target_word: substring identifying the target / price-series columns.
        add_embs: when true, lagged columns whose name contains ``'column'`` are
            concatenated in front of the lagged series along the feature axis.

    Raises:
        ValueError: if ``n_lags`` is smaller than 1.
    """

    def __init__(self, df_train: pl.DataFrame, n_lags: int, df_val: pl.DataFrame=None, target_word='diff', add_embs=cfg['sentiment']):
        super().__init__()
        if n_lags < 1:
            raise ValueError(f'n_lags must be >= 1, got {n_lags}')

        # Lag the same columns that are selected below, so a non-default
        # target_word no longer ends up with an empty lagged series.
        df_with_lags = self.create_lags(df_train, n_lags, df_val, target_word)

        lagged = [col for col in df_with_lags.columns if 'lag' in col]
        target_cols = [col for col in df_with_lags.columns if target_word in col and 'lag' not in col]
        ts_cols = [col for col in lagged if target_word in col]

        ts = self._to_sequence(df_with_lags, ts_cols, n_lags)

        if add_embs:
            emb_cols = [col for col in lagged if 'column' in col]
            embs = self._to_sequence(df_with_lags, emb_cols, n_lags)
            self.X = torch.cat([embs, ts], dim=-1)
        else:
            self.X = ts

        self.y = df_with_lags.select(target_cols).to_torch(dtype=pl.Float32)

    @staticmethod
    def _to_sequence(frame: pl.DataFrame, cols, n_lags: int):
        """Stack lag-major ordered columns into a (rows, n_lags, features) float32 tensor."""
        # The feature count is spelled out instead of using -1 so that a frame
        # with zero rows still reshapes (-1 cannot be inferred from zero elements).
        n_features = len(cols) // n_lags
        return frame.select(cols).to_torch(dtype=pl.Float32).reshape(len(frame), n_lags, n_features)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, ind):
        return self.X[ind], self.y[ind]

    @staticmethod
    def create_lags(df_train: pl.DataFrame, n_lags: int, df_val: pl.DataFrame=None, target_word: str='diff'):
        """Append ``{col}_lag_{k}`` columns (k = 1..n_lags) and drop rows with nulls.

        Lags are produced with ``shift``, which is positional, so rows are assumed to
        already be in chronological order. Columns whose name contains
        ``target_word`` are lagged first, then columns containing ``'column'``; within
        each group the new columns are ordered lag by lag.

        When ``df_val`` is given it is concatenated after ``df_train`` before shifting
        and the result is restricted to the dates present in ``df_val`` and sorted by
        ``date``.
        """
        price_cols = [col for col in df_train.columns if target_word in col]
        emb_cols = [col for col in df_train.columns if 'column' in col]

        frame = df_train
        if df_val is not None:
            val_dates = df_val['date']
            frame = pl.concat([df_train, df_val])

        lag_expressions = [
            pl.col(name).shift(lag).alias(f'{name}_lag_{lag}')
            for group in (price_cols, emb_cols)
            for lag in range(1, n_lags + 1)
            for name in group
        ]

        df_with_lags = frame.with_columns(lag_expressions)

        if df_val is not None:
            df_with_lags = df_with_lags.filter(pl.col('date').is_in(val_dates.implode())).sort('date')

        return df_with_lags.drop_nulls()

In [None]:
# df = pl.read_parquet('../data/final/GAZP.parquet').drop(['weighted_1d', 'open', 'close', 'high', 'low'])

# df_train = df.filter(pl.col('date') <= datetime.date(2024, 4, 30))
# df_val = df.filter(pl.col('date') > datetime.date(2024, 4, 30),
#                    pl.col('date') <= datetime.date(2024, 10, 31))
# df_test = df.filter(pl.col('date') > datetime.date(2024, 10, 31))

# ds_train = SentDataset(df_train, 10)
# ds_val = SentDataset(df_train, 10, df_val)
# ds_test = SentDataset(df_val, 10, df_test)

# dl_train = DataLoader(ds_train, batch_size=32, shuffle=True, drop_last=True, pin_memory=True)
# dl_val = DataLoader(ds_val, batch_size=32, shuffle=False, pin_memory=True)
# dl_test = DataLoader(ds_test, batch_size=32, shuffle=False, pin_memory=True)

# for batch in dl_train:
#     break

# batch[0].shape, batch[1].shape

In [None]:
def train_and_validate(model_version: str, company: str, n_lags: int = 10):
    """Train one model on a single company, keep the best checkpoints and test the best one.

    The company's parquet file is split chronologically
    (train <= 2024-04-30 < val <= 2024-10-31 < test). Validation and test
    datasets receive the preceding split as history for their first windows.

    Args:
        model_version: tag inserted into checkpoint file names.
        company: ticker; selects ``../data/final/{company}.parquet``.
        n_lags: number of lagged steps per sample (previously hard-coded to 10).
    """
    df = pl.read_parquet(f'../data/final/{company}.parquet').drop(['weighted_1d', 'open', 'close', 'high', 'low'])

    train_end = datetime.date(2024, 4, 30)
    val_end = datetime.date(2024, 10, 31)
    df_train = df.filter(pl.col('date') <= train_end)
    df_val = df.filter(pl.col('date') > train_end, pl.col('date') <= val_end)
    df_test = df.filter(pl.col('date') > val_end)

    # A Neptune run is only opened for real (non-debug) runs.
    neptune_logger = None
    if not cfg['debug']:
        neptune_logger = NeptuneLogger(
            project=config['NEPTUNE']['project'],
            api_key=config['NEPTUNE']['key'],
            log_model_checkpoints=False,
            name=cfg['model_name'] + f'_{company}' + (f'{cfg["sentiment"]}' if cfg["sentiment"] else '')
        )
        neptune_logger.log_hyperparams(cfg)

    try:
        ds_train = SentDataset(df_train, n_lags)
        ds_val = SentDataset(df_train, n_lags, df_val)
        ds_test = SentDataset(df_val, n_lags, df_test)

        # Honour cfg['batch_size'] (the debug branch shrinks it) and only pin
        # memory when batches are actually copied to a GPU.
        loader_args = {'batch_size': cfg['batch_size'], 'pin_memory': cfg['device'] == 'gpu'}
        dl_train = DataLoader(ds_train, shuffle=True, drop_last=True, **loader_args)
        dl_val = DataLoader(ds_val, shuffle=False, **loader_args)
        dl_test = DataLoader(ds_test, shuffle=False, **loader_args)

        # "val_loss" is the key the LightningModule logs; the former
        # f"val_loss_{i}" referenced an undefined name and a metric that is never logged.
        checkpoint_callback = ModelCheckpoint(
            dirpath=cfg['out_dir'],
            monitor="val_loss",
            save_top_k=cfg['save_top_k'],
            save_last=False,
            save_weights_only=True,
            filename=f"{company}_{model_version}" + "_epoch_{epoch}_" + "{val_loss:.4f}",
            save_on_train_epoch_end=False,
            verbose=False,
            auto_insert_metric_name=False,
            mode="min",
        )

        early_stop_callback = EarlyStopping(
            monitor="val_loss", min_delta=0.00, patience=cfg['patience'], verbose=False, mode="min")

        # A fresh backbone per call: re-using one shared instance would start each
        # company from the weights trained on the previous one.
        model = PulseSent(
            model=models[cfg['model_name']](),
            lr=cfg['lr'],
            epochs=cfg['epochs'],
            out_dir=cfg['out_dir']
        )

        trainer = L.Trainer(
            logger=neptune_logger if neptune_logger is not None else False,
            max_epochs=cfg['epochs'],
            deterministic=False,
            accumulate_grad_batches=cfg['accamulate_grad_batches'],
            accelerator=cfg['device'],
            devices=cfg['n_devices'],
            callbacks=[checkpoint_callback, early_stop_callback],
            precision='16-mixed' if cfg['mixed'] else 32,
            gradient_clip_val=cfg['max_grad_norm'],
            log_every_n_steps=1,
            enable_model_summary=False,
            enable_progress_bar=False,
            **kwargs
        )

        trainer.fit(
            model=model,
            train_dataloaders=dl_train,
            val_dataloaders=dl_val
        )

        best_path = checkpoint_callback.best_model_path
        if best_path:
            # Hyper-parameters are not stored in the checkpoint, so the
            # constructor arguments have to be supplied again when loading.
            best_model = PulseSent.load_from_checkpoint(
                best_path,
                model=models[cfg['model_name']](),
                lr=cfg['lr'],
                epochs=cfg['epochs'],
                out_dir=cfg['out_dir']
            )
        else:
            # No checkpoint was written (e.g. validation never ran): test the last weights.
            best_model = model
        best_model.eval()
        trainer.test(best_model, dataloaders=dl_test)

        del trainer, model, best_model
    finally:
        # The logger is local to this call, so the run has to be closed here.
        if neptune_logger is not None:
            neptune_logger.experiment.stop()
        gc.collect()

# `neptune_logger` is only ever created as a local inside train_and_validate, so an
# unconditional access here raised NameError as soon as this cell ran in non-debug
# mode. Only stop a run if one really exists at module scope.
if not cfg['debug'] and globals().get('neptune_logger') is not None:
    neptune_logger.experiment.stop()