In [16]:
from transformers import RobertaTokenizer, DataCollatorForLanguageModeling

# Demo cell: build one masked-language-modeling batch from two sentences.
# Load the tokenizer of the "roberta-base" checkpoint.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

# Create a collator with dynamic masking (MLM enabled, mlm_probability=0.15)
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15
)

# Tokenize the TEXTS (each one separately, without return_tensors)
texts = ["I love machine learning", "Transformers are powerful"]
tokenized = [tokenizer(t, return_tensors=None) for t in texts]

# The collator applies the dynamic masking while assembling the batch
batch = data_collator(tokenized)

# Print the (masked) inputs next to the labels; in the recorded output below,
# positions that carry no label show up as -100.
print("input_ids:\n", batch["input_ids"])
print("labels:\n", batch["labels"])



input_ids:
 tensor([[    0, 50264,   657, 50264,  2239,     2],
        [    0, 44820,   268,    32, 50264,     2]])
labels:
 tensor([[-100,  100, -100, 3563, -100, -100],
        [-100, -100, -100, -100, 2247, -100]])


In [2]:
# robeta_architecture.py
import math
from dataclasses import dataclass
from typing import Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F


@dataclass
class RobertaConfig:
    """Hyperparameters read by the RoBERTa modules defined in this file.

    Field order is part of the interface (positional construction), so new
    fields should only ever be appended.
    """

    # --- sizes -----------------------------------------------------------
    # Number of rows in the word-embedding table / width of the MLM logits.
    vocab_size: int = 50_265
    # Width of every hidden representation.
    hidden_size: int = 768
    # Number of stacked encoder layers.
    num_hidden_layers: int = 12
    # Number of attention heads; must divide hidden_size evenly.
    num_attention_heads: int = 12
    # Width of the feed-forward expansion inside each layer.
    intermediate_size: int = 3_072

    # --- non-linearity and regularisation -------------------------------
    # Name resolved through get_activation().
    hidden_act: str = "gelu"
    # Dropout applied after the embeddings and after each dense projection.
    hidden_dropout_prob: float = 0.1
    # Dropout applied to the attention probabilities.
    attention_probs_dropout_prob: float = 0.1

    # --- embedding tables -----------------------------------------------
    # RoBERTa often uses 514 (including <s>, </s>).
    max_position_embeddings: int = 514
    # RoBERTa does not use segment embeddings, but the parameter is kept.
    type_vocab_size: int = 1

    # --- numerics and initialisation ------------------------------------
    # Epsilon passed to every LayerNorm.
    layer_norm_eps: float = 1e-12
    # Standard deviation of the normal distribution used for weight init.
    initializer_range: float = 0.02
    # Token id used as padding_idx of the word-embedding table.
    pad_token_id: int = 1


def _gelu_tanh(x: torch.Tensor) -> torch.Tensor:
    """GELU using the tanh approximation (the variant usually called ``gelu_new``)."""
    return F.gelu(x, approximate="tanh")


# Name -> callable. Only module-level functions are stored here (no lambdas)
# so that a module holding one of them as an attribute stays picklable.
_ACTIVATIONS = {
    "gelu": F.gelu,
    "gelu_new": _gelu_tanh,
    "relu": F.relu,
    "swish": F.silu,  # swish(x) = x * sigmoid(x) = SiLU
    "silu": F.silu,
}


def get_activation(name: str):
    """Return the activation function registered under ``name``.

    Supported names: ``"gelu"`` (exact GELU), ``"gelu_new"`` (tanh
    approximation of GELU), ``"relu"``, ``"swish"`` and its alias ``"silu"``.

    Args:
        name: Activation identifier, typically ``config.hidden_act``.

    Returns:
        A callable mapping a tensor to a tensor of the same shape.

    Raises:
        ValueError: If ``name`` is not one of the supported identifiers.
    """
    try:
        return _ACTIVATIONS[name]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments, which previously also ended
        # up in the "unknown activation" error path.
        raise ValueError(f"Unknown activation: {name}") from None


