In [None]:
pip install seqeval

# Load Data

In [None]:
import pandas as pd
import requests

# Read the JSON Lines files of each split; every split comes from three files.
df_train1, df_train2, df_train3 = (
    pd.read_json(jsonl_path, lines=True)
    for jsonl_path in (
        "/kaggle/input/absa-dataset1/train.jsonl",
        "/kaggle/input/absa-dataset1/train2.jsonl",
        "/kaggle/input/absa-dataset1/rest16_quad_train.tsv.jsonl",
    )
)

# Validation files
df_val1, df_val2, df_val3 = (
    pd.read_json(jsonl_path, lines=True)
    for jsonl_path in (
        "/kaggle/input/absa-dataset1/dev.jsonl",
        "/kaggle/input/absa-dataset1/dev2.jsonl",
        "/kaggle/input/absa-dataset1/rest16_quad_dev.tsv.jsonl",
    )
)

# Test files
df_test1, df_test2, df_test3 = (
    pd.read_json(jsonl_path, lines=True)
    for jsonl_path in (
        "/kaggle/input/absa-dataset1/test.jsonl",
        "/kaggle/input/absa-dataset1/test2.jsonl",
        "/kaggle/input/absa-dataset1/rest16_quad_test.tsv.jsonl",
    )
)



In [None]:
# Stack the three source frames of each split and renumber the rows from 0.
df_train = pd.concat(
    (df_train1, df_train2, df_train3),
    ignore_index=True,
)
df_test = pd.concat(
    (df_test1, df_test2, df_test3),
    ignore_index=True,
)
df_val = pd.concat(
    (df_val1, df_val2, df_val3),
    ignore_index=True,
)

In [None]:
# Report the number of rows in each split.
print('train set size:', df_train.shape[0])
print('validation set size:', df_val.shape[0])
print('test set size:', df_test.shape[0])

In [None]:
# Number of training rows whose text repeats an earlier row.
df_train.duplicated(subset=["text"]).sum()

In [None]:
# Number of test rows whose text repeats an earlier row.
df_test.duplicated(subset=["text"]).sum()

In [None]:
# Number of validation rows whose text repeats an earlier row.
df_val.duplicated(subset=["text"]).sum()

In [None]:
# Show every training row that carries this exact sentence.
df_train.loc[df_train['text'].eq('We , there were four of us , arrived at noon - the place was empty - and the staff acted like we were imposing on them and they were very rude .')]

In [None]:
# Keep the first row of every distinct text in each split.
# reset_index(drop=True) returns a fresh frame with a contiguous 0..n-1 index, so the
# column assignments made in later cells do not operate on a filtered selection of the
# concatenated frame (the usual source of chained-assignment warnings).
# NOTE(review): de-duplication is done per split only; a sentence present in both the
# training and the test/validation files would still be in both - confirm whether the
# source datasets overlap.
df_train = df_train.drop_duplicates(subset=["text"]).reset_index(drop=True)
df_test = df_test.drop_duplicates(subset=["text"]).reset_index(drop=True)
df_val = df_val.drop_duplicates(subset=["text"]).reset_index(drop=True)

In [None]:
# (rows, columns) of the training frame after de-duplication.
df_train.shape

In [None]:
# Sanity check: list any training texts that still repeat an earlier row.
df_train.loc[df_train['text'].duplicated(), 'text']

# Data Processing

In [None]:
# Summary of the training frame's columns.
df_train.info()

In [None]:
import re

def clean_text(text):
    """Normalize a sentence: lowercase it, drop punctuation and tidy whitespace.

    Args:
        text: Raw sentence string.

    Returns:
        The lowercased sentence with every character that is neither a word
        character nor whitespace removed, each whitespace run collapsed to one
        space, and no leading or trailing whitespace.
    """
    text = text.lower()
    # Strip punctuation first: deleting a free-standing symbol such as " - " leaves
    # two adjacent spaces behind, so whitespace has to be collapsed afterwards.
    text = re.sub(r"[^\w\s]", "", text)
    # Collapse every whitespace run (spaces, tabs, newlines) into a single space.
    text = re.sub(r"\s+", " ", text)
    # The former "remove space before punctuation" step was dropped: it could never
    # match once all punctuation had been removed.
    return text.strip()

