In [7]:
import transformers
from transformers import PreTrainedTokenizer
from transformers import BertTokenizer, BertForPreTraining, Trainer, TrainingArguments

import torch
from torch import nn
from torch.utils.data import Dataset

MAX_LENGTH = 128
HIDDEN_SIZE = 64
VOCAB_SIZE = 128

In [10]:
from typing import Dict, List, Optional
from transformers.tokenization_utils import AddedToken, PreTrainedTokenizer
"""
https://github.com/huggingface/transformers/blob/v4.23.1/src/transformers/models/canine/tokenization_canine.py
"""



PAD = 0
CLS = 1
SEP = 2
BOS = 3
MASK = 4
RESERVED = 5
UNK = 6
# the first keycode starts with 8

SPECIAL_CODEPOINTS: Dict[int, str] = {
    CLS: "[CLS]",
    SEP: "[SEP]",
    BOS: "[BOS]",
    MASK: "[MASK]",
    PAD: "[PAD]",
    RESERVED: "[RESERVED]",
    UNK: "[UNK]",
}

SPECIAL_CODEPOINTS_BY_NAME: Dict[str, int] = {name: codepoint for codepoint, name in SPECIAL_CODEPOINTS.items()}


class KeysTokenizer(PreTrainedTokenizer):
    r"""
    Construct a CANINE tokenizer (i.e. a character splitter). It turns text into a sequence of characters, and then
    converts each character into its Unicode code point.

    [`CanineTokenizer`] inherits from [`PreTrainedTokenizer`].

    Refer to superclass [`PreTrainedTokenizer`] for usage examples and documentation concerning parameters.

    Args:
        model_max_length (`int`, *optional*, defaults to 2048):
                The maximum sentence length the model accepts.
    """

    max_model_input_sizes = 128

    def __init__(
        self,
        add_prefix_space=False,
        model_max_length=2048,
        **kwargs
    ):
        bos_token = AddedToken("[BOS]", lstrip=False, rstrip=False)
        eos_token = AddedToken("[SEP]", lstrip=False, rstrip=False)
        sep_token = AddedToken("[SEP]", lstrip=False, rstrip=False)
        cls_token = AddedToken("[CLS]", lstrip=False, rstrip=False)
        pad_token = AddedToken("[PAD]", lstrip=False, rstrip=False)
        mask_token = AddedToken("[MASK]", lstrip=False, rstrip=False)
        unk_token = AddedToken("[UNK]", lstrip=False, rstrip=False)

        super().__init__(
            bos_token=bos_token,
            eos_token=eos_token,
            sep_token=sep_token,
            cls_token=cls_token,
            pad_token=pad_token,
            mask_token=mask_token,
            unk_token=unk_token,
            add_prefix_space=add_prefix_space,
            model_max_length=model_max_length,
            **kwargs,
        )

    @property
    def vocab_size(self) -> int:
        return 256
    
    def get_vocab(self) -> Dict[str, int]:
        vocab = {ch: key for key, ch in readable_keymap.items()}
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _tokenize(self, text: str) -> List[str]:
        """Tokenize a string (i.e. perform character splitting)."""
        return list(text)

    def _convert_token_to_id(self, token: str) -> int:
        """Converts a token (i.e. a Unicode character) in an id (i.e. its integer Unicode code point value)."""
        try:
            return ord(token)
        except TypeError:
            raise ValueError(f"invalid token: '{token}'")

    def _convert_id_to_token(self, index: int) -> str:
        """
        Converts a id in a token (str). In case it's a special code point, convert to
        human-readable format.
        """
        if index in SPECIAL_CODEPOINTS:
            return SPECIAL_CODEPOINTS[index]
        if index in readable_keymap:
            return readable_keymap[index]
        raise ValueError(f"invalid id: '{index}'")

    def convert_tokens_to_string(self, tokens):
        return "".join(tokens)

    def build_inputs_with_special_tokens(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """
        Build model inputs from a sequence or a pair of sequence for sequence classification tasks by concatenating and
        adding special tokens. A CANINE sequence has the following format:

        - single sequence: `[CLS] X [SEP]`
        - pair of sequences: `[CLS] A [SEP] B [SEP]`

        Args:
            token_ids_0 (`List[int]`):
                List of IDs to which the special tokens will be added.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.

        Returns:
            `List[int]`: List of [input IDs](../glossary#input-ids) with the appropriate special tokens.
        """
        sep = [self.sep_token_id]
        cls = [self.cls_token_id]

        result = cls + token_ids_0 + sep
        if token_ids_1 is not None:
            result += token_ids_1 + sep
        return result

    def get_special_tokens_mask(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None, already_has_special_tokens: bool = False
    ) -> List[int]:
        """
        Retrieve sequence ids from a token list that has no special tokens added. This method is called when adding
        special tokens using the tokenizer `prepare_for_model` method.

        Args:
            token_ids_0 (`List[int]`):
                List of IDs.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.
            already_has_special_tokens (`bool`, *optional*, defaults to `False`):
                Whether or not the token list is already formatted with special tokens for the model.

        Returns:
            `List[int]`: A list of integers in the range [0, 1]: 1 for a special token, 0 for a sequence token.
        """
        if already_has_special_tokens:
            return super().get_special_tokens_mask(
                token_ids_0=token_ids_0, token_ids_1=token_ids_1, already_has_special_tokens=True
            )

        result = [1] + ([0] * len(token_ids_0)) + [1]
        if token_ids_1 is not None:
            result += ([0] * len(token_ids_1)) + [1]
        return result

    def create_token_type_ids_from_sequences(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """
        Create a mask from the two sequences passed to be used in a sequence-pair classification task. A CANINE
        sequence pair mask has the following format:

        ```
        0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1
        | first sequence    | second sequence |
        ```

        If `token_ids_1` is `None`, this method only returns the first portion of the mask (0s).

        Args:
            token_ids_0 (`List[int]`):
                List of IDs.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.

        Returns:
            `List[int]`: List of [token type IDs](../glossary#token-type-ids) according to the given sequence(s).
        """
        sep = [self.sep_token_id]
        cls = [self.cls_token_id]

        result = len(cls + token_ids_0 + sep) * [0]
        if token_ids_1 is not None:
            result += len(token_ids_1 + sep) * [1]
        return result

    # CanineTokenizer has no vocab file
    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None):
        return ()

In [12]:
import os
import numpy as np
from typing import Dict

class KvcDataset(Dataset):
    @staticmethod
    def preprocess(sample):
        data = sample
        data[:, :2] = (data[:, :2] - data[0][0])
        duration = np.abs(data[:, 1] - data[:, 0])  # fix duration of keypress. ~349 items are broken
        diff_press = np.diff(data[:, 0], prepend=data[0, 0])
        data = np.column_stack((diff_press, duration, data[:, 2]))
        return data

    def __init__(self, scenario: str, split: str, tokenizer: KeysTokenizer, block_size: int):
        
        if split == 'train':
            filename = f'{scenario}/{scenario}_dev_set.npy'
        else:
            assert split == 'test'
            filename = f'{scenario}/{scenario}_test_sessions.npy'
        assert os.path.isfile(filename), f"Input file path {filename} not found"

        raw_data = np.load(filename, allow_pickle=True).item()
        train_data = []
        for user, sessions in raw_data.items():
            for _, session in sessions.items():
                train_data.append({
                    'keys': KvcDataset.preprocess(session),
                    'label': user
            })
            break  # TODO remove me!

        batch_encoding = tokenizer(train_data, add_special_tokens=True, truncation=True, max_length=block_size)
        self.examples = batch_encoding["input_ids"]
        self.examples = [{"input_ids": torch.tensor(e, dtype=torch.long)} for e in self.examples]

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, i) -> Dict[str, torch.tensor]:
        return self.examples[i]
    