class RobertaEmbeddings(nn.Module):
    """
    Token embeddings + position embeddings (+ token-type embeddings), followed
    by LayerNorm and dropout.

    RoBERTa has no segment embeddings (type embeddings are normally unused);
    the token-type table is kept only for API compatibility.

    When ``position_ids`` is not supplied, positions are numbered
    ``0 .. seq_len - 1`` for every row of the batch.
    """
    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        # RoBERTa doesn't use token_type embeddings, but keep for API-compatibility
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)

        self.layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        self._reset_parameters(config)

    def _reset_parameters(self, config: RobertaConfig):
        """Draw all embedding tables from N(0, initializer_range) and keep the pad row at zero."""
        nn.init.normal_(self.word_embeddings.weight, mean=0.0, std=config.initializer_range)
        # nn.Embedding zeroes the padding_idx row at construction time, but the
        # in-place normal_ above overwrites it. Restore the zero vector so that
        # padding tokens contribute nothing through the word embedding.
        padding_idx = self.word_embeddings.padding_idx
        if padding_idx is not None:
            with torch.no_grad():
                self.word_embeddings.weight[padding_idx].zero_()
        nn.init.normal_(self.position_embeddings.weight, mean=0.0, std=config.initializer_range)
        if config.type_vocab_size > 0:
            nn.init.normal_(self.token_type_embeddings.weight, mean=0.0, std=config.initializer_range)

    def forward(self, input_ids: torch.LongTensor, position_ids: Optional[torch.LongTensor] = None, token_type_ids: Optional[torch.LongTensor] = None):
        """Embed ``input_ids``.

        Args:
            input_ids: Token ids; dimension 1 is taken as the sequence length.
            position_ids: Optional position indices with the same shape as
                ``input_ids``. Defaults to ``0 .. seq_len - 1`` per row.
            token_type_ids: Optional token-type indices. Defaults to all zeros.

        Returns:
            The normalised, dropout-regularised sum of the embeddings, with a
            trailing dimension of size ``hidden_size``.
        """
        seq_length = input_ids.size(1)
        if position_ids is None:
            # positions start at 0
            device = input_ids.device
            position_ids = torch.arange(seq_length, dtype=torch.long, device=device).unsqueeze(0).expand_as(input_ids)

        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        words = self.word_embeddings(input_ids)
        positions = self.position_embeddings(position_ids)
        # An empty token-type table (type_vocab_size == 0) contributes nothing.
        types = self.token_type_embeddings(token_type_ids) if self.token_type_embeddings.num_embeddings > 0 else 0

        embeddings = words + positions + types
        embeddings = self.layernorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings


class RobertaSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention (Q/K/V projections only).

    The output projection, residual connection and LayerNorm are applied by a
    separate module; this one returns the concatenated per-head context and
    the attention probabilities.
    """

    def __init__(self, config: RobertaConfig):
        super().__init__()
        heads = config.num_attention_heads
        hidden = config.hidden_size
        if hidden % heads:
            raise ValueError("hidden_size must be divisible by num_attention_heads")

        self.num_attention_heads = heads
        self.attention_head_size = hidden // heads
        self.all_head_size = heads * self.attention_head_size

        self.query = nn.Linear(hidden, self.all_head_size)
        self.key = nn.Linear(hidden, self.all_head_size)
        self.value = nn.Linear(hidden, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

        self._reset_parameters(config)

    def _reset_parameters(self, config: RobertaConfig):
        """Weights ~ N(0, initializer_range); biases set to zero."""
        for projection in (self.query, self.key, self.value):
            nn.init.normal_(projection.weight, mean=0.0, std=config.initializer_range)
            nn.init.zeros_(projection.bias)

    def transpose_for_scores(self, x: torch.Tensor):
        """Split the last dim into heads: [batch, seq_len, all_head_size] -> [batch, num_heads, seq_len, head_size]."""
        leading = x.shape[:-1]
        per_head = x.view(*leading, self.num_attention_heads, self.attention_head_size)
        return per_head.permute(0, 2, 1, 3)

    def forward(self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, head_mask: Optional[torch.Tensor] = None):
        """Attend over ``hidden_states`` ([batch, seq_len, hidden_size]).

        Args:
            hidden_states: Input sequence representations.
            attention_mask: Optional additive mask, broadcastable to
                [batch, heads, seq_len, seq_len]; added to the raw scores.
            head_mask: Optional multiplicative mask applied to the attention
                probabilities after dropout.

        Returns:
            ``(context, attention_probs)`` where ``context`` is
            [batch, seq_len, all_head_size] and ``attention_probs`` is
            [batch, heads, seq_len, seq_len].
        """
        q = self.transpose_for_scores(self.query(hidden_states))
        k = self.transpose_for_scores(self.key(hidden_states))
        v = self.transpose_for_scores(self.value(hidden_states))

        # Scaled dot product: [batch, heads, seq_len, seq_len]
        scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.attention_head_size)
        if attention_mask is not None:
            scores = scores + attention_mask

        probs = self.dropout(torch.softmax(scores, dim=-1))
        if head_mask is not None:
            probs = probs * head_mask

        # [batch, heads, seq_len, head_size] -> [batch, seq_len, all_head_size]
        context = torch.matmul(probs, v).permute(0, 2, 1, 3).contiguous()
        context = context.view(*context.shape[:-2], self.all_head_size)
        return context, probs


class RobertaSelfOutput(nn.Module):
    """Closes the attention sub-layer: dense -> dropout -> residual add -> LayerNorm."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.layernorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        # Weight ~ N(0, initializer_range), zero bias.
        nn.init.normal_(self.dense.weight, mean=0.0, std=config.initializer_range)
        if self.dense.bias is not None:
            nn.init.zeros_(self.dense.bias)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor):
        """Project ``hidden_states`` and normalise its sum with the residual ``input_tensor``."""
        projected = self.dropout(self.dense(hidden_states))
        return self.layernorm(projected + input_tensor)


class RobertaAttention(nn.Module):
    """Full attention sub-layer: self-attention followed by its output block."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.self = RobertaSelfAttention(config)
        self.output = RobertaSelfOutput(config)

    def forward(self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, head_mask: Optional[torch.Tensor] = None):
        """Return ``(attention_output, attention_probs)`` for ``hidden_states``."""
        attended = self.self(hidden_states, attention_mask, head_mask)
        context, attention_probs = attended[0], attended[1]
        # The layer input doubles as the residual branch of the output block.
        return self.output(context, hidden_states), attention_probs


class RobertaIntermediate(nn.Module):
    """First half of the feed-forward block: expand to ``intermediate_size`` and apply the activation."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        self.intermediate_act_fn = get_activation(config.hidden_act)

        # Weight ~ N(0, initializer_range), zero bias.
        nn.init.normal_(self.dense.weight, mean=0.0, std=config.initializer_range)
        if self.dense.bias is not None:
            nn.init.zeros_(self.dense.bias)

    def forward(self, hidden_states: torch.Tensor):
        """Return ``act(dense(hidden_states))``."""
        return self.intermediate_act_fn(self.dense(hidden_states))


class RobertaOutput(nn.Module):
    """Second half of the feed-forward block: project back to ``hidden_size``, then dropout, residual add and LayerNorm."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(config.intermediate_size, width)
        self.layernorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        # Weight ~ N(0, initializer_range), zero bias.
        nn.init.normal_(self.dense.weight, mean=0.0, std=config.initializer_range)
        if self.dense.bias is not None:
            nn.init.zeros_(self.dense.bias)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor):
        """Project ``hidden_states`` and normalise its sum with the residual ``input_tensor``."""
        projected = self.dropout(self.dense(hidden_states))
        return self.layernorm(projected + input_tensor)


class RobertaLayer(nn.Module):
    """One encoder layer: attention sub-layer followed by the feed-forward sub-layer."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.attention = RobertaAttention(config)
        self.intermediate = RobertaIntermediate(config)
        self.output = RobertaOutput(config)

    def forward(self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, head_mask: Optional[torch.Tensor] = None):
        """Return ``(layer_output, attention_probs)`` for ``hidden_states``."""
        attended, attention_probs = self.attention(hidden_states, attention_mask, head_mask)
        # Feed-forward: expand, then project back with `attended` as residual.
        expanded = self.intermediate(attended)
        return self.output(expanded, attended), attention_probs