In [None]:
# Replace the text column of every split with its normalized form.
df_train["text"] = df_train["text"].map(clean_text)
df_test["text"] = df_test["text"].map(clean_text)
df_val["text"] = df_val["text"].map(clean_text)


In [None]:
# Preview the first rows of the test split after text cleaning.
df_test.head()

# Aspect Extraction

In [None]:
def extract_aspects(colomn):
    """Collect the ``"aspect"`` value of every entry, preserving order.

    Args:
        colomn: Iterable of mappings that each contain an ``"aspect"`` key.

    Returns:
        A list with one aspect value per entry.
    """
    aspects = []
    for entry in colomn:
        aspects.append(entry["aspect"])
    return aspects

In [None]:
# Work on independent copies for the aspect-extraction task. A plain assignment would
# only create a second name for the same frame, so every column added to ae_* below
# would also appear in df_train / df_test / df_val.
ae_train = df_train.copy()
ae_test = df_test.copy()
ae_val = df_val.copy()

In [None]:
# Derive the list of aspect terms of each row from its "labels" entries.
ae_train['aspects'] = ae_train['labels'].map(extract_aspects)
ae_test['aspects'] = ae_test['labels'].map(extract_aspects)
ae_val['aspects'] = ae_val['labels'].map(extract_aspects)

In [None]:
# Preview the first 8 validation rows, now including the "aspects" column.
ae_val.head(8)

In [None]:
# Store how many aspect terms each sentence has.
ae_val['num_aspects'] = ae_val['aspects'].map(len)
ae_train['num_aspects'] = ae_train['aspects'].map(len)
ae_test['num_aspects'] = ae_test['aspects'].map(len)

In [None]:
# How many test sentences have 0, 1, 2, ... aspect terms.
ae_test['num_aspects'].value_counts()

## Tokenizer

In [None]:
import re
from transformers import BertTokenizerFast

# Load the fast BERT tokenizer for the "bert-base-uncased" checkpoint.
# NOTE(review): the following cell rebinds `tokenizer` to a RoBERTa tokenizer, so this
# one is only in effect when that cell is skipped.
tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")


In [None]:
from transformers import RobertaTokenizerFast, RobertaForTokenClassification
# Rebind `tokenizer` to the fast RoBERTa tokenizer. add_prefix_space=True is passed
# because the tokenizer is later called on pre-split input (is_split_into_words=True).
tokenizer = RobertaTokenizerFast.from_pretrained(
    "roberta-base",
    add_prefix_space=True,
)


In [None]:
# Display whichever tokenizer is currently bound to `tokenizer`.
tokenizer

## Create BIO_Tags

In [None]:

def create_bio(sentence, aspects):
    """Split a sentence into words and tag each word with an aspect BIO label.

    Args:
        sentence: Sentence text to tag.
        aspects: Iterable of aspect-term strings to locate in ``sentence``.

    Returns:
        ``(tokens, labels)``: the whitespace-separated words of ``sentence`` and a
        parallel list holding ``"B-ASP"`` (first word of an aspect mention),
        ``"I-ASP"`` (following words of the mention) or ``"O"`` for each word.
    """
    # Work at word level. These tokens are later handed to the tokenizer with
    # is_split_into_words=True, which performs the sub-word split itself; returning
    # sub-word token strings here would get them tokenized a second time.
    word_spans = [(m.group(), m.start(), m.end()) for m in re.finditer(r"\S+", sentence)]
    tokens = [word for word, _, _ in word_spans]
    labels = ["O"] * len(tokens)

    for aspect in aspects:
        # An empty pattern matches at every position of the sentence, so blank or
        # non-string aspects are skipped instead of being searched for.
        if not isinstance(aspect, str) or not aspect.strip():
            continue

        # Match whole terms only, so that "bar" does not tag "barbecue".
        pattern = r"(?<!\w)" + re.escape(aspect.strip()) + r"(?!\w)"
        for match in re.finditer(pattern, sentence, flags=re.IGNORECASE):
            start, end = match.span()

            # The first word overlapping the match opens the mention, the rest continue it.
            first = True
            for i, (_, s, e) in enumerate(word_spans):
                if s < end and e > start:
                    labels[i] = "B-ASP" if first else "I-ASP"
                    first = False

    return tokens, labels


