In [None]:
!git clone https://github.com/jefferyYu/UMT.git

In [None]:
import os, ast, math, json, random
from dataclasses import dataclass
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from transformers import AutoTokenizer, AutoModel, get_cosine_schedule_with_warmup

# -------------- Repro --------------
# Seed every RNG the notebook touches (Python, NumPy, PyTorch) with one value.
SEED = 1337
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Ask cuDNN for deterministic kernels and turn off its autotuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Run on the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# -------------- Paths (EDIT if needed) --------------
# CoNLL-style annotation files, one per split, taken from the cloned UMT repo.
TXT_PATHS = dict(
    train='/kaggle/working/UMT/data/twitter2017/train.txt',
    val='/kaggle/working/UMT/data/twitter2017/valid.txt',
    test='/kaggle/working/UMT/data/twitter2017/test.txt',
)

# All three splits read their images from the same directory.
IMG_ROOT = '/kaggle/input/twitter2017/twitter2017/twitter2017_images'
IMG_DIRS = {split: IMG_ROOT for split in ('train', 'val', 'test')}


In [None]:
def parse_twitter_conll(path):
    """Parse a Twitter MNER annotation file into a list of samples.

    The file is read as blocks separated by blank lines. Inside a block, a
    line starting with ``IMGID:`` supplies the image id and every other line
    with at least two whitespace-separated fields contributes a
    ``(token, label)`` pair taken from its first two fields.

    Args:
        path: Path of the UTF-8 text file to read.

    Returns:
        A list of dicts with keys ``'img_id'`` (str), ``'tokens'`` (list of
        str) and ``'labels'`` (list of str). Blocks lacking an image id or
        lacking tokens are skipped.
    """
    prefix = 'IMGID:'
    samples, img_id, toks, tags = [], None, [], []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank line closes the current block.
                if img_id and toks:
                    samples.append({'img_id': img_id, 'tokens': toks, 'labels': tags})
                img_id, toks, tags = None, [], []
                continue
            if line.startswith(prefix):
                # Keep everything after the prefix so an id containing ':' is not cut short.
                img_id = line[len(prefix):].strip()
            else:
                parts = line.split()
                if len(parts) >= 2:
                    toks.append(parts[0])
                    tags.append(parts[1])
    # A file that does not end with a blank line still has one pending block;
    # without this flush the last sample would be silently lost.
    if img_id and toks:
        samples.append({'img_id': img_id, 'tokens': toks, 'labels': tags})
    return samples

def build_label_vocab(*lists):
    """Build label <-> id mappings from any number of sample lists.

    Args:
        *lists: Iterables of samples; each sample is a mapping whose
            ``'labels'`` entry is an iterable of label strings.

    Returns:
        ``(label2id, id2label)`` where ids are assigned in sorted label order
        starting from 0.
    """
    unique = {tag for split in lists for sample in split for tag in sample['labels']}
    ordered = sorted(unique)
    label2id = dict(zip(ordered, range(len(ordered))))
    id2label = dict(enumerate(ordered))
    return label2id, id2label

def find_image_path(img_dir, img_id):
    """Return the first existing ``img_dir/img_id<ext>`` path, or ``None``.

    Extensions are tried in the order ``.jpg``, ``.jpeg``, ``.png``, ``.bmp``.
    """
    candidates = (
        os.path.join(img_dir, img_id + suffix)
        for suffix in ('.jpg', '.jpeg', '.png', '.bmp')
    )
    return next((cand for cand in candidates if os.path.exists(cand)), None)