class RobertaEncoder(nn.Module):
    """Stack of ``num_hidden_layers`` encoder layers applied in sequence."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.layer = nn.ModuleList([RobertaLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, head_mask: Optional[torch.Tensor] = None, output_attentions: bool = False, output_hidden_states: bool = False):
        """Run every layer over ``hidden_states``.

        Args:
            hidden_states: Input to the first layer.
            attention_mask: Passed unchanged to every layer.
            head_mask: Optional; indexed per layer (``head_mask[i]``).
            output_attentions: Also return each layer's attention probabilities.
            output_hidden_states: Also return the input of every layer plus
                the final output.

        Returns:
            ``(last_hidden_state, [all_hidden_states], [all_attentions])`` —
            the optional entries are tuples and appear only when requested.
        """
        hidden_trace = [] if output_hidden_states else None
        attention_trace = [] if output_attentions else None

        for index, block in enumerate(self.layer):
            if hidden_trace is not None:
                hidden_trace.append(hidden_states)

            layer_head_mask = None if head_mask is None else head_mask[index]
            block_out = block(hidden_states, attention_mask, layer_head_mask)
            hidden_states = block_out[0]

            if attention_trace is not None:
                attention_trace.append(block_out[1])

        result = [hidden_states]
        if hidden_trace is not None:
            # The output of the last layer closes the list of hidden states.
            hidden_trace.append(hidden_states)
            result.append(tuple(hidden_trace))
        if attention_trace is not None:
            result.append(tuple(attention_trace))
        return tuple(result)


class RobertaModel(nn.Module):
    """Bare encoder: embeddings, the layer stack, and a tanh pooler over the first token."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.config = config
        self.embeddings = RobertaEmbeddings(config)
        self.encoder = RobertaEncoder(config)
        # Pooler: dense + tanh applied to the hidden state of the first token.
        self.pooler = nn.Linear(config.hidden_size, config.hidden_size)
        self.pooler_activation = nn.Tanh()
        self._init_weights(config)

    def _init_weights(self, config: RobertaConfig):
        """Initialise the pooler: weight ~ N(0, initializer_range), zero bias."""
        nn.init.normal_(self.pooler.weight, mean=0.0, std=config.initializer_range)
        if self.pooler.bias is not None:
            nn.init.zeros_(self.pooler.bias)

    def get_input_embeddings(self):
        """Return the word-embedding module."""
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, new_embeddings):
        """Replace the word-embedding module."""
        self.embeddings.word_embeddings = new_embeddings

    def forward(self, input_ids: torch.LongTensor, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, output_attentions: bool = False, output_hidden_states: bool = False):
        """Encode ``input_ids``.

        Args:
            input_ids: Token ids, [batch, seq_len].
            attention_mask: [batch, seq_len] with 1 for tokens to attend to
                and 0 for padding. Defaults to all ones.
            token_type_ids: Forwarded to the embeddings.
            position_ids: Forwarded to the embeddings.
            output_attentions: Forwarded to the encoder.
            output_hidden_states: Forwarded to the encoder.

        Returns:
            ``(sequence_output, pooled_output, [hidden_states], [attentions])``.
        """
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        # Build the additive mask: [B, S] -> [B, 1, 1, S], cast to the
        # parameter dtype (fp16 compatibility), 0.0 where attention is
        # allowed and -10000.0 where it is masked.
        param_dtype = next(self.parameters()).dtype
        additive_mask = attention_mask[:, None, None, :].to(dtype=param_dtype)
        additive_mask = (1.0 - additive_mask) * -10000.0

        embedded = self.embeddings(input_ids, position_ids=position_ids, token_type_ids=token_type_ids)
        encoder_outputs = self.encoder(
            embedded,
            additive_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
        sequence_output = encoder_outputs[0]

        # Pool the representation of the first token (CLS position).
        pooled_output = self.pooler_activation(self.pooler(sequence_output[:, 0]))

        return (sequence_output, pooled_output, *encoder_outputs[1:])


class RobertaLMHead(nn.Module):
    """MLM head: dense -> activation -> LayerNorm -> vocabulary projection plus bias.

    When ``embedding_weights`` is given, the projection shares its weight
    tensor with that embedding module (weight tying).
    """

    def __init__(self, config: RobertaConfig, embedding_weights: Optional[nn.Embedding] = None):
        super().__init__()
        hidden = config.hidden_size
        self.dense = nn.Linear(hidden, hidden)
        self.activation = get_activation(config.hidden_act)
        self.layer_norm = nn.LayerNorm(hidden, eps=config.layer_norm_eps)
        # The projection itself has no bias; a separate parameter is added in forward().
        self.decoder = nn.Linear(hidden, config.vocab_size, bias=False)
        self.bias = nn.Parameter(torch.zeros(config.vocab_size))

        if embedding_weights is not None:
            # Weight tying: reuse the embedding matrix as the output projection.
            self.decoder.weight = embedding_weights.weight

        nn.init.normal_(self.dense.weight, mean=0.0, std=config.initializer_range)
        if self.dense.bias is not None:
            nn.init.zeros_(self.dense.bias)
        nn.init.zeros_(self.bias)

    def forward(self, hidden_states: torch.Tensor):
        """Map ``hidden_states`` to logits over the vocabulary."""
        transformed = self.layer_norm(self.activation(self.dense(hidden_states)))
        return self.decoder(transformed) + self.bias


class RobertaForMaskedLM(nn.Module):
    """Encoder plus MLM head whose output projection is tied to the input embeddings."""

    def __init__(self, config: RobertaConfig):
        super().__init__()
        self.roberta = RobertaModel(config)
        # Passing the embedding module makes the head tie its decoder weight to it.
        self.lm_head = RobertaLMHead(config, embedding_weights=self.roberta.get_input_embeddings())

    def forward(self, input_ids: torch.LongTensor, attention_mask: Optional[torch.Tensor] = None, labels: Optional[torch.LongTensor] = None):
        """Compute vocabulary logits and, when ``labels`` is given, the MLM loss.

        Args:
            input_ids: Token ids, [batch, seq_len].
            attention_mask: Optional mask forwarded to the encoder.
            labels: Optional [batch, seq_len] targets; positions equal to
                -100 are ignored by the loss.

        Returns:
            ``(loss, prediction_scores)`` when ``labels`` is given, otherwise
            ``(prediction_scores,)``.
        """
        sequence_output = self.roberta(input_ids, attention_mask=attention_mask)[0]
        prediction_scores = self.lm_head(sequence_output)

        if labels is None:
            return (prediction_scores,)

        # Flatten to [batch * seq_len, vocab] vs [batch * seq_len] for cross-entropy.
        flat_scores = prediction_scores.view(-1, self.roberta.config.vocab_size)
        criterion = nn.CrossEntropyLoss(ignore_index=-100)
        loss = criterion(flat_scores, labels.view(-1))
        return (loss, prediction_scores)



if __name__ == "__main__":
    # Smoke test: push one random batch through a default-config model and
    # print the shape of the resulting logits.
    cfg = RobertaConfig()
    model = RobertaForMaskedLM(cfg)
    batch_size, seq_len = 2, 16
    # random input_ids (values in 0..vocab_size-1)
    input_ids = torch.randint(
        low=0,
        high=cfg.vocab_size,
        size=(batch_size, seq_len),
        dtype=torch.long,
    )
    attention_mask = torch.ones_like(input_ids)
    # Without labels the model returns a 1-tuple holding the logits.
    logits = model(input_ids, attention_mask=attention_mask)[0]
    print("logits.shape:", logits.shape)  # [batch, seq_len, vocab_size]


logits.shape: torch.Size([2, 16, 50265])