In [None]:
# Tag every sentence of every split and store the tokens and their BIO labels
# as two new columns.
for split_frame in (ae_train, ae_val, ae_test):
    bio_pairs = [
        create_bio(text, aspects)
        for text, aspects in zip(split_frame['text'], split_frame['aspects'])
    ]
    split_frame['Tokens'], split_frame['BIO_Labels'] = zip(*bio_pairs)


In [None]:
# Preview the validation rows with the new "Tokens" and "BIO_Labels" columns.
ae_val.head()

## Prepare the data for model

In [None]:
# Tag <-> integer id lookup tables used for the model targets.
label2id = dict(zip(("O", "B-ASP", "I-ASP"), range(3)))
id2label = {tag_id: tag for tag, tag_id in label2id.items()}

In [None]:
# Display the id -> tag mapping.
id2label

In [None]:
def encode_example(example):
    """Tokenize one example and align its BIO labels with the produced tokens.

    Args:
        example: Mapping with a ``"Tokens"`` list and a parallel ``"BIO_Labels"`` list.

    Returns:
        The tokenizer encoding (padded/truncated to 128 positions, offsets included)
        with an added ``"labels"`` list: ``-100`` where a position has no source
        word, otherwise the id of the word's label. Continuation pieces of a word
        labelled ``"B-ASP"`` receive the ``"I-ASP"`` id.
    """
    word_labels = example["BIO_Labels"]

    encoding = tokenizer(
        example["Tokens"],
        is_split_into_words=True,
        return_offsets_mapping=True,
        truncation=True,
        max_length=128,
        padding="max_length",
    )

    aligned_labels = []
    previous_word = None
    for word_id in encoding.word_ids():
        if word_id is None:
            # No source word at this position: marked as "ignored by model".
            aligned_labels.append(-100)
        else:
            tag = word_labels[word_id]
            # Only the first piece of a word may open a mention.
            if word_id == previous_word and tag == "B-ASP":
                tag = "I-ASP"
            aligned_labels.append(label2id[tag])
        previous_word = word_id

    encoding["labels"] = aligned_labels
    return encoding


In [None]:
# Inspect the BIO label lists of the validation split.
ae_val["BIO_Labels"]

In [None]:
# Encode every row (as a dict) of each split.
encoded_dataset_train = list(map(encode_example, ae_train.to_dict(orient="records")))
encoded_dataset_test = list(map(encode_example, ae_test.to_dict(orient="records")))
encoded_dataset_val = list(map(encode_example, ae_val.to_dict(orient="records")))


In [None]:
# Inspect the second encoded test example.
encoded_dataset_test[1]

# Train Models

### bert

In [None]:
from transformers import BertForTokenClassification, BertConfig

# Number of output classes: B, I, O (or any number you use).
num_labels = 3

# BERT with a token-classification head.
# NOTE(review): the next cell rebinds `model` to a RoBERTa model.
model = BertForTokenClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)

In [None]:
# RoBERTa with a token-classification head.
# The label count and names come from the label maps defined above, so this cell no
# longer depends on `num_labels` from the optional BERT cell, and the saved model
# config carries the real tag names.
model = RobertaForTokenClassification.from_pretrained(
    "roberta-base",
    num_labels=len(label2id),
    id2label=id2label,
    label2id=label2id,
)

In [None]:
from torch.utils.data import Dataset
import torch

class ABSADataset(Dataset):
    """Dataset over a list of already-encoded examples.

    Each stored example is a mapping of field name to values; indexing returns a
    dict with the same keys whose values are converted with ``torch.tensor``.
    """

    def __init__(self, encoded_list):
        # Keep a reference to the encoded examples; nothing is copied or converted here.
        self.data = encoded_list

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        example = self.data[idx]
        tensors = {}
        for field, values in example.items():
            tensors[field] = torch.tensor(values)
        return tensors


