In [3]:
%load_ext autoreload
%autoreload 2
from recsys.model import MultitaskRecommender
from recsys.dataset import NewsDataModule

if "datamodule" not in locals():
    datamodule = NewsDataModule("../data", batch_size=32)
    datamodule.prepare_data()
    datamodule.setup()

model = MultitaskRecommender(768, n_categories=datamodule.train_dataset.max_categories)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [4]:
datamodule.setup('test')

In [34]:
import polars as pl

articles = pl.read_parquet("../data/demo/articles.parquet")
articles.head()

article_id,title,subtitle,last_modified_time,premium,body,published_time,image_ids,article_type,url,ner_clusters,entity_groups,topics,category,subcategory,category_str,total_inviews,total_pageviews,total_read_time,sentiment_score,sentiment_label
i32,str,str,datetime[μs],bool,str,datetime[μs],list[i64],str,str,list[str],list[str],list[str],i16,list[i16],str,i32,i32,f32,f32,str
3037230,"""Ishockey-spill…","""ISHOCKEY: Isho…",2023-06-29 06:20:57,False,"""Ambitionerne o…",2003-08-28 08:55:00,,"""article_defaul…","""https://ekstra…",[],[],"[""Kriminalitet"", ""Kendt"", … ""Mindre ulykke""]",142,"[327, 334]","""sport""",,,,0.9752,"""Negative"""
3044020,"""Prins Harry tv…","""Hoffet tvang P…",2023-06-29 06:21:16,False,"""Den britiske t…",2005-06-29 08:47:00,"[3097307, 3097197, 3104927]","""article_defaul…","""https://ekstra…","[""Harry"", ""James Hewitt""]","[""PER"", ""PER""]","[""Kriminalitet"", ""Kendt"", … ""Personfarlig kriminalitet""]",414,[432],"""underholdning""",,,,0.7084,"""Negative"""
3057622,"""Rådden kørsel …","""Kan ikke straf…",2023-06-29 06:21:24,False,"""Slingrende spr…",2005-10-10 07:20:00,[3047102],"""article_defaul…","""https://ekstra…",[],[],"[""Kriminalitet"", ""Transportmiddel"", ""Bil""]",118,[133],"""nyheder""",,,,0.9236,"""Negative"""
3073151,"""Mærsk-arvinger…","""FANGET I FLODB…",2023-06-29 06:21:38,False,"""To oldebørn af…",2005-01-04 06:59:00,"[3067474, 3067478, 3153705]","""article_defaul…","""https://ekstra…",[],[],"[""Erhverv"", ""Privat virksomhed"", … ""Rejse""]",118,[133],"""nyheder""",,,,0.9945,"""Negative"""
3193383,"""Skød svigersøn…","""44-årig kvinde…",2023-06-29 06:22:57,False,"""En 44-årig mor…",2003-09-15 15:30:00,,"""article_defaul…","""https://ekstra…",[],[],"[""Kriminalitet"", ""Personfarlig kriminalitet""]",140,[],"""krimi""",,,,0.9966,"""Negative"""


In [12]:
from random import shuffle
from typing import Any, Literal
from recsys.dataset import NewsDataset, batch_random_choice_with_reset, sort_and_select
from recsys.utils.classes import PolarsDataFrameWrapper
from recsys.utils.download import CHALLENGE_DATASET, download_file, unzip_file
from transformers import AutoTokenizer
import numpy as np
import polars as pl
import torch
from datasets import Dataset

from ebrec.utils._behaviors import create_binary_labels_column, truncate_history
from ebrec.utils._constants import (
    DEFAULT_ARTICLE_ID_COL,
    DEFAULT_BODY_COL,
    DEFAULT_CATEGORY_STR_COL,
    DEFAULT_CATEGORY_COL,
    DEFAULT_CLICKED_ARTICLES_COL,
    DEFAULT_HISTORY_ARTICLE_ID_COL,
    DEFAULT_IMPRESSION_ID_COL,
    DEFAULT_INVIEW_ARTICLES_COL,
    DEFAULT_LABELS_COL,
    DEFAULT_SUBTITLE_COL,
    DEFAULT_TITLE_COL,
    DEFAULT_TOPICS_COL,
    DEFAULT_USER_COL,
)
from ebrec.utils._polars import slice_join_dataframes
from ebrec.utils._python import (
    create_lookup_dict,
    create_lookup_objects,
    generate_unique_name,
)
from torch.utils.data import Dataset as TorchDataset
from pytorch_lightning import LightningDataModule
import json

COLUMNS = [
    DEFAULT_USER_COL,
    DEFAULT_IMPRESSION_ID_COL,
    DEFAULT_HISTORY_ARTICLE_ID_COL,
    DEFAULT_INVIEW_ARTICLES_COL,
    DEFAULT_CLICKED_ARTICLES_COL,
]


