### Training a baseline model using RoBERTa + a preprocessing function

In [62]:
import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple, Union

import torch
from dotenv import load_dotenv
from numpy.typing import NDArray
from rich import print
from torch.utils.data import DataLoader, Dataset
from tqdm.notebook import tqdm
from transformers import RobertaModel, RobertaTokenizer

In [83]:
# Run on Apple's Metal (MPS) backend when it is available, otherwise on the CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

device

device(type='mps')

In [2]:
@dataclass
class ProcessedText:
    """
    Container for the result of preprocessing a single piece of text.

    Attributes:
        cleaned_text: The text after cleaning.
        sentences: The cleaned text split into sentences.
        num_sentences: Number of entries in ``sentences``.
    """

    cleaned_text: str
    sentences: List[str]
    num_sentences: int

In [6]:
class RoBERTaPreprocessor:
    """
    Text preprocessing pipeline for RoBERTa. It bundles three steps:

    1. Cleaning text while preserving important punctuation and structure.
    2. Splitting text into sentences while taking care of common abbreviations.
    3. Tokenizing and encoding the text with the RoBERTa tokenizer.
    """

    # Typographic punctuation that is converted to its ASCII equivalent so it
    # survives the non-ASCII strip in ``clean_text`` (e.g. "don’t" -> "don't").
    _ASCII_PUNCTUATION = str.maketrans(
        {
            "\u2014": "-",  # em dash
            "\u2013": "-",  # en dash
            "\u2018": "'",  # left single quote
            "\u2019": "'",  # right single quote / apostrophe
            "\u201c": '"',  # left double quote
            "\u201d": '"',  # right double quote
        }
    )
    # Characters that may wrap a word and hide its final punctuation.
    _CLOSERS = "\"')]}"
    _OPENERS = "\"'([{"

    def __init__(self, max_length: int = 512, model_name: str = "roberta-base") -> None:
        """
        Args:
            max_length: Maximum number of tokens per encoded text.
            model_name: Name or path of the tokenizer checkpoint to load.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.max_length = max_length
        self.abbreviations = {"mr.", "mrs.", "dr.", "st.", "ave.", "prof."}

    def clean_text(self, text: str) -> str:
        """
        Clean text while preserving important punctuation and structure.

        URLs and e-mail addresses are removed, typographic dashes and quotes are
        converted to ASCII, remaining non-ASCII characters are dropped, runs of
        whitespace are collapsed, stray spaces around punctuation are removed and
        every run of digits is replaced with the literal token "NUM".

        Args:
            text: Raw input text.

        Returns:
            The cleaned text without leading or trailing whitespace.
        """
        ## Turn every whitespace run (newlines, tabs, non-breaking spaces) into one
        ## space first, so Unicode whitespace is not lost by the non-ASCII strip.
        text = re.sub(r"\s+", " ", text)

        ## Remove URLs and emails:
        text = re.sub(r"http\S+|www\.\S+", "", text)  # Remove URLs
        # Local parts may contain dots/plus signs; domains may have several labels.
        text = re.sub(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+", "", text)  # Remove emails

        ## Normalize dashes and curly quotes to their ASCII form
        text = text.translate(self._ASCII_PUNCTUATION)

        ## Remove all remaining non-ASCII characters (including astral code points)
        text = re.sub(r"[^\x00-\x7F]", "", text)

        ## Collapse the whitespace left behind by the removals above
        text = " ".join(text.split())

        ## Fix spacing around punctuation
        text = re.sub(r"\s+([.,!?;:])", r"\1", text)
        text = re.sub(r"\(\s+", "(", text)
        text = re.sub(r"\s+\)", ")", text)

        ## Replace numbers with "NUM"
        text = re.sub(r"\d+", "NUM", text)

        # Remove leading/trailing whitespace
        return text.strip()

    def text2sentences(self, text: str) -> List[str]:
        """
        Split the text into sentences while handling common abbreviations.

        A sentence ends at a whitespace-delimited word whose last character,
        ignoring trailing closing quotes/brackets, is ".", "!" or "?", unless
        that word is one of ``self.abbreviations``.

        Args:
            text: Text to split (typically the output of ``clean_text``).

        Returns:
            The sentences in order; trailing words without a terminator form a
            final sentence. An empty or whitespace-only text yields ``[]``.
        """
        sentences: List[str] = []
        current: List[str] = []

        ## Iterate through each word until a stop-character is found.
        for word in text.split():
            current.append(word)

            # Look through closing quotes/brackets: 'He said "Stop."' ends a sentence.
            core = word.rstrip(self._CLOSERS)
            if core.lstrip(self._OPENERS).lower() in self.abbreviations:
                continue
            if core.endswith((".", "!", "?")):
                sentences.append(" ".join(current))
                current = []

        ## Add the last, unterminated sentence if there is one:
        if current:
            sentences.append(" ".join(current))

        return sentences

    def process_text(self, text: str) -> "ProcessedText":
        """
        Clean the text and split it into sentences.

        Returns:
            ProcessedText object with cleaned text and sentences
        """
        cleaned_text = self.clean_text(text)
        sentences = self.text2sentences(cleaned_text)

        return ProcessedText(
            cleaned_text=cleaned_text,
            sentences=sentences,
            num_sentences=len(sentences),
        )

    def encode_for_model(
        self,
        processed_text: "ProcessedText",
        add_special_tokens: bool = True,
        truncation: bool = True,
        padding: str = "max_length",
    ) -> Union[Dict[str, torch.Tensor], None]:
        """
        Encode processed text for RoBERTa.

        Args:
            processed_text: Object whose ``cleaned_text`` is tokenized.
            add_special_tokens: Forwarded to the tokenizer.
            truncation: Forwarded to the tokenizer.
            padding: Padding strategy forwarded to the tokenizer.

        Returns:
            The tokenizer output (requested with an attention mask and
            ``return_tensors="pt"``), or ``None`` if the tokenizer raised; in
            that case the error is printed and callers must check for ``None``.
        """
        try:
            return self.tokenizer(
                processed_text.cleaned_text,
                add_special_tokens=add_special_tokens,
                max_length=self.max_length,
                padding=padding,
                truncation=truncation,
                return_attention_mask=True,
                return_tensors="pt",
            )
        except Exception as e:
            print(f"Error encoding text: {e}")
            return None

    def batch_encode(self, texts: List[str], **kwargs) -> Dict[str, torch.Tensor]:
        """
        Clean and encode a batch of texts.

        Args:
            texts: Raw texts to clean and tokenize together.
            **kwargs: Extra tokenizer options. They take precedence over the
                defaults used here (``max_length``, ``padding=True``,
                ``truncation=True``, ``return_attention_mask=True``,
                ``return_tensors="pt"``).

        Returns:
            The tokenizer output for the whole batch.
        """
        # Only the cleaned text is encoded, so sentence splitting is skipped.
        cleaned_texts = [self.clean_text(text) for text in texts]

        # Merge instead of passing both explicit keywords and **kwargs, which
        # raised TypeError whenever a caller overrode one of the defaults.
        options = {
            "max_length": self.max_length,
            "padding": True,
            "truncation": True,
            "return_attention_mask": True,
            "return_tensors": "pt",
        }
        options.update(kwargs)

        return self.tokenizer(cleaned_texts, **options)


# Preprocessor with the default max_length of 512; constructing it loads the
# tokenizer. NOTE: a later cell rebinds this name to a new instance.
preprocessor = RoBERTaPreprocessor()

In [106]:
class News24Dataset(Dataset):
    """
    Map-style dataset that cleans and tokenizes one article per item.

    Each item is a dict with the flattened ``input_ids`` and ``attention_mask``
    produced by the preprocessor, plus the integer ``label`` as a long tensor.
    """

    def __init__(
        self,
        texts: Union[NDArray, List],
        labels: List,
        preprocessor: "RoBERTaPreprocessor",
    ):
        """
        Args:
            texts: Article texts, indexable by integer position.
            labels: Integer class label for each text, in the same order.
            preprocessor: Object providing ``process_text`` and ``encode_for_model``.

        Raises:
            ValueError: If ``texts`` and ``labels`` differ in length.
        """
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.preprocessor = preprocessor

    def __len__(self) -> int:
        """Number of examples in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Preprocess and encode the example at ``idx``.

        Raises:
            RuntimeError: If the preprocessor could not encode the text
                (``encode_for_model`` returned ``None``).
        """
        text = str(self.texts[idx])

        ## Process and encode the indexed text using the preprocessor:
        processed_text = self.preprocessor.process_text(text)
        encoding = self.preprocessor.encode_for_model(processed_text)
        if encoding is None:
            raise RuntimeError(f"Failed to produce encoding for index: {idx}")

        return {
            # The encoding carries a leading batch dimension; flatten drops it.
            "input_ids": encoding["input_ids"].flatten(),
            ## From the sample text: 1 for info, 0 for padding.
            "attention_mask": encoding["attention_mask"].flatten(),
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
        }

In [128]:
from torch import nn


class SectionClassifier_Text(nn.Module):
    """
    Text-only section classifier: a RoBERTa encoder followed by dropout and a
    single linear layer applied to the first token's final hidden state.
    """

    def __init__(
        self,
        n_classes: int = 24,
        dropout: float = 0.3,
        model_name: str = "roberta-base",
    ):
        """
        Args:
            n_classes: Number of output classes (size of the logits vector).
            dropout: Dropout probability applied before the linear head.
            model_name: Name or path of the pretrained RoBERTa checkpoint.
        """
        super().__init__()
        self.n_classes = n_classes
        self.text_model = RobertaModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(p=dropout)
        # Take the width from the loaded encoder instead of hard-coding 768,
        # so checkpoints with a different hidden size work as well.
        self.fc = nn.Linear(self.text_model.config.hidden_size, n_classes)

    def forward(
        self, input_ids: torch.Tensor, attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute class logits for a batch of token ids.

        Args:
            input_ids: Token ids; indexed below as (batch, sequence).
            attention_mask: Mask passed to the encoder alongside ``input_ids``.

        Returns:
            Unnormalized logits with ``n_classes`` values per batch row.
        """
        outputs = self.text_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        # Representation of the first token (RoBERTa's <s>, the [CLS] equivalent).
        first_token_state = outputs.last_hidden_state[:, 0, :]
        return self.fc(self.dropout(first_token_state))