In [None]:
from torch.utils.data import DataLoader

# Wrap the encoded examples of each split in a dataset.
train_dataset, test_dataset, val_dataset = (
    ABSADataset(encoded_split)
    for encoded_split in (encoded_dataset_train, encoded_dataset_test, encoded_dataset_val)
)

# Batches of 16; only the training data is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

In [None]:
# Number of encoded test examples.
len(test_dataset)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# AdamW over all model parameters.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=5e-5)
model.to(device)

In [None]:
# Display the device selected above.
device

# Train Model Without Validation set

from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score

num_epochs = 60
train_losses, train_accuracies = [], []

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}")
    model.train()
    running_loss = 0
    all_preds, all_labels = [], []

    for batch in tqdm(train_loader):
        optimizer.zero_grad()
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        # Passing labels makes the model return the loss together with the logits.
        outputs = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )

        outputs.loss.backward()
        optimizer.step()
        running_loss += outputs.loss.item()

        # ----- Accuracy tracking -----
        # Keep only positions whose label is not -100; boolean indexing walks the
        # batch row by row, so the order matches a per-sequence loop.
        keep = batch["labels"] != -100
        batch_preds = outputs.logits.argmax(dim=-1)
        all_preds.extend(batch_preds[keep].cpu().numpy())
        all_labels.extend(batch["labels"][keep].cpu().numpy())

    # Mean batch loss and token-level accuracy of this epoch.
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = accuracy_score(all_labels, all_preds)
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)

    print(f"Loss: {epoch_loss:.4f}, Token-level Accuracy: {epoch_acc:.4f}")


# Training loss (left) and token-level accuracy (right), one point per epoch.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))
epoch_axis = range(1, num_epochs + 1)

loss_ax.plot(epoch_axis, train_losses)
loss_ax.set_title("Training Loss per Epoch")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")

acc_ax.plot(epoch_axis, train_accuracies)
acc_ax.set_title("Token-level Accuracy per Epoch")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")

plt.show()

## Test Model Without validation

from seqeval.metrics import f1_score, precision_score, recall_score, classification_report

# Map label IDs to BIO tags. The IDs must follow the label map used to build the
# training targets (label2id = {"O": 0, "B-ASP": 1, "I-ASP": 2}); the previous
# mapping {0: "B", 1: "I", 2: "O"} decoded every tag as a different one.
id2tag = {0: "O", 1: "B", 2: "I"}

def decode_bio(pred_ids, label_ids):
    """Convert aligned prediction/label ids into BIO tag lists.

    Args:
        pred_ids: Predicted label ids, one per position.
        label_ids: Gold label ids for the same positions; ``-100`` marks positions
            to skip.

    Returns:
        ``(pred_labels, true_labels)``: two equally long tag lists that contain
        only the positions whose gold id is not ``-100``.
    """
    pred_labels = []
    true_labels = []
    for p, l in zip(pred_ids, label_ids):
        if l == -100:  # ignored position (special token / padding)
            continue
        # int() turns numpy integer scalars into plain dictionary keys.
        pred_labels.append(id2tag[int(p)])
        true_labels.append(id2tag[int(l)])
    return pred_labels, true_labels

def extract_spans(bio_seq):
    """Return the ``(start, end)`` index pairs (inclusive) of the spans in a tag sequence.

    A span opens at a ``"B"`` tag and is closed by the next ``"B"`` or ``"O"`` tag,
    or by the end of the sequence. Tags other than ``"B"`` and ``"O"`` neither open
    nor close a span.
    """
    spans = []
    open_at = None
    for position, tag in enumerate(bio_seq):
        if tag not in ("B", "O"):
            continue
        # Both "B" and "O" terminate a span that is currently open.
        if open_at is not None:
            spans.append((open_at, position - 1))
        open_at = position if tag == "B" else None
    if open_at is not None:
        spans.append((open_at, len(bio_seq) - 1))
    return spans

# -----------------------------
# TEST MODEL & METRICS
# -----------------------------
model.eval()
all_pred_labels, all_true_labels = [], []