DEFAULT_TOKENS_COL = "tokens"
N_SAMPLES_COL = "n_samples"
HISTORY_TITLES_COL = "history_titles"
INVIEW_TITLES_COL = "inview_titles"


class NewsDatasetV2(TorchDataset):
    behaviors: pl.DataFrame
    history: pl.DataFrame
    articles: pl.DataFrame

    def __init__(
        self,
        tokenizer,
        behaviors: pl.DataFrame,
        history: pl.DataFrame,
        articles: pl.DataFrame,
        history_size: int = 30,
        max_labels: int = 5,
        padding_value: int = 0,
        max_length=128,
        test_mode=False,
    ):
        self.behaviors = behaviors
        self.history = history
        self.articles = articles
        self.history_size = history_size
        self.padding_value = padding_value

        self.tokenizer = tokenizer
        self.max_length = max_length
        self.max_labels = max_labels
        self.test_mode = test_mode

        # NOTE: Keep an eye on this if memory issues arise
        self.articles = self.articles.select(
            [
                DEFAULT_ARTICLE_ID_COL,  # article_id
                DEFAULT_TITLE_COL,  # title
                DEFAULT_BODY_COL,  # body
                DEFAULT_SUBTITLE_COL,  # subtitle
                DEFAULT_TOPICS_COL,  # topics
                DEFAULT_CATEGORY_STR_COL,  # category_str
            ]
        ).collect()

        self.history = self._process_history(self.history, history_size, padding_value)
        # Prepare the actual training data
        self.behaviors = self.behaviors.collect()
        self._prepare_articles()

        if test_mode:
            self._prepare_test_data()
        else:
            self._prepare_training_data()

    def save_preprocessed(self, path: str):
        """Save the preprocessed data to the given path directory."""
        data = {
            "history_size": self.history_size,
            "padding_value": self.padding_value,
            "max_labels": self.max_labels,
            "max_categories": self.max_categories,
            "test_mode": self.test_mode,
        }

        with open(path + "/parameters.json", "w") as f:
            json.dump(data, f)
        self.lookup_matrix.save_to_disk(path + "/lookup_matrix")
        self.behaviors.write_parquet(path + "/behaviors.parquet")
        self.history.write_parquet(path + "/history.parquet")
        self.articles.write_parquet(path + "/articles.parquet")
        self.data.dataframe.write_parquet(path + "/data.parquet")

    @staticmethod
    def from_preprocessed(path: str):
        """Load the preprocessed data from the given path directory."""
        dataset = NewsDataset.__new__(NewsDataset)
        with open(path + "/parameters.json", "r") as f:
            data = json.load(f)
            dataset.history_size = data["history_size"]
            dataset.padding_value = data["padding_value"]
            dataset.max_labels = data["max_labels"]
            dataset.max_categories = data["max_categories"]
            dataset.test_mode = data["test_mode"]

        dataset.lookup_matrix = Dataset.load_from_disk(path + "/lookup_matrix")
        dataset.behaviors = pl.read_parquet(path + "/behaviors.parquet")
        dataset.history = pl.read_parquet(path + "/history.parquet")
        dataset.articles = pl.read_parquet(path + "/articles.parquet")
        dataset.data = PolarsDataFrameWrapper(pl.read_parquet(path + "/data.parquet"))

        return dataset

    @classmethod
    def _process_history(
        cls, history: pl.LazyFrame, history_size: int = 30, padding_value: int = 0
    ) -> pl.DataFrame:
        return (
            history.select(
                [
                    DEFAULT_USER_COL,  # user_id
                    DEFAULT_HISTORY_ARTICLE_ID_COL,  # article_id_fixed
                ]
            )
            .pipe(
                truncate_history,
                column=DEFAULT_HISTORY_ARTICLE_ID_COL,
                history_size=history_size,
                padding_value=padding_value,
                enable_warning=False,
            )
            .collect()
        )


    def _prepare_articles(self):
        self.articles = (
            self.articles.lazy()
            .with_columns(
                pl.col(DEFAULT_CATEGORY_STR_COL)
                .cast(pl.Categorical)
                .to_physical()
                .alias(DEFAULT_CATEGORY_COL)
            )
            .collect()
        )
        
        # Tokenize
        tokens = self.tokenizer([""] + self.articles[DEFAULT_TITLE_COL].to_list(), truncation=True, padding=True)
        
        # Create the lookup matrix        
        self.lookup_matrix = Dataset.from_dict(tokens).add_column(DEFAULT_CATEGORY_COL, [0] + self.articles[DEFAULT_CATEGORY_COL].cast(pl.UInt8).to_list())
    
        self.max_categories = max(self.lookup_matrix[DEFAULT_CATEGORY_COL]) + 1
        self.article_id_to_idx = {k: i for i, k in enumerate([0] + self.articles[DEFAULT_ARTICLE_ID_COL].to_list())}

    def _prepare_test_data(self):
        self.data = (
            slice_join_dataframes(
                df1=self.behaviors,
                df2=self.history,
                on=DEFAULT_USER_COL,
                how="left",
            ).select(COLUMNS[:-1])  # do not count clicked articles as these do not exist in test
        )
        
        self.data = self.data.with_columns(
            pl.col(DEFAULT_HISTORY_ARTICLE_ID_COL).list.eval(pl.element().replace(self.article_id_to_idx, default=0)),
            pl.col(DEFAULT_INVIEW_ARTICLES_COL).list.eval(pl.element().replace(self.article_id_to_idx, default=0))
        )

        self.data = PolarsDataFrameWrapper(self.data)



    def _prepare_training_data(self):

        # Map article_id to index

        self.data = (
            slice_join_dataframes(
                df1=self.behaviors,
                df2=self.history,
                on=DEFAULT_USER_COL,
                how="left",
            ).select(COLUMNS)
            .pipe(create_binary_labels_column, label_col=DEFAULT_LABELS_COL, shuffle=False)
            .pipe(sort_and_select, n=self.max_labels)
            .with_columns(pl.col(DEFAULT_LABELS_COL).list.len().alias(N_SAMPLES_COL))
        )
        
        self.data = self.data.with_columns(
            pl.col(DEFAULT_HISTORY_ARTICLE_ID_COL).list.eval(pl.element().replace(self.article_id_to_idx, default=0)),
            pl.col(DEFAULT_INVIEW_ARTICLES_COL).list.eval(pl.element().replace(self.article_id_to_idx, default=0))
        )
        
        self.data = PolarsDataFrameWrapper(self.data)


    def __len__(self):
        return len(self.data.dataframe)

    def __getitem__(self, index):
        """
        Get the samples for the given index.

        Args:
            index (int): An integer or a slice index.

        Returns:
            history: torch.Tensor: The history input features.
            candidate: torch.Tensor: The candidate input features.
            y: torch.Tensor: The target labels.
        """

        batch = self.data[index]
        
        # Construct the history vectors
        _hist = list(self.lookup_matrix[__hist] for __hist in batch[DEFAULT_HISTORY_ARTICLE_ID_COL].to_list())
        histories = {key: torch.tensor([val[key] for val in _hist]) for key in self.lookup_matrix.features.keys()}
        

        # Early return for test mode
        # ========================
        # Construct the candidate vectors
        _cand = list(self.lookup_matrix[__cand] for __cand in batch[DEFAULT_INVIEW_ARTICLES_COL].to_list())
        if self.test_mode:
            # Special treatment, as they are not guaranteed to be of the same length
            candidates = {key: [torch.tensor(val[key]) for val in _cand] for key in self.lookup_matrix.features.keys()}
            return histories, candidates
        # ========================

        labels = batch[DEFAULT_LABELS_COL].to_list()
        candidates = {key: torch.tensor([val[key] for val in _cand]) for key in self.lookup_matrix.features.keys()}
        y = torch.tensor(labels).float().squeeze()
        # # ========================
        return histories, candidates, y