In [108]:
# Load project settings from the env file, then resolve the dataset root.
ENV_Path = Path("../envs/n24.env")
load_dotenv(str(ENV_Path))
# Index os.environ directly: a missing DATASET_ROOT then raises a KeyError naming
# the variable instead of the opaque TypeError produced by Path(None).
data_dir = Path(os.environ["DATASET_ROOT"])
os.listdir(data_dir)

['.DS_Store', 'imgs', 'news']

In [109]:
import pandas as pd


def load_datajson(data_dir: Union[str, Path]) -> Tuple[NDArray, List[int], int]:
    """
    Load and preprocess the N24News dataset from JSON for the PyTorch dataset.

    The file must hold records with a "section" and an "article" field. Each
    distinct section is assigned an integer id in order of first appearance, and
    the id -> section mapping is written next to the input file as
    ``<input stem>_labelmap.json``.

    NOTE: because ids follow order of appearance, two files (e.g. train and
    test splits) are only labelled consistently if their sections first appear
    in the same order.

    Args:
        data_dir: Path to the JSON file (despite the name, a file, not a folder).
            May be given as ``str`` or ``Path``.

    Returns:
        A tuple ``(texts, labels, num_classes)``: the article texts as an array,
        the integer label of each article, and the number of distinct sections.
    """
    # Accept plain strings as promised by the signature; .stem/.parent need a Path.
    data_path = Path(data_dir)

    ## Reading JSON
    with open(data_path, "r", encoding="utf-8") as fp:
        data = json.load(fp)

    data_df = pd.DataFrame(data)
    label_dict = {cat: idx for idx, cat in enumerate(data_df["section"].unique())}

    ## Persist the inverse mapping (id -> section) beside the data file:
    labelmap_name = "_".join([data_path.stem, "labelmap.json"])
    with open(data_path.parent / labelmap_name, "w", encoding="utf-8") as f:
        json.dump({v: k for k, v in label_dict.items()}, f)

    ## Convert the labels into numeric:
    labels = [label_dict[section] for section in data_df["section"]]

    ## Get the article texts:
    texts = data_df["article"].values

    return texts, labels, len(label_dict)