with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        # Labels are not passed to the model here; only the logits are needed.
        logits = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        ).logits
        batch_preds = logits.argmax(dim=-1).cpu().numpy()
        batch_gold = batch["labels"].cpu().numpy()

        # One tag list per sentence, restricted to positions with a real label.
        for pred_seq, label_seq in zip(batch_preds, batch_gold):
            pred_labels, true_labels = decode_bio(pred_seq, label_seq)
            all_pred_labels.append(pred_labels)
            all_true_labels.append(true_labels)

# -----------------------------
# ENTITY-LEVEL METRICS (seqeval)
# -----------------------------
# seqeval scores whole predicted entities against the gold entities rather than
# individual tokens, so the heading says "Entity-level" instead of "Token-level".
print("Entity-level Metrics (seqeval):")
print("F1-score:", f1_score(all_true_labels, all_pred_labels))
print("Precision:", precision_score(all_true_labels, all_pred_labels))
print("Recall:", recall_score(all_true_labels, all_pred_labels))
print("\nFull Classification Report:\n")
print(classification_report(all_true_labels, all_pred_labels))

# -----------------------------
# SPAN-LEVEL METRICS
# -----------------------------
# A predicted span counts as correct only when both of its boundaries match a gold span.
correct = total_pred = total_true = 0
for pred_seq, true_seq in zip(all_pred_labels, all_true_labels):
    pred_spans = set(extract_spans(pred_seq))
    true_spans = set(extract_spans(true_seq))
    correct += len(pred_spans.intersection(true_spans))
    total_pred += len(pred_spans)
    total_true += len(true_spans)

# Fall back to 0 when a denominator is empty.
precision = correct / total_pred if total_pred else 0
recall = correct / total_true if total_true else 0
if (precision + recall) > 0:
    f1 = 2 * precision * recall / (precision + recall)
else:
    f1 = 0

print("\nSpan-level Metrics:")
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")


model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )
        predictions = outputs.logits.argmax(dim=-1)

        # Flatten the (batch, position) grids into one long list each.
        all_preds.extend(predictions.cpu().numpy().ravel())
        all_labels.extend(batch["labels"].cpu().numpy().ravel())

# NOTE: this import rebinds `classification_report` from the seqeval function
# imported above to the scikit-learn one (per-class, per-token report).
from sklearn.metrics import classification_report

# Drop the positions labelled -100 (ignored tokens) from both lists.
kept_pairs = [(p, l) for p, l in zip(all_preds, all_labels) if l != -100]
filtered_preds = [p for p, _ in kept_pairs]
filtered_labels = [l for _, l in kept_pairs]

print(classification_report(filtered_labels, filtered_preds))

# Train model with Validation Set

In [None]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from seqeval.metrics import f1_score, precision_score, recall_score

# NOTE(review): `model` and `optimizer` are not re-created in this cell, so training
# continues from whatever state earlier cells left them in - confirm this is intended.
num_epochs = 100
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}")

    # -------------------------
    # TRAINING
    # -------------------------
    model.train()
    running_loss = 0
    all_preds, all_labels = [], []

    for batch in tqdm(train_loader):
        optimizer.zero_grad()
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        outputs = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )

        outputs.loss.backward()
        optimizer.step()
        running_loss += outputs.loss.item()

        # Keep only positions whose label is not -100 (row-by-row order is preserved).
        keep = batch["labels"] != -100
        batch_preds = outputs.logits.argmax(dim=-1)
        all_preds.extend(batch_preds[keep].cpu().numpy())
        all_labels.extend(batch["labels"][keep].cpu().numpy())

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = accuracy_score(all_labels, all_preds)
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)
    print(f"Training Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}")

    # -------------------------
    # VALIDATION
    # -------------------------
    model.eval()
    running_val_loss = 0
    val_preds, val_labels = [], []

    with torch.no_grad():
        for batch in val_loader:
            batch = {name: tensor.to(device) for name, tensor in batch.items()}
            outputs = model(
                input_ids=batch["input_ids"],
                attention_mask=batch["attention_mask"],
                labels=batch["labels"],
            )

            running_val_loss += outputs.loss.item()

            keep = batch["labels"] != -100
            batch_preds = outputs.logits.argmax(dim=-1)
            val_preds.extend(batch_preds[keep].cpu().numpy())
            val_labels.extend(batch["labels"][keep].cpu().numpy())

    # Mean batch loss and token-level accuracy on the validation split.
    val_epoch_loss = running_val_loss / len(val_loader)
    val_epoch_acc = accuracy_score(val_labels, val_preds)
    val_losses.append(val_epoch_loss)
    val_accuracies.append(val_epoch_acc)
    print(f"Validation Loss: {val_epoch_loss:.4f}, Accuracy: {val_epoch_acc:.4f}\n")


