In [5]:
%pip install pytorch-lifestream

Defaulting to user installation because normal site-packages is not writeable
Collecting pytorch-lifestream
  Downloading pytorch-lifestream-0.6.0.tar.gz (163 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.4/163.4 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting pytorch-lightning>=1.6.0 (from pytorch-lifestream)
  Downloading pytorch_lightning-2.3.0-py3-none-any.whl.metadata (21 kB)
Collecting torchmetrics>=0.9.0 (from pytorch-lifestream)
  Downloading torchmetrics-1.4.0.post0-py3-none-any.whl.metadata (19 kB)
Collecting transformers (from pytorch-lifestream)
  Downloading transformers-4.41.2-py3-none-any.whl.metadata (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.8/43.8 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from pytorch-lightning>=1.6.0->pytorch-lifestream)
  Downloading lightning_utilities-0.11.2-py3-no

In [None]:
import os
os.environ["OMP_NUM_THREADS"] = "16"

import pandas as pd
import numpy as np
import torch
from functools import partial
import pytorch_lightning as pl
import warnings
warnings.filterwarnings("ignore")

from torch.utils.data import DataLoader

from ptls.data_load.datasets import MemoryMapDataset
from ptls.data_load.iterable_processing.iterable_seq_len_limit import ISeqLenLimit
from ptls.data_load.iterable_processing.to_torch_tensor import ToTorch
from ptls.data_load.iterable_processing.feature_filter import FeatureFilter
from ptls.nn import TrxEncoder, RnnSeqEncoder
from ptls.frames.coles import CoLESModule
from ptls.data_load.iterable_processing import SeqLenFilter
from ptls.frames.coles import ColesIterableDataset
from ptls.frames.coles.split_strategy import SampleSlices
from ptls.frames import PtlsDataModule
from ptls.preprocessing import PandasDataPreprocessor
from ptls.data_load.utils import collate_feature_dict
from ptls.data_load.iterable_processing_dataset import IterableProcessingDataset

from tqdm.auto import tqdm
import lightgbm as ltb

Unknown instance spec: 

# Data preprocessing

In [2]:
# Raw transaction log of the training clients.
transactions_train = pd.read_parquet(path="trx_train.parquet")

In [3]:
# Raw transaction log of the test clients.
transactions_test = pd.read_parquet(path="trx_test.parquet")

In [4]:
# Sequence preprocessor for the transaction log: groups events by client_id,
# converts event_time with 'dt_to_timestamp', encodes the listed categorical
# columns and passes `amount` through unchanged. return_records=False keeps the
# result as a DataFrame (it is later turned into records with .to_dict).
preprocessor = PandasDataPreprocessor(
    col_id="client_id",
    col_event_time="event_time",
    event_time_transformation="dt_to_timestamp",
    cols_category=["event_type",
                   "event_subtype",
                   "currency",
                   "src_type11",
                   "src_type12",
                   "dst_type11",
                   "dst_type12",
                   "src_type21",
                   "src_type22",
                   "src_type31",
                   "src_type32"],
    # A list, like cols_identity of the target preprocessor: a bare string is not
    # the expected list-of-column-names type and may be iterated char by char.
    cols_identity=["amount"],
    return_records=False,
)

In [None]:
# Fit the preprocessor on the training transactions and transform them in one call.
processed_train = preprocessor.fit_transform(transactions_train)

IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out


In [None]:
# Apply the train-fitted preprocessor to the test transactions (transform only, no fit).
processed_test = preprocessor.transform(transactions_test)

IOStream.flush timed out
IOStream.flush timed out


In [None]:
import pickle

# Serialize the fitted transaction preprocessor (pickle) so it can be reloaded later.
with open('transactions_preprocessor.pkl', mode='wb') as preprocessor_file:
    pickle.dump(preprocessor, preprocessor_file)

In [12]:
# processed_train.to_pickle('processed_train.pkl')
# processed_test.to_pickle('processed_test.pkl')

In [13]:
# Target table for the training clients.
target_train = pd.read_parquet(path="train_target.parquet")

In [14]:
# Target preprocessor: groups target rows by client_id, uses `mon` as the time
# column and passes target_1..target_4 through unchanged.
target_preprocessor = PandasDataPreprocessor(
    col_id="client_id",
    col_event_time="mon",
    event_time_transformation="dt_to_timestamp",
    cols_identity=[f"target_{idx}" for idx in range(1, 5)],
    return_records=False,
)

processed_target = target_preprocessor.fit_transform(target_train)

In [15]:
import pickle

# Serialize the fitted target preprocessor (pickle) so it can be reloaded later.
with open('target_preprocessor.pkl', mode='wb') as target_file:
    pickle.dump(target_preprocessor, target_file)

In [16]:
# Client ids of the test set that the submission must cover.
test_target_b = pd.read_parquet(path="test_target_b.parquet")

In [18]:
# (leftover scratch expression `1` removed; this cell can be deleted)

1

In [19]:
# (leftover scratch expression `1` removed; this cell can be deleted)

1

**Обработка датасета:**

- Последовательности транзакций длиной < min_seq_len отбрасываются
- Последовательности длиной > max_seq_len обрезаются до max_seq_len, после чего фичи конвертируются в torch.tensor
- Ненужные для CoLES фичи удаляются

In [None]:
def make_coles_dataset(processed_df, min_seq_len=32, max_seq_len=4096):
    """Build an in-memory CoLES pre-training dataset from a preprocessed frame.

    Filters are applied in order: drop the id/target columns, drop sequences
    shorter than ``min_seq_len``, cap sequences at ``max_seq_len`` events and
    convert features to torch tensors. Fresh filter instances are created on
    every call, so train and test do not share filter state.

    Args:
        processed_df: output of the transaction preprocessor (DataFrame).
        min_seq_len: minimum sequence length kept.
        max_seq_len: maximum sequence length after truncation.

    Returns:
        A ``MemoryMapDataset`` over ``processed_df`` records.
    """
    # NOTE(review): this cell crashed the kernel; materialising every record via
    # to_dict("records") is a likely memory hotspot — TODO confirm.
    return MemoryMapDataset(
        data=processed_df.to_dict("records"),
        i_filters=[
            FeatureFilter(drop_feature_names=['client_id', 'target_1', 'target_2', 'target_3', 'target_4']),
            SeqLenFilter(min_seq_len=min_seq_len),
            ISeqLenLimit(max_seq_len=max_seq_len),
            ToTorch(),
        ],
    )


train = make_coles_dataset(processed_train)
test = make_coles_dataset(processed_test)

ExecutionException: Kernel has been crashed

In [None]:
def make_coles_iterable(dataset, split_count=5, cnt_min=32, cnt_max=180):
    """Wrap a dataset for CoLES training with random sub-sequence sampling.

    Args:
        dataset: source dataset of per-client feature dicts.
        split_count: number of slices sampled per client (SampleSlices).
        cnt_min: minimum slice length.
        cnt_max: maximum slice length.

    Returns:
        A ``ColesIterableDataset`` using a fresh ``SampleSlices`` splitter.
    """
    return ColesIterableDataset(
        data=dataset,
        splitter=SampleSlices(
            split_count=split_count,
            cnt_min=cnt_min,
            cnt_max=cnt_max,
        ),
    )


train_ds = make_coles_iterable(train)
valid_ds = make_coles_iterable(test)

ExecutionException: Kernel has been crashed

In [None]:
# Lightning data module holding both CoLES datasets; train and validation use
# the same batch size and worker count.
train_dl = PtlsDataModule(
    train_data=train_ds,
    valid_data=valid_ds,
    train_batch_size=256,
    valid_batch_size=256,
    train_num_workers=16,
    valid_num_workers=16,
)

ExecutionException: Kernel has been crashed

# Model

- numeric_values обрабатываются как BatchNorm+Linear
- embeddings — nn.Embedding

In [None]:
# Per-transaction encoder config: `amount` is a numeric feature with the 'log'
# option; each listed categorical column gets an embedding of size _emb_dim whose
# vocabulary size comes from the fitted preprocessor.
# NOTE(review): 'currency' and 'src_type21' are encoded by the preprocessor but
# not embedded here — confirm the exclusion is intentional.
_emb_dim = 24
# Query the dictionary sizes once instead of once per feature.
_category_sizes = preprocessor.get_category_dictionary_sizes()
_embedded_features = (
    "event_type",
    "event_subtype",
    "src_type11",
    "src_type12",
    "dst_type11",
    "dst_type12",
    "src_type22",
    "src_type31",
    "src_type32",
)

trx_encoder_params = dict(
    embeddings_noise=0.003,
    numeric_values={'amount': 'log'},
    embeddings={
        name: {'in': _category_sizes[name], 'out': _emb_dim}
        for name in _embedded_features
    },
)

ExecutionException: Kernel has been crashed

- **TrxEncoder** — обрабатывает каждую транзакцию (строит для неё эмбеддинг)
- **SeqEncoder** - обрабатывает последовательность

In [None]:
# Sequence encoder: TrxEncoder embeds each transaction, a GRU with hidden size
# 128 runs over the resulting sequence.
seq_encoder = RnnSeqEncoder(
    type='gru',
    hidden_size=128,
    trx_encoder=TrxEncoder(**trx_encoder_params),
)

ExecutionException: Kernel has been crashed

In [27]:
# CoLES module: Adam with lr 1e-3 and a StepLR schedule multiplying the learning
# rate by 0.9025 every 3 scheduler steps.
model = CoLESModule(
    seq_encoder=seq_encoder,
    optimizer_partial=partial(torch.optim.Adam, lr=1e-3),
    lr_scheduler_partial=partial(
        torch.optim.lr_scheduler.StepLR,
        gamma=0.9025,
        step_size=3,
    ),
)

# Train

In [29]:
# Pre-training trainer: up to 30 epochs, gradient clipping at 0.5 and at most
# 5000 validation batches. Logs go to ./logdir/baseline_result (TensorBoard),
# the LR is logged every step and a checkpoint is written every 5000 training
# steps (save_top_k=-1 keeps all of them). No accelerator/devices are passed,
# so Lightning chooses the device (the recorded run was CPU-only).
trainer = pl.Trainer(
    max_epochs=30,
    limit_val_batches=5000,
    gradient_clip_val=0.5,
    enable_progress_bar=True,
    logger=pl.loggers.TensorBoardLogger(save_dir='./logdir', name='baseline_result'),
    callbacks=[
        pl.callbacks.LearningRateMonitor(logging_interval='step'),
        pl.callbacks.ModelCheckpoint(every_n_train_steps=5000, save_top_k=-1),
    ],
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [30]:
# train_dl is a data module (not a DataLoader), so pass it as `datamodule`.
trainer.fit(model=model, datamodule=train_dl)

Missing logger folder: ./logdir/baseline_result
2024-06-15 14:52:15.675735: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-06-15 14:52:17.443801: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.

  | Name               | Type            | Params | Mode 
---------------------------------------------------------------
0 | _loss              | ContrastiveLoss | 0      | train
1 | _seq_encoder       | RnnSeqEncoder   | 448 K  | train
2 | _validation_metric | BatchRecallTopK | 0      | train
3 | _head              | Head

Epoch 0:   3%|▎         | 71/2312 [07:50<4:07:34,  0.15it/s, v_num=0, seq_len=86.80]

In [31]:
# Persist only the model weights (state_dict), not the full module object.
torch.save(obj=model.state_dict(), f='./model.pt')

# Inference

Для каждого пользователя известно 12 таргетов (по одному на месяц). Инференс происходит следующим образом.

Чтобы не было утечки (лика), для каждого клиента нужно делать срез истории до текущего месяца:

берутся все транзакции за первый месяц — им соответствует 1-й таргет из 12;
затем берутся транзакции за первый и второй месяцы — им соответствует 2-й таргет, и так далее.
То есть для пользователя, имеющего транзакции за год, мы можем получить 12 эмбеддингов, каждому из которых соответствует один таргет.

In [None]:
class GetSplit(IterableProcessingDataset):
    """Expand each client record into one cumulative-history record per month.

    For every month ``m`` in ``[start_month, end_month]`` a copy of the record is
    yielded in which:
      * every feature other than ``col_id`` and ``target*`` keeps only the events
        whose ``col_time`` is strictly before the first day of month ``m + 1``
        (i.e. the history up to and including month ``m``);
      * every ``target*`` feature is replaced by its element at index ``m - 1``;
      * ``col_id`` gets the suffix ``'_month=<m>'``.

    Args:
        start_month: first month (1-based) to emit.
        end_month: last month (1-based, inclusive) to emit.
        year: calendar year the months refer to.
        col_id: name of the id feature.
        col_time: name of the event-time feature used for masking.
    """

    def __init__(
        self,
        start_month,
        end_month,
        year=2022,
        col_id='client_id',
        col_time='event_time'
    ):
        super().__init__()
        self.start_month = start_month
        self.end_month = end_month
        self._year = year
        self._col_id = col_id
        self._col_time = col_time

    def _month_end_timestamp(self, month):
        """Return the timestamp of the first day of the month following `month`."""
        # Function-scope import: the notebook's import cell never imports datetime,
        # so the original code raised NameError here.
        from datetime import datetime

        # NOTE(review): a naive datetime's .timestamp() uses the machine's local
        # timezone; confirm this matches how 'dt_to_timestamp' encoded event_time.
        if month == 12:
            return datetime(self._year + 1, 1, 1).timestamp()
        return datetime(self._year, month + 1, 1).timestamp()

    def __iter__(self):
        months = range(self.start_month, self.end_month + 1)
        # Month boundaries do not depend on the record: compute them once.
        month_ends = {month: self._month_end_timestamp(month) for month in months}

        for rec in self._src:
            # Records may arrive as (features, ...) tuples; only the features dict is used.
            source = rec[0] if type(rec) is tuple else rec
            for month in months:
                features = source.copy()
                mask = features[self._col_time] < month_ends[month]

                for key, tensor in features.items():
                    if key.startswith('target'):
                        # Assumes one target value per month, index 0 == month 1 — TODO confirm.
                        features[key] = tensor[month - 1].tolist()
                    elif key != self._col_id:
                        features[key] = tensor[mask]

                features[self._col_id] += '_month=' + str(month)

                yield features

def collate_feature_dict_with_target(batch, col_id='client_id', targets=False,
                                     target_names=('target_1', 'target_2', 'target_3', 'target_4')):
    """Collate feature dicts, splitting out ids and (optionally) targets.

    Args:
        batch: list of per-sample feature dicts.
        col_id: key of the id feature; collected into ``batch_ids`` and removed
            from what is passed to ``collate_feature_dict``.
        targets: if True, also collect the ``target_names`` values of each sample
            (as a list, in that order) and remove them from the features.
        target_names: target keys extracted when ``targets`` is True.

    Returns:
        ``(padded_batch, batch_ids, target_cols)`` when ``targets`` is True,
        otherwise ``(padded_batch, batch_ids)``.

    The input dicts are not modified; a shallow copy of each sample is stripped
    instead. An in-memory dataset may return the same dict object on every
    access, so deleting keys in place would make a second pass over it (e.g.
    re-running predict) fail with a KeyError on ``col_id``.
    """
    batch_ids = []
    target_cols = []
    features_batch = []
    for sample in batch:
        features = dict(sample)
        batch_ids.append(features.pop(col_id))
        if targets:
            target_cols.append([features.pop(name) for name in target_names])
        features_batch.append(features)

    padded_batch = collate_feature_dict(features_batch)
    if targets:
        return padded_batch, batch_ids, target_cols
    return padded_batch, batch_ids


class InferenceModuleMultimodal(pl.LightningModule):
    """Lightning wrapper that runs `model` on collated batches for prediction.

    Expects the tuples produced by ``collate_feature_dict_with_target``:
    ``(padded_batch, batch_ids)`` or ``(padded_batch, batch_ids, target_cols)``.
    Produces a dict with ``client_id``, optionally ``target_1``..``target_4``,
    and the model output under ``model_out_name``. With ``pandas_output=True``
    that dict is flattened into a DataFrame via :meth:`to_pandas`.

    Args:
        model: module called as ``model(padded_batch)``.
        pandas_output: return a DataFrame instead of a dict.
        drop_seq_features: stored on the instance; not used by this class.
        model_out_name: key / column prefix for the model output.
    """

    _TARGET_NAMES = ('target_1', 'target_2', 'target_3', 'target_4')

    def __init__(self, model, pandas_output=True, drop_seq_features=True, model_out_name='out'):
        super().__init__()

        self.model = model
        self.pandas_output = pandas_output
        self.drop_seq_features = drop_seq_features
        self.model_out_name = model_out_name

    def forward(self, x):
        with_targets = len(x) == 3
        if with_targets:
            x, batch_ids, target_cols = x
        else:
            x, batch_ids = x

        out = self.model(x)

        x_out = {'client_id': batch_ids}
        if with_targets:
            # target_cols is a list of per-sample [t1, t2, t3, t4] lists.
            target_cols = torch.tensor(target_cols)
            for idx, name in enumerate(self._TARGET_NAMES):
                x_out[name] = target_cols[:, idx]
        x_out[self.model_out_name] = out

        # Release unused cached CUDA memory after every batch.
        torch.cuda.empty_cache()

        if self.pandas_output:
            return self.to_pandas(x_out)
        return x_out

    @staticmethod
    def to_pandas(x):
        """Flatten a dict of per-sample values into one DataFrame.

        Lists and 1-D arrays/tensors become single columns; 2-D ones are expanded
        into ``<key>_0000``, ``<key>_0001``, ... columns; anything else becomes a
        column of None.
        """
        expand_cols = {}
        scalar_features = {}

        for k, v in x.items():
            if isinstance(v, torch.Tensor):
                # detach() so tensors that still require grad can be converted too.
                v = v.detach().cpu().numpy()

            if type(v) is list or len(v.shape) == 1:
                scalar_features[k] = v
            elif len(v.shape) == 2:
                # Keep the converted array (the original re-converted x[k], which
                # failed for numpy inputs and repeated the device transfer).
                expand_cols[k] = v
            else:
                scalar_features[k] = None

        dataframes = [pd.DataFrame(scalar_features)]
        for col, v in expand_cols.items():
            dataframes.append(pd.DataFrame(v, columns=[f'{col}_{i:04d}' for i in range(v.shape[1])]))

        return pd.concat(dataframes, axis=1)

In [None]:
%%time

train = MemoryMapDataset(
    data=processed_train.merge(processed_target.drop("event_time", axis=1), on="client_id", how="inner").to_dict("records"),
    i_filters=[
        ISeqLenLimit(max_seq_len=4096),
        FeatureFilter(keep_feature_names=['client_id', 'target_1', 'target_2', 'target_3', 'target_4']),
        GetSplit(start_month=1, end_month=12),
        ToTorch(),
    ]
)

test = MemoryMapDataset(
    data=processed_test.to_dict("records"),
    i_filters=[
        ISeqLenLimit(max_seq_len=4096),
        FeatureFilter(keep_feature_names=['client_id', 'target_1', 'target_2', 'target_3', 'target_4']),
        ToTorch(),
    ]
)

CPU times: user 17min 46s, sys: 2min 11s, total: 19min 58s
Wall time: 19min 46s


In [None]:
# Ordered (unshuffled), single-process loaders for embedding extraction.
# Train batches also carry the targets; test batches carry only the ids.
inference_train_dl = DataLoader(
    dataset=train,
    batch_size=256,
    shuffle=False,
    num_workers=0,
    collate_fn=partial(collate_feature_dict_with_target, targets=True),
)

inference_test_dl = DataLoader(
    dataset=test,
    batch_size=256,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_feature_dict_with_target,
)

In [None]:
# Prediction wrapper around the trained CoLES model; output columns are emb_0000, ...
inf_module = InferenceModuleMultimodal(
    model=model,
    model_out_name='emb',
    pandas_output=True,
    drop_seq_features=True,
)

In [None]:
# Prediction trainer on GPU 0. The `gpus=` argument was removed in PyTorch
# Lightning 2.x (2.3.0 is what the install cell pulled in) and raises a
# TypeError there; `accelerator` + `devices` is the supported equivalent.
trainer = pl.Trainer(accelerator="gpu", devices=[0], max_epochs=-1)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [None]:
# Embed every test record and persist the frame (client_id + emb_* columns).
inf_test_embeddings = pd.concat(
    trainer.predict(model=inf_module, dataloaders=inference_test_dl)
)
inf_test_embeddings.to_parquet(
    "test.parquet",
    engine="pyarrow",
    compression="snappy",
    index=False,
)

In [None]:
import gc

# The test embeddings are already persisted to test.parquet; drop the in-memory
# frame and force a collection so the memory is actually returned.
del inf_test_embeddings
gc.collect()

In [None]:
# Embed every (client, month) training record, with its targets, and persist it.
inf_train_embeddings = pd.concat(
    trainer.predict(model=inf_module, dataloaders=inference_train_dl)
)

inf_train_embeddings.to_parquet(
    "train.parquet",
    engine="pyarrow",
    compression="snappy",
    index=False,
)

In [None]:
import gc

# The train embeddings are already persisted to train.parquet; drop the in-memory
# frame and force a collection so the memory is actually returned.
del inf_train_embeddings
gc.collect()

Файл **sample_submission** составляется из **client_id** файла **test_target_b**. Так как не у всех пользователей может быть транзакционная история, мы для простоты заполняем их фичи нулями.

In [None]:
# Submission frame: every client_id from test_target_b, left-joined with its test
# embedding; clients without transaction history get zero features.
# The embeddings are read back from test.parquet because the in-memory
# `inf_test_embeddings` frame is deleted earlier in the notebook (NameError otherwise).
not_only_trx = (
    pd.DataFrame({"client_id": test_target_b["client_id"].unique()})
    .merge(pd.read_parquet("test.parquet"), on="client_id", how="left")
    .fillna(0)
)
not_only_trx

In [None]:
# Persist the submission features (one row per test client).
not_only_trx.to_parquet(
    path="not_only_trx.parquet",
    compression="snappy",
    engine="pyarrow",
    index=False,
)

# Downstream

Использование эмбеддингов для даунстрим-задачи. Для простоты для всех таргетов используются одни и те же параметры бустинга.

In [None]:
class Downstream:
    """Fit one LightGBM classifier per target on embeddings and score the test set.

    Args:
        train_path: parquet file with ``col_id``, every column in ``targets`` and
            the feature columns.
        test_path: parquet file with ``col_id`` and the same feature columns.
        params: keyword arguments for ``ltb.LGBMClassifier`` (shared by all targets).
        result_path: CSV file written by :meth:`run`.
        col_id: id column name.
        targets: target column names; one classifier is trained per entry.
    """

    def __init__(
        self,
        train_path,
        test_path,
        params,
        result_path,
        col_id='client_id',
        targets=(
            'target_1',
            'target_2',
            'target_3',
            'target_4'
        )
    ):
        self.train_path = train_path
        self.test_path = test_path

        self.col_id = col_id
        self.all_targets = targets
        self.params = params
        self.result_path = result_path
        # Columns that must never be used as features.
        self.drop_feat = list(self.all_targets) + [self.col_id]

    def fit(self):
        """Train one classifier per target; returns ``{target_name: fitted classifier}``."""
        train_embeddings = pd.read_parquet(self.train_path)
        X_train = train_embeddings.drop(columns=self.drop_feat)

        clfs = {}
        for col_target in self.all_targets:
            clf = ltb.LGBMClassifier(**self.params)
            clf.fit(X_train, train_embeddings[col_target])
            print(f'Model fitted, target: {col_target}')
            clfs[col_target] = clf

        return clfs

    def get_scores(self, clfs):
        """Score each unique test id with every classifier.

        Returns a DataFrame indexed 0..n-1 with ``col_id`` followed by one column
        per target holding column 1 of ``predict_proba`` (the positive class for
        a binary classifier).
        """
        # Deduplicate on the configured id column (was hard-coded to 'client_id').
        test_embeddings_curr = pd.read_parquet(self.test_path).drop_duplicates(self.col_id)
        X_test = test_embeddings_curr.drop(columns=[self.col_id])

        # .to_numpy() so the result gets a clean 0..n-1 index instead of the
        # gappy index left by drop_duplicates.
        scores = pd.DataFrame({self.col_id: test_embeddings_curr[self.col_id].to_numpy()})
        for col_target in self.all_targets:
            scores[col_target] = clfs[col_target].predict_proba(X_test)[:, 1]

        return scores

    def run(self):
        """Fit, score, write ``result_path`` as CSV and return the scores frame."""
        clfs = self.fit()
        scores = self.get_scores(clfs)

        # index=False: the submission holds only the id and score columns, not an
        # extra unnamed pandas index column.
        scores.to_csv(self.result_path, index=False)

        return scores

In [None]:
# Shared LightGBM hyper-parameters for every target.
params = dict(
    n_estimators=500,
    boosting_type="gbdt",
    objective="binary",
    subsample=0.5,
    subsample_freq=1,
    learning_rate=0.02,
    feature_fraction=0.75,
    max_depth=6,
    lambda_l1=1,
    lambda_l2=1,
    min_data_in_leaf=50,
    random_state=42,
    n_jobs=8,
)

# Train on the monthly train embeddings, score the submission features and
# write the CSV.
dw = Downstream(
    train_path="train.parquet",
    test_path="not_only_trx.parquet",
    result_path='sample_submission.csv',
    params=params,
)

scores = dw.run()
scores