In [None]:
from transformers import RobertaTokenizerFast
# Module-level tokenizer; TwitterMNER.__getitem__ reads this global directly.
# add_prefix_space=True is set because the dataset feeds pre-split words
# (is_split_into_words=True) to the tokenizer.
tokenizer = RobertaTokenizerFast.from_pretrained("roberta-large", add_prefix_space=True)
from PIL import Image, UnidentifiedImageError
class TwitterMNER(Dataset):
    """Dataset pairing a pre-tokenised tweet with its image for token classification.

    Each item is tokenised with the module-level ``tokenizer`` and padded or
    truncated to ``max_len``. The image is located with ``find_image_path``;
    when no file is found or it cannot be decoded, a blank 224x224 RGB image
    is used instead.

    Args:
        samples: Sequence of dicts with keys ``'img_id'``, ``'tokens'`` and
            ``'labels'`` (tokens and labels are parallel lists).
        img_dir: Directory searched for ``<img_id>.<ext>``.
        label2id: Mapping from label string to integer id.
        max_len: Sequence length passed to the tokenizer as ``max_length``.
        aug: When true, a ``RandomResizedCrop`` (scale 0.9-1.0) replaces the
            final plain resize to 224x224.
    """

    def __init__(self, samples, img_dir, label2id, max_len=128, aug=False):
        self.samples, self.img_dir, self.label2id = samples, img_dir, label2id
        self.max_len = max_len
        self.tfm = transforms.Compose([
            transforms.Resize((256,256)),
            transforms.RandomResizedCrop((224,224), scale=(0.9,1.0)) if aug else transforms.Resize((224,224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
        ])

    def __len__(self):
        """Number of samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return a dict with ``input_ids``, ``attention_mask``, ``pixel_values`` and ``labels``."""
        ex = self.samples[idx]
        tokens, labels = ex['tokens'], ex['labels']

        encodings = tokenizer(
            tokens, is_split_into_words=True,
            padding='max_length', truncation=True,
            max_length=self.max_len, return_tensors='pt'
        )
        word_ids = encodings.word_ids(batch_index=0)
        # return_tensors='pt' adds a leading batch axis of size 1; drop it.
        enc = {k:v.squeeze(0) for k,v in encodings.items()}

        # Label alignment: only the first sub-token of each word carries the
        # word's label id. Positions with no word id, and continuation
        # sub-tokens, get -100 so the loss (ignore_index=-100) skips them.
        label_ids, prev_word = [], None
        for w_id in word_ids:
            if w_id is None:
                label_ids.append(-100)
            elif w_id != prev_word:
                label_ids.append(self.label2id[labels[w_id]])
            else:
                label_ids.append(-100)
            prev_word = w_id
        label_ids = torch.tensor(label_ids, dtype=torch.long)

        # Image load (safe): fall back to a blank image on a missing or
        # unreadable file. The context manager closes the underlying file
        # handle as soon as the pixels have been converted, instead of
        # leaving that to garbage collection.
        img_path = find_image_path(self.img_dir, ex['img_id'])
        img = None
        if img_path:
            try:
                with Image.open(img_path) as raw:
                    img = raw.convert('RGB')
            except (UnidentifiedImageError, OSError):
                img = None
        if img is None:
            img = Image.new('RGB',(224,224))
        img = self.tfm(img)

        return {
            'input_ids': enc['input_ids'],
            'attention_mask': enc['attention_mask'],
            'pixel_values': img,
            'labels': label_ids
        }


In [None]:
class RobertaResNet50MNER(nn.Module):
    """Token classifier that modulates RoBERTa token states with a ResNet-50 image vector.

    The pooled image feature is projected to the text hidden size and turned
    into a per-feature scale and shift, applied to every token state before
    dropout and a linear classification head.

    Args:
        num_labels: Size of the classifier output.
    """

    def __init__(self, num_labels):
        super().__init__()
        from transformers import RobertaModel

        # Text encoder.
        self.text = RobertaModel.from_pretrained('roberta-large')
        hidden = self.text.config.hidden_size

        # Image encoder: ImageNet ResNet-50 with the fc head replaced by identity.
        backbone = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
        backbone.fc = nn.Identity()
        self.visual = backbone

        # Fusion layers and classification head.
        self.img_proj = nn.Linear(2048, hidden)
        self.gamma = nn.Linear(hidden, hidden)
        self.beta  = nn.Linear(hidden, hidden)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(hidden, num_labels)

    def forward(self, input_ids, attention_mask, pixel_values, labels=None):
        """Return ``(logits, loss)``; ``loss`` is ``None`` when ``labels`` is not given."""
        token_states = self.text(
            input_ids=input_ids, attention_mask=attention_mask
        ).last_hidden_state                                        # [B,L,H]
        image_vec = self.img_proj(self.visual(pixel_values))       # [B,H]

        # Broadcast the image-derived scale/shift over the sequence axis.
        scale = self.gamma(image_vec).unsqueeze(1)
        shift = self.beta(image_vec).unsqueeze(1)
        fused = self.dropout((1+scale)*token_states + shift)
        logits = self.classifier(fused)

        if labels is None:
            return logits, None

        # Cross-entropy over flattened positions; label -100 is ignored.
        criterion = nn.CrossEntropyLoss(ignore_index=-100)
        flat_logits = logits.view(-1, logits.size(-1))
        flat_labels = labels.view(-1)
        return logits, criterion(flat_logits, flat_labels)

In [None]:
# Parse the three annotation files and derive one label vocabulary from all of them.
train_s, val_s, test_s = (
    parse_twitter_conll(TXT_PATHS[split]) for split in ('train', 'val', 'test')
)
label2id, id2label = build_label_vocab(train_s, val_s, test_s)
print("Labels:", label2id)

# Only the training split uses image augmentation.
train_ds = TwitterMNER(train_s, IMG_DIRS['train'], label2id, aug=True)
val_ds, test_ds = (
    TwitterMNER(split_samples, IMG_DIRS[split], label2id)
    for split_samples, split in ((val_s, 'val'), (test_s, 'test'))
)

# Training batches are shuffled; evaluation loaders keep file order.
train_loader = DataLoader(train_ds, batch_size=8, shuffle=True, num_workers=2)
val_loader, test_loader = (
    DataLoader(eval_ds, batch_size=12, shuffle=False, num_workers=2)
    for eval_ds in (val_ds, test_ds)
)


In [None]:
from transformers import get_cosine_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np

# Model, optimiser, LR schedule and AMP scaler used by run_epoch.
EPOCHS = 3
model = RobertaResNet50MNER(num_labels=len(label2id))
model = model.to(device)

optim = torch.optim.AdamW(model.parameters(), lr=3e-5, weight_decay=0.01)

# Cosine decay over the whole run, with the first 10% of steps used for warm-up.
total_steps = len(train_loader) * EPOCHS
warmup_steps = int(0.1 * total_steps)
sched = get_cosine_schedule_with_warmup(
    optim,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps,
)

# Loss scaling is active only when running on CUDA.
use_amp = device.type == "cuda"
scaler = torch.amp.GradScaler(device="cuda", enabled=use_amp)

def run_epoch(loader, train=True):
    """Run one pass over ``loader`` and return loss and token-level metrics.

    Uses the module-level ``model``, ``optim``, ``sched``, ``scaler`` and
    ``device``. In training mode each batch performs one optimiser step and
    one scheduler step; in evaluation mode no gradients are recorded.

    Args:
        loader: Iterable of batches with keys ``input_ids``,
            ``attention_mask``, ``pixel_values`` and ``labels``.
        train: Whether to update the model.

    Returns:
        ``(mean_loss, accuracy, precision, recall, f1)`` where the last three
        are micro-averaged over all positions whose gold label is not -100.
    """
    model.train() if train else model.eval()
    losses, all_preds, all_labels = [], [], []
    for batch in tqdm(loader, desc='Train' if train else 'Eval', leave=False):
        input_ids = batch['input_ids'].to(device)
        attn      = batch['attention_mask'].to(device)
        pixels    = batch['pixel_values'].to(device)
        labels    = batch['labels'].to(device)

        # Only record an autograd graph when it will be used for backward();
        # evaluation passes would otherwise hold activations for nothing.
        with torch.set_grad_enabled(train):
            with torch.amp.autocast('cuda', enabled=(device.type=='cuda')):
                logits, loss = model(input_ids, attn, pixels, labels)

        if train:
            optim.zero_grad(set_to_none=True)
            scaler.scale(loss).backward()
            # Gradients are still multiplied by the loss scale here; unscale
            # them first so the 1.0 clipping threshold applies to true norms.
            scaler.unscale_(optim)
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            scaler.step(optim)
            scaler.update()
            sched.step()

        losses.append(loss.item())

        # Keep only scored positions (gold label != -100). Boolean indexing
        # preserves row-major order, matching a nested per-token loop.
        keep = labels != -100
        all_preds.extend(torch.argmax(logits, -1)[keep].cpu().tolist())
        all_labels.extend(labels[keep].cpu().tolist())

    acc = accuracy_score(all_labels, all_preds)
    p, r, f, _ = precision_recall_fscore_support(all_labels, all_preds, average='micro')
    return np.mean(losses), acc, p, r, f

# === Train Loop ===
# `best` tracks the highest validation F1 seen; `bad` counts consecutive
# epochs without improvement (training stops once it reaches 2).
best, bad = -1, 0
train_losses, val_losses, train_f1s, val_f1s = [], [], [], []

for ep in range(1, EPOCHS + 1):
    tr_loss, tr_acc, tr_p, tr_r, tr_f1 = run_epoch(train_loader, True)
    vl_loss, vl_acc, vl_p, vl_r, vl_f1 = run_epoch(val_loader, False)

    # Keep per-epoch history for later inspection.
    for history, value in (
        (train_losses, tr_loss), (val_losses, vl_loss),
        (train_f1s, tr_f1), (val_f1s, vl_f1),
    ):
        history.append(value)

    print(f"Epoch {ep:02d} | Train F1 {tr_f1:.3f} | Val F1 {vl_f1:.3f}")

    if not vl_f1 > best:
        bad += 1
        if bad >= 2:
            print("Early stop.")
            break
        continue

    # New best validation score: reset patience and checkpoint the weights.
    best, bad = vl_f1, 0
    torch.save(model.state_dict(), "best_twitter2017.pth")


In [None]:
import numpy as np
import torch
from tqdm import tqdm
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    classification_report,
    confusion_matrix
)
import matplotlib.pyplot as plt
import seaborn as sns

# The training loop checkpoints the weights with the best validation F1, but
# `model` still holds the weights from the last epoch that ran. Restore the
# checkpoint so the test numbers describe the model that was selected.
BEST_CKPT = "best_twitter2017.pth"
if os.path.exists(BEST_CKPT):
    model.load_state_dict(torch.load(BEST_CKPT, map_location=device))

# Ensure model is in evaluation mode
model.eval()

all_labels, all_preds = [], []

with torch.no_grad():
    for batch in tqdm(test_loader, desc="Evaluating"):
        input_ids = batch['input_ids'].to(device)
        attn      = batch['attention_mask'].to(device)
        pixels    = batch['pixel_values'].to(device)
        labels    = batch['labels'].to(device)

        logits, loss = model(input_ids, attn, pixels, labels)

        # Collect only scored positions (gold label != -100). Boolean
        # indexing keeps row-major order, same as a nested per-token loop.
        keep = labels != -100
        all_preds.extend(torch.argmax(logits, dim=-1)[keep].cpu().tolist())
        all_labels.extend(labels[keep].cpu().tolist())

# ---------------- Overall Metrics ----------------
# Token-level scores over the collected gold/predicted label ids.
acc = accuracy_score(all_labels, all_preds)

# precision_recall_fscore_support returns (precision, recall, f1, support);
# only the first three are kept for each averaging mode.
prec_micro, rec_micro, f1_micro = precision_recall_fscore_support(
    all_labels, all_preds, average='micro')[:3]
prec_macro, rec_macro, f1_macro = precision_recall_fscore_support(
    all_labels, all_preds, average='macro')[:3]
prec_weighted, rec_weighted, f1_weighted = precision_recall_fscore_support(
    all_labels, all_preds, average='weighted')[:3]

print("\n===== Detailed Test Results =====")
for line in (
    f"Accuracy:        {acc:.4f}",
    f"Micro F1:        {f1_micro:.4f}",
    f"Macro F1:        {f1_macro:.4f}",
    f"Weighted F1:     {f1_weighted:.4f}",
    f"Micro Precision: {prec_micro:.4f}",
    f"Micro Recall:    {rec_micro:.4f}",
):
    print(line)

# ---------------- Per-label report ----------------
# Pass the full id range explicitly. Without `labels`, scikit-learn infers the
# classes from the ids that actually occur in all_labels/all_preds and raises
# if that count differs from len(target_names) (e.g. a label seen only in the
# training split and never predicted). This also matches confusion_matrix below.
label_ids = list(range(len(id2label)))
target_names = [id2label[i] for i in label_ids]
report = classification_report(
    all_labels, all_preds,
    labels=label_ids,
    target_names=target_names,
    digits=4,
    zero_division=0
)
print("\nPer-label classification report:")
print(report)

# ---------------- Confusion Matrix ----------------
# Token-level confusion counts over every label id, in id order.
# NOTE(review): `cm` is computed but never displayed or used, because the
# heatmap code below is commented out. Re-enable the plot or drop the computation.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(id2label))))
# plt.figure(figsize=(7,6))
# sns.heatmap(
#     cm, annot=False, cmap='YlGnBu',
#     xticklabels=target_names, yticklabels=target_names
# )
# plt.xlabel("Predicted Label")
# plt.ylabel("True Label")
# plt.title("Confusion Matrix – Token Level")
# plt.show()


In [None]:
import torch
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from transformers import RobertaTokenizerFast

# === Load tokenizer ===
# Rebinds the global `tokenizer` with the same checkpoint name and
# add_prefix_space=True setting used when the datasets were built, so the
# single-example inference below tokenises text the same way as training.
tokenizer = RobertaTokenizerFast.from_pretrained("roberta-large", add_prefix_space=True)

# === Load model (same as your trained one) ===
class RobertaResNet50MNER(torch.nn.Module):
    """RoBERTa token classifier whose token states are scaled and shifted by a ResNet-50 image feature.

    Submodule attribute names (``text``, ``visual``, ``img_proj``, ``gamma``,
    ``beta``, ``dropout``, ``classifier``) determine the state-dict keys, so
    they must match the names used when the checkpoint was saved.

    Args:
        num_labels: Number of output classes of the classifier head.
    """

    def __init__(self, num_labels):
        super().__init__()
        from transformers import RobertaModel
        from torchvision import models

        self.text = RobertaModel.from_pretrained('roberta-large')
        hidden = self.text.config.hidden_size

        # ResNet-50 without its final fc layer (replaced by identity).
        self.visual = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
        self.visual.fc = torch.nn.Identity()

        Linear = torch.nn.Linear
        self.img_proj = Linear(2048, hidden)
        self.gamma = Linear(hidden, hidden)
        self.beta = Linear(hidden, hidden)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = Linear(hidden, num_labels)

    def forward(self, input_ids, attention_mask, pixel_values, labels=None):
        """Return ``(logits, loss)``; ``loss`` stays ``None`` unless ``labels`` is passed."""
        encoded = self.text(input_ids=input_ids, attention_mask=attention_mask)
        img_feat = self.img_proj(self.visual(pixel_values))

        # Image-conditioned scale and shift, with an axis inserted at dim 1 for broadcasting.
        gain = 1 + self.gamma(img_feat).unsqueeze(1)
        bias = self.beta(img_feat).unsqueeze(1)
        modulated = gain * encoded.last_hidden_state + bias
        logits = self.classifier(self.dropout(modulated))

        loss = None
        if labels is not None:
            # Flatten all positions; entries labelled -100 do not contribute.
            n_classes = logits.size(-1)
            loss = torch.nn.CrossEntropyLoss(ignore_index=-100)(
                logits.view(-1, n_classes), labels.view(-1)
            )
        return logits, loss

# === Load your trained weights ===
# Read the saved state dict, copy it into a fresh model, then move the model
# to the target device and switch to inference mode.
ckpt = torch.load("best_twitter2017.pth", map_location=device)
model = RobertaResNet50MNER(num_labels=len(id2label))
model.load_state_dict(ckpt)
model = model.to(device).eval()

# === Image & text ===
img_path = "/kaggle/input/twitter2017/twitter2017/twitter2017_images/16_05_01_100.jpg"
tweet_text = "AP News : The Latest : Little Rock settles lawsuit over police shooting"

# === Preprocess image ===
# Plain resize to 224x224 followed by the same normalisation constants the dataset uses.
imagenet_norm = transforms.Normalize(
    mean=[0.485,0.456,0.406],
    std=[0.229,0.224,0.225],
)
img_tfm = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    imagenet_norm,
])
image = Image.open(img_path).convert("RGB")
pixel_values = img_tfm(image).unsqueeze(0)   # add a leading batch axis
pixel_values = pixel_values.to(device)