## Plot training vs validation

In [None]:
import matplotlib.pyplot as plt

# Train vs. validation curves: loss on the left, token-level accuracy on the right.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))
epoch_axis = range(1, num_epochs + 1)

# Loss
loss_ax.plot(epoch_axis, train_losses, marker='o', label='Train Loss')
loss_ax.plot(epoch_axis, val_losses, marker='o', label='Val Loss')
loss_ax.set_title("Loss per Epoch")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()

# Accuracy
acc_ax.plot(epoch_axis, train_accuracies, marker='o', label='Train Acc')
acc_ax.plot(epoch_axis, val_accuracies, marker='o', label='Val Acc')
acc_ax.set_title("Token-level Accuracy per Epoch")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.legend()

plt.show()


## Test Model

In [None]:
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score

# Switch the model to evaluation mode and start with empty result lists;
# the test loop below appends one tag list per sentence to each of them.
model.eval()
all_pred_labels = []
all_true_labels = []

# Map IDs to BIO tags. The IDs must follow the label map used to build the training
# targets (label2id = {"O": 0, "B-ASP": 1, "I-ASP": 2}); the previous mapping
# {0: "B", 1: "I", 2: "O"} decoded every tag as a different one.
id2tag = {0: "O", 1: "B", 2: "I"}

def decode_bio(pred_ids, label_ids):
    """Convert aligned prediction/label ids into BIO tag lists.

    Args:
        pred_ids: Predicted label ids, one per position.
        label_ids: Gold label ids for the same positions; ``-100`` marks positions
            to skip.

    Returns:
        ``(pred_labels, true_labels)``: two equally long tag lists that contain
        only the positions whose gold id is not ``-100``.
    """
    pred_labels, true_labels = [], []
    for p, l in zip(pred_ids, label_ids):
        if l == -100:  # ignored position (special token / padding)
            continue
        # int() turns numpy integer scalars into plain dictionary keys.
        pred_labels.append(id2tag[int(p)])
        true_labels.append(id2tag[int(l)])
    return pred_labels, true_labels

def extract_spans(bio_seq):
    """List the inclusive ``(start, end)`` index pairs of the spans in a tag sequence.

    A ``"B"`` tag opens a span; the span ends just before the next ``"B"`` or
    ``"O"`` tag, or at the last position when the sequence ends first. Any other
    tag leaves the current state untouched.
    """
    spans = []
    begin = None
    last_index = len(bio_seq) - 1
    for idx, tag in enumerate(bio_seq):
        is_boundary = tag == "B" or tag == "O"
        if is_boundary and begin is not None:
            spans.append((begin, idx - 1))
            begin = None
        if tag == "B":
            begin = idx
    if begin is not None:
        spans.append((begin, last_index))
    return spans

# -------------------------
# TEST LOOP
# -------------------------
with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        # Labels are not passed to the model here; only the logits are needed.
        logits = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        ).logits
        batch_preds = logits.argmax(dim=-1).cpu().numpy()
        batch_gold = batch["labels"].cpu().numpy()

        # One tag list per sentence, restricted to positions with a real label.
        for pred_seq, label_seq in zip(batch_preds, batch_gold):
            pred_labels, true_labels = decode_bio(pred_seq, label_seq)
            all_pred_labels.append(pred_labels)
            all_true_labels.append(true_labels)


## Tokens level metrics