train_dataset = KvcDataset('desktop', 'train', KeysTokenizer(), MAX_LENGTH)

ValueError: text input must of type `str` (single example), `List[str]` (batch or single pretokenized example) or `List[List[str]]` (batch of pretokenized examples).

In [3]:
import torch
from transformers import BertTokenizer, BertForMaskedLM, LineByLineTextDataset, DataCollatorForLanguageModeling, Trainer, TrainingArguments


# Load the tokenizer and model
import string
tokenizer = transformers.CanineTokenizer(model_max_length=128)
model = transformers.MobileBertForMaskedLM(transformers.MobileBertConfig())

# Load the dataset
dataset = LineByLineTextDataset(
    tokenizer=tokenizer,
    file_path="/media/pupa/DataStorage/datasets/tinyshakespeare.txt",
    block_size=128,
)

class MyDataCollatorForLanguageModeling(DataCollatorForLanguageModeling):
    # passthrough constructor
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __call__(self, features, return_tensors=None):
        res = super().__call__(features, return_tensors)
        return res

# Define the data collator
data_collator = MyDataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15
)

# Define the training arguments
training_args = TrainingArguments(
    output_dir="./bert_finetuned",
    overwrite_output_dir=True,
    num_train_epochs=10,
    per_device_train_batch_size=256,
    save_steps=10_000,
    save_total_limit=2,
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset,
)

# Train the model
trainer.train()


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [3]:
config = BertConfig(
    vocab_size=VOCAB_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_hidden_layers=2,
    num_attention_heads=2,
    intermediate_size=128,
    hidden_act="gelu",
    hidden_dropout_prob=0.1,
    attention_probs_dropout_prob=0.1,
    max_position_embeddings=MAX_LENGTH,
    type_vocab_size=2,
    initializer_range=0.02,
    layer_norm_eps=1e-12,
    pad_token_id=0,
    position_embedding_type="absolute",
    use_cache=True,
    classifier_dropout=None,
    is_decoder=False,
    add_cross_attention=False,
)

In [4]:
model = BertForPreTraining(config)

In [5]:
char_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)

In [6]:
inputs = tokenizer("Hello, my dog is cute")
inputs

{'input_ids': [101, 7592, 1010, 2026, 3899, 2003, 10140, 102], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1]}

In [7]:
tokenizer.decode(inputs["input_ids"])

'[CLS] hello, my dog is cute [SEP]'

In [9]:
model.forward(
    inputs_embeds=char_embeddings(inputs["input_ids"]),
)

TypeError: embedding(): argument 'indices' (position 2) must be Tensor, not list