# === Tokenize text ===
# Whitespace-split words are passed as pre-tokenised input and padded to length 128.
tokens = tweet_text.split()
enc = tokenizer(
    tokens,
    is_split_into_words=True,
    padding='max_length',
    truncation=True,
    max_length=128,
    return_tensors='pt',
)
input_ids, attn = (enc[key].to(device) for key in ("input_ids", "attention_mask"))

# === Inference ===
with torch.no_grad():
    logits, _ = model(input_ids, attn, pixel_values)  # loss slot is None without labels
    probs = F.softmax(logits, dim=-1)
    # Per-position predicted class and its probability, with the batch axis dropped.
    preds = probs.argmax(dim=-1).squeeze(0).cpu().numpy()
    confs = probs.max(dim=-1).values.squeeze(0).cpu().numpy()

# === Decode predictions ===
# Report one (word, tag, confidence) triple per input word, read from the
# word's first sub-token; positions without a word id are skipped.
word_ids = enc.word_ids(batch_index=0)
pred_tags, prev_word = [], None
for pos, w_id in enumerate(word_ids):
    starts_new_word = w_id is not None and w_id != prev_word
    if starts_new_word:
        pred_tags.append((tokens[w_id], id2label[int(preds[pos])], float(confs[pos])))
        prev_word = w_id

# === Display results ===
# List every word whose predicted tag is not "O", then show the image.
print(f"\n🔍 Predicted entities for: {img_path}")
entity_rows = [row for row in pred_tags if row[1] != "O"]
for tok, tag, conf in entity_rows:
    print(f"{tok:15s} → {tag:8s} (confidence: {conf:.3f})")

plt.imshow(image)
plt.axis("off")
plt.title("Predicted Entities with Confidence")
plt.show()