In [None]:
# seqeval scores whole predicted entities against the gold entities rather than
# individual tokens, so the heading says "Entity-level" instead of "Token-level".
print("Entity-level Metrics (seqeval):")
print("F1-score:", f1_score(all_true_labels, all_pred_labels))
print("Precision:", precision_score(all_true_labels, all_pred_labels))
print("Recall:", recall_score(all_true_labels, all_pred_labels))
print("\nClassification Report:\n")
print(classification_report(all_true_labels, all_pred_labels))


## Span-level metrics

In [None]:
# A predicted span counts as correct only when both of its boundaries match a gold span.
correct = total_pred = total_true = 0

for pred_seq, true_seq in zip(all_pred_labels, all_true_labels):
    pred_spans = set(extract_spans(pred_seq))
    true_spans = set(extract_spans(true_seq))
    correct += len(pred_spans.intersection(true_spans))
    total_pred += len(pred_spans)
    total_true += len(true_spans)

# Fall back to 0 when a denominator is empty.
precision = correct / total_pred if total_pred else 0
recall = correct / total_true if total_true else 0
if (precision + recall) > 0:
    f1 = 2 * precision * recall / (precision + recall)
else:
    f1 = 0

print("\nSpan-level Metrics:")
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")


## Save the model and its metrics

In [None]:
import os

save_path = "/kaggle/working/"  # choose your folder
os.makedirs(save_path, exist_ok=True)

# Write the model first, then the tokenizer, into the same folder.
for artifact in (model, tokenizer):
    artifact.save_pretrained(save_path)

print(f"Model saved to {save_path}")


In [None]:
import json
import os

results_path = "/kaggle/working/"
os.makedirs(results_path, exist_ok=True)

# Predicted and gold tag lists collected by the test loop.
predictions_payload = {
    "pred_labels": all_pred_labels,
    "true_labels": all_true_labels,
}
with open(os.path.join(results_path, "predictions.json"), "w") as f:
    json.dump(predictions_payload, f, indent=4)


In [None]:
# Scores computed by the seqeval functions on the collected tag lists.
seqeval_precision = precision_score(all_true_labels, all_pred_labels)
seqeval_recall = recall_score(all_true_labels, all_pred_labels)
seqeval_f1 = f1_score(all_true_labels, all_pred_labels)

# `precision`, `recall` and `f1` are the values left by the span-level metrics cell.
metrics = {
    "token_level": {
        "precision": seqeval_precision,
        "recall": seqeval_recall,
        "f1": seqeval_f1,
    },
    "span_level": {
        "precision": precision,
        "recall": recall,
        "f1": f1,
    },
}

with open(os.path.join(results_path, "metrics.json"), "w") as f:
    json.dump(metrics, f, indent=4)


In [None]:
import matplotlib.pyplot as plt

# Save each curve as its own figure. savefig always writes the whole current figure,
# so saving between two subplot calls produced a half-drawn loss_graph.png and an
# accuracy_graph.png that contained both panels.
for curve, title, y_label, color, file_name in (
    (train_losses, "Training Loss per Epoch", "Loss", "red", "loss_graph.png"),
    (train_accuracies, "Token-level Accuracy per Epoch", "Accuracy", "blue", "accuracy_graph.png"),
):
    fig, ax = plt.subplots(figsize=(6, 5))
    # The x axis is derived from the curve itself so it always matches its length.
    ax.plot(range(1, len(curve) + 1), curve, marker='o', color=color)
    ax.set_title(title)
    ax.set_xlabel("Epoch")
    ax.set_ylabel(y_label)
    fig.tight_layout()
    fig.savefig(os.path.join(results_path, file_name))
    # Close the figure so it is neither displayed inline nor kept in memory.
    plt.close(fig)


In [None]:
## test model

from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch

# Reload the token-classification model from the folder it was saved to.
# NOTE(review): AutoTokenizer is imported above but no tokenizer is loaded in this
# cell - confirm whether the saved tokenizer should be reloaded here as well.
model_path = "/kaggle/working/"
  # your saved folder
model = AutoModelForTokenClassification.from_pretrained(model_path)

# Evaluation mode for inference.
model.eval()
