### Load dataset

In [195]:
from datasets import load_dataset

test_df = load_dataset("mteb/tweet_sentiment_extraction")['test'].to_pandas()

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Generating train split: 100%|██████████| 26732/26732 [00:00<00:00, 302255.92 examples/s]
Generating test split: 100%|██████████| 3432/3432 [00:00<00:00, 487696.55 examples/s]


In [196]:
test_df.head()

Unnamed: 0,id,text,label,label_text
0,f87dea47db,Last session of the day http://twitpic.com/67ezh,1,neutral
1,96d74cb729,Shanghai is also really exciting (precisely -...,2,positive
2,eee518ae67,"Recession hit Veronique Branquinho, she has to...",0,negative
3,33987a8ee5,http://twitpic.com/4w75p - I like it!!,2,positive
4,726e501993,that`s great!! weee!! visitors!,2,positive


In [197]:
test_df.label = test_df.label_text.map({'negative': 2, 'neutral': 1, 'positive': 0}) # Map labels to match our convention

In [198]:
test_df.label.value_counts(normalize=True)

label
1    0.400641
0    0.312937
2    0.286422
Name: proportion, dtype: float64

In [199]:
test_df.text.duplicated().sum()

np.int64(0)

In [200]:
test_df.isna().sum()

id            0
text          0
label         0
label_text    0
dtype: int64

### Preprocessing pipeline

In [201]:
import re
import pandas as pd
import emoji
import numpy as np
CONTRACTION_PATH = '../data/mapping/contraction.csv'
EMOJI_PATH = '../data/mapping/emoji.csv'

# === Contraction ===
contraction_df = pd.read_csv(CONTRACTION_PATH, encoding='utf-8')
contraction_map = {key: value for key, value in zip(contraction_df.contraction, contraction_df.extension)}

contraction_pattern = re.compile(
    r"\b(" + "|".join(map(re.escape, contraction_map.keys())) + r")\b",
    flags=re.IGNORECASE
)

def replace_contraction(match):
    w = match.group(0).lower()
    if w in contraction_map:
        return contraction_map[match.group(0).lower()]
    else:
        return w

def extend(text):
    return re.sub(contraction_pattern, replace_contraction, text)



# === Emoji ===
emoji_df = pd.read_csv(EMOJI_PATH, encoding='utf-8')
emoji_df['Score'] = np.tanh(
    np.log((emoji_df.Positive + 1) / (emoji_df.Negative + 1))
)

GENDER_EMOJI_MAP = {
    "\u2640": "[EMO_FEMALE]",  # ♀
    "\u2642": "[EMO_MALE]",    # ♂
    "\u26A7": "[EMO_TRANS]"    # ⚧
}

def normalize_emoji(e):
    e = re.sub(r"\uFE0F", "", e)
    if e in GENDER_EMOJI_MAP:
        return GENDER_EMOJI_MAP[e]
    e = re.sub(r"\u200d", "", e)
    return e

emoji_df['Emoji_norm'] = emoji_df['Emoji'].apply(normalize_emoji)
emoji_df = emoji_df[emoji_df['Emoji_norm'].str.len() > 0]

emoji_list = sorted(
    emoji_df['Emoji_norm'].unique(),
    key=len,
    reverse=True
)

emoji_map = dict(
    zip(
        emoji_df['Emoji_norm'],
        zip(emoji_df['Unicode name'], emoji_df.Score)
    )
)

def extract_emoji(sentence, beta=1.0):
    emoji_scores = []
    new_text = sentence
    strongest = 0.0

    for e in emoji.emoji_list(sentence):
        em = e['emoji']
        norm_em = normalize_emoji(em)

        if not norm_em.startswith("[EMO_"):
            name, score = emoji_map.get(norm_em, ('[EMO]', 0.0))
            if score != 0.0:
                emoji_scores.append(score)
        else:
            name = norm_em

        new_text = new_text.replace(em, name)

    if emoji_scores:
        strongest = max(emoji_scores, key=lambda s: abs(s))

    return new_text, strongest * beta



# === Markdown reddit ===
def extract_markdown(text):
    # spoiler
    text = re.sub(r">!(.+?)!<", r" <spoiler> \1 </spoiler> ", text)

    # bold + italic
    text = re.sub(r"\*\*\*(.+?)\*\*\*", r" <bi> \1 </bi> ", text)

    # bold
    text = re.sub(r"\*\*(.+?)\*\*", r" <b> \1 </b> ", text)

    # italic *
    text = re.sub(r"\*(?!\*)(.+?)(?<!\*)\*", r" <i> \1 </i> ", text)

    # strike
    text = re.sub(r"~~(.+?)~~", r" <s> \1 </s> ", text)

    # quote (line-based)
    text = re.sub(r"^>(.+)", r" <q> \1 </q> ", text, flags=re.M)

    # inline code
    text = re.sub(r"`(.+?)`", r" <code> \1 </code> ", text)

    # triple double quotes
    text = re.sub(r'"""\s*(.+?)\s*"""', r' <quote> \1 </quote> ', text)

    return text



# === Mention ===
def normalize_mention(text):
    return re.sub(r"(?<!\w)@[A-Za-z_][A-Za-z0-9_]{1,30}", '[MENTION]', text)    



# === URL ===
def normalize_url(text):
    return re.sub(r"https?://\S+|www\.\S+", '[URL]', text)



# === Time ===
def normalize_time(text):
    return re.sub(r"\b(?:1[0-2]|0?[1-9]):[0-5][0-9]\s*(?i:am|pm)\b", '[TIME]', text)



# === Date ===
def normalize_date(text):
    # ISO 8601 datetime: 2026-01-07T10:30:00
    text = re.sub(r"\b\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\b", "[DATE]", text)
    # YYYY-MM-DD
    text = re.sub(r"\b\d{4}-\d{2}-\d{2}\b", "[DATE]", text)
    # MM/DD/YYYY
    text = re.sub(r"\b\d{1,2}/\d{1,2}/\d{4}\b", "[DATE]", text)
    # DD-MM-YYYY
    text = re.sub(r"\b\d{1,2}-\d{1,2}-\d{4}\b", "[DATE]", text)
    # Month Day, Year (Jan 7, 2026 or January 7, 2026)
    text = re.sub(
        r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|"
        r"January|February|March|April|May|June|July|August|September|October|November|December)"
        r"\s+\d{1,2},\s*\d{4}\b", "[DATE]", text
    )
    # Day Month Year (7 Jan 2026 or 7 January 2026)
    text = re.sub(
        r"\b\d{1,2}\s+"
        r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|"
        r"January|February|March|April|May|June|July|August|September|October|November|December)"
        r"\s+\d{4}\b", "[DATE]", text
    )
    # Compact numeric YYYYMMDD
    text = re.sub(r"\b\d{8}\b", "[DATE]", text)
    
    return text



# === Hashtag ===
def normalize_hashtag(text):

    def repl(m):
        tag = m.group()[1:]
        tag = tag.lower()
        return f"[HASHTAG] {tag}"
    
    return re.sub(r'#\w+', repl, text)



# === Whitespace ===
def normalize_whitespace(text):
    return re.sub(r'\s+', ' ', text).strip()



# === Lowercase ===
def lowercase(text):
    token_pattern = r'(\[[A-Z_]+\])'

    parts = re.split(token_pattern, text)

    parts = [p.lower() if not re.fullmatch(token_pattern, p) else p for p in parts]

    return ''.join(parts)



# === Punctuation ===
def normalize_punctuation(text: str) -> str:
    text = re.sub(r'\.{3,}', '...', text)

    text = re.sub(r'!{3,}', '!!', text)
    text = re.sub(r'\?{3,}', '??', text)

    text = re.sub(r'(!\?|\?!){2,}', '!?', text)

    text = re.sub(r'\s+([!?.,])', r'\1', text)
    text = re.sub(r'([!?.,])\s+', r'\1 ', text)

    text = re.sub(r'([,;:]){2,}', r'\1', text)

    return text



# === All uppercase ===
def extract_is_all_uppercase(text):
    return text, int(text.isupper())



# === Uppercase ratio ===
def extract_uppercase_ratio(text):
    clean_text = re.sub(r'\[[A-Z_]+\]', '', text)
    
    alphas = [c for c in clean_text if c.isalpha()]
    
    if not alphas:
        return text,0.0
    
    return text, sum(c.isupper() for c in alphas) / len(alphas)



# === Exclaimination ===
def extract_exclamation_intensity(text, cap=5):
    max_run = 0
    cur = 0
    for c in text:
        if c == '!':
            cur += 1
            max_run = max(max_run, cur)
        else:
            cur = 0

    return text, min(max_run, cap) / cap

### Pipeline order

In [202]:
from functools import partial

preprocessing_pipeline = [
    (partial(extract_exclamation_intensity, cap=5), 'ex_intensity'),
    (extract_markdown),
    (extend),
    (partial(extract_emoji, beta=1.0), 'emoji_score'),
    (normalize_mention),
    (normalize_url),
    (normalize_time),
    (normalize_date),
    (normalize_hashtag),
    (normalize_whitespace),
    (extract_is_all_uppercase, 'all_uppercase'),
    (extract_uppercase_ratio, 'uppercase_ratio'),
    (lowercase),
    (normalize_punctuation)
]

In [203]:
def apply_preprocessing(text):
    res = {"text": text}

    for process in preprocessing_pipeline:

        if isinstance(process, tuple):
            func, key = process
            res['text'], res[key] = func(res['text'])
        else:
            func = process
            res['text'] = func(res['text'])

    return res

### Apply preprocessing

In [204]:
labels = test_df.label.tolist()
test_df = pd.DataFrame(test_df.text.apply(apply_preprocessing).to_list())
test_df['label'] = labels

### Load model

In [205]:
import torch
from torch import nn
import torch.nn.functional as F


class AttentionPooling(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(0.4)

    def forward(self, lstm_out, mask):
        # lstm_out: (B, T, H)
        # mask: (B, T, 1)

        scores = self.attn(lstm_out).squeeze(-1)  # (B, T)
        scores = scores.masked_fill(mask.squeeze(-1) == 0, -1e9)

        attn_weights = F.softmax(scores, dim=1)  # (B, T)
        attn_weights = self.dropout(attn_weights)
        context = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)

        return context, attn_weights


class Model(nn.Module):
    def __init__(self, embedding_matrix, lstm_hidden=128, lstm_layers=1, num_classes=3, embed_proj_size=128):
        super().__init__()
        self.embedding_matrix = embedding_matrix
        self.num_classes = num_classes

        self.embedding = nn.Embedding.from_pretrained(torch.tensor(embedding_matrix, dtype=torch.float32), padding_idx=0, freeze=True)

        self.embed_proj = nn.Sequential(
            nn.Linear(embedding_matrix.size(1), embed_proj_size),
            nn.LayerNorm(embed_proj_size),
            nn.Dropout(0.3)
        )

        self.lstm = nn.LSTM(
            input_size=embed_proj_size,
            hidden_size=lstm_hidden,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True
        )

        self.dropout = nn.Dropout(0.3)

        self.attention = AttentionPooling(lstm_hidden * 2)

        self.layernorm = nn.LayerNorm(lstm_hidden * 2 + 4)

        self.mlp = nn.Sequential(
            nn.Linear(lstm_hidden * 2 + 4, 32),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(32, num_classes)
        )

    def forward(self, text_ids, extra_feats):
        # Embedding
        x = self.embedding(text_ids)  # (B, T, E)

        x = self.embed_proj(x)

        # LSTM
        lstm_out, _ = self.lstm(x)  # (B, T, 2H)
        lstm_out = self.dropout(lstm_out)

        # Mask padding
        mask = (text_ids != 0).unsqueeze(-1)  # (B, T, 1)

        # Attention
        context, attn_weights = self.attention(lstm_out, mask)

        # Concatenate extra features
        features = torch.cat([context, extra_feats], dim=1)

        features = self.layernorm(features)

        logits = self.mlp(features)

        return logits

In [206]:
ckpt = torch.load("../model/embedding.pt", map_location="cpu")

embedding_matrix = ckpt["embedding"]
word2idx = ckpt["word2idx"]

In [207]:
LSTM_HIDDEN = 128
LSTM_LAYERS = 1
EMBED_PROJ_SIZE = 128

model = Model(embedding_matrix, lstm_hidden=LSTM_HIDDEN, lstm_layers=LSTM_LAYERS, embed_proj_size=EMBED_PROJ_SIZE).to('cuda' if torch.cuda.is_available() else 'cpu')

  self.embedding = nn.Embedding.from_pretrained(torch.tensor(embedding_matrix, dtype=torch.float32), padding_idx=0, freeze=True)


In [208]:
model.load_state_dict(torch.load("../model/clf.pt"))

<All keys matched successfully>

### Torch Dataset

In [209]:
from torch.utils.data import Dataset
import re

class SentimentDataset(Dataset):
    def __init__(self, dataframe, word2idx, max_len=128):
        self.df = dataframe
        self.word2idx = word2idx
        self.max_len = max_len


    def tokenize(self, text):
        patterns = [
            r"\[[A-Z_]+\]",
            r"<\/?[\w_]+>",
            r"\w+",
            r"[?!]{2,}",
            r"\.{3,}",
            r"[^\w\s]"
        ]

        combined = re.compile("|".join(patterns), re.UNICODE)

        return combined.findall(text)


    def encode_text(self, text):
        tokens = self.tokenize(text)
        ids = [self.word2idx.get(token, self.word2idx['<unk>']) for token in tokens]
        ids = ids[:self.max_len]

        return ids + [self.word2idx['<pad>']] * (self.max_len - len(ids))


    def __len__(self):
        return len(self.df)


    def __getitem__(self, index):
        row = self.df.loc[index]

        text_ids = torch.tensor(self.encode_text(row['text']))
        extra_feats = torch.tensor([
            row["ex_intensity"],
            row["emoji_score"],
            row["all_uppercase"],
            row["uppercase_ratio"]]
        , dtype=torch.float32)

        return text_ids, extra_feats, torch.tensor(int(row['label']), dtype=torch.long)


In [210]:
from torch.utils.data import DataLoader

MAX_LEN = 128
BATCH_SIZE = 64

test_dataset = SentimentDataset(test_df, word2idx, max_len=MAX_LEN)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False
)

### Evaluate function

In [211]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch.nn.functional as F
from tqdm import tqdm

criterion=nn.CrossEntropyLoss()

def evaluate(model, dataloader, criterion=criterion, device='cuda', threshold=None):
    model.eval()
    total_loss = 0

    all_labels = []
    all_preds = []

    with torch.no_grad():
        for text_ids, extra_feats, labels in tqdm(dataloader, desc='Validating'):
            text_ids = text_ids.to(device)
            extra_feats = extra_feats.to(device)
            labels = labels.to(device)

            logits = model(text_ids, extra_feats)
            loss = criterion(logits, labels)

            total_loss += loss.item()

            all_labels.extend(labels.cpu().tolist())

            probs = F.softmax(logits, dim=1)

            if threshold is None:
                preds = torch.argmax(probs, dim=1).to('cpu').tolist()
            else:
                preds = []

                for p in probs:
                    prob_pos = p[0].item()
                    prob_neu = p[1].item()
                    prob_neg = p[2].item()

                    if prob_pos >= threshold.get('pos', 1/3) and prob_pos >= prob_neg:
                        preds.append(0)
                    elif prob_neg >= threshold.get('neg', 1/3) and prob_neg > prob_pos:
                        preds.append(2)
                    else:
                        preds.append(1)

            all_preds.extend(preds)

    avg_loss = total_loss / len(dataloader)

    acc = accuracy_score(all_labels, all_preds)

    precision = precision_score(all_labels, all_preds, average=None, labels=[0,1,2])
    recall = recall_score(all_labels, all_preds, average=None, labels=[0,1,2])

    precision_pos, precision_neu, precision_neg = precision
    recall_pos, recall_neu, recall_neg = recall

    return {
        'loss': avg_loss,
        'acc': acc,
        'precision_pos': precision_pos,
        'precision_neg': precision_neg,
        'precision_neu': precision_neu,
        'recall_pos': recall_pos,
        'recall_neg': recall_neg,
        'recall_neu': recall_neu,
        'f1_pos': f1_score(all_labels, all_preds, average=None, labels=[0])[0],
        'f1_neg': f1_score(all_labels, all_preds, average=None, labels=[2])[0],
        'f1_neu': f1_score(all_labels, all_preds, average=None, labels=[1])[0],
    }


### Call function

In [224]:
evaluation = evaluate(model, test_loader, device='cuda' if torch.cuda.is_available() else 'cpu', threshold={'pos': 0.4, 'neg': 0.31})

Validating: 100%|██████████| 54/54 [00:01<00:00, 39.00it/s]


In [225]:
evaluation

{'loss': 0.7557825087397186,
 'acc': 0.7013403263403264,
 'precision_pos': np.float64(0.6935849056603773),
 'precision_neg': np.float64(0.7043650793650794),
 'precision_neu': np.float64(0.707916287534122),
 'recall_pos': np.float64(0.8556797020484171),
 'recall_neg': np.float64(0.7222787385554426),
 'recall_neu': np.float64(0.5658181818181818),
 'f1_pos': np.float64(0.7661525635681534),
 'f1_neg': np.float64(0.7132094424912104),
 'f1_neu': np.float64(0.6289409862570736)}