In [110]:
# Load the training split: article texts, their integer labels and the number
# of distinct sections found in the file.
train_texts, train_labels, num_classes = load_datajson(
    data_dir=data_dir / "news" / "nytimes_train.json"
)

In [111]:
# Wrap the training data in a dataset that cleans and tokenizes each article
# when it is accessed, using a 512-token limit.
preprocessor = RoBERTaPreprocessor(max_length=512)
train_dataset = News24Dataset(
    texts=train_texts, labels=train_labels, preprocessor=preprocessor
)
# TODO: enable batching once a training loop is added.
# train_dl = DataLoader(train_dataset, batch_size=8, shuffle=True)

In [129]:
# Size the classification head from the label count found in the training data
# instead of silently relying on the class default of 24.
text_model = SectionClassifier_Text(n_classes=num_classes)
text_model = text_model.to(device)
# Switch to evaluation mode; as the cell's last expression this also displays the module.
text_model.eval()

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


SectionClassifier_Text(
  (text_model): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (L

In [130]:
# Fetch one example (index 6) and move its tensors to the selected device.
# Despite the name, this is a single un-batched sample, not a batch.
sample_batch = {key: value.to(device) for key, value in train_dataset[6].items()}

In [131]:
# Inspect which entries the dataset returns for one example.
sample_batch.keys()

dict_keys(['input_ids', 'attention_mask', 'label'])

In [132]:
# Forward pass without gradient tracking. unsqueeze(0) adds a leading batch
# dimension because the sample's tensors were flattened by the dataset.
with torch.no_grad():
    output = text_model(
        input_ids=sample_batch["input_ids"].unsqueeze(0),
        attention_mask=sample_batch["attention_mask"].unsqueeze(0),
    )

In [136]:
# Index of the largest logit in each row, i.e. the predicted class id.
predicted_class = output.argmax(dim=1)

In [139]:
# Show the predicted class id next to the true label. No training step has been
# run on the classifier in the cells above, so a mismatch is expected here.
predicted_class, sample_batch["label"]

(tensor([9], device='mps:0'), tensor(5, device='mps:0'))