from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-uncased")
# tokens = tokenizer([""] + articles["title"].to_list(), truncation=True, padding=True)



In [8]:
# %%prun
from recsys.dataset import load_data

behaviors, history, articles = load_data("../data/small", "train")
dataset = NewsDatasetV2(tokenizer, behaviors, history, articles, test_mode=True)

In [14]:
NewsDatasetV2.from_preprocessed("../data/small")

<recsys.dataset.NewsDataset at 0x1803c6e90>

In [84]:
dataset.data = (
    slice_join_dataframes(
        df1=dataset.behaviors,
        df2=dataset.history,
        on=DEFAULT_USER_COL,
        how="left",
    ).select(COLUMNS)
    .with_columns(pl.col(DEFAULT_INVIEW_ARTICLES_COL).list.eval(pl.element().eq(DEFAULT_CLICKED_ARTICLES_COL)).cast(pl.UInt8)).alias(DEFAULT_LABELS_COL)
    .with_columns(pl.col(DEFAULT_LABELS_COL).list.len().alias(N_SAMPLES_COL))
)

AttributeError: 'Dataset' object has no attribute 'behaviors'

In [50]:
scores = model.predictions[0]
labels = model.labels[0]
from torch.nn import functional as F

F.binary_cross_entropy_with_logits(model.predictions[0], model.labels[0].float())

tensor(0.7424)

In [95]:
print(scores.shape) # (batch_size, candidates)
print(labels.shape) # (batch_size, candidates)

# loss = F.cross_entropy(scores, labels)
# print(loss)

torch.Size([5, 5])
torch.Size([5, 5])


RuntimeError: Expected floating point type for target with class probabilities, got Long

In [56]:
model.labels[0].T

tensor([[1, 0, 0, 0, 0],
        [0, 0, 1, 0, 1],
        [0, 1, 0, 0, 0],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 0]])