In [1]:
import subprocess
import sys

def install_package(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

required_packages = ['transformers', 'torch', 'numpy', 'pandas', 'scikit-learn', 'sacremoses']
for package in required_packages:
    try:
        __import__(package)
    except ImportError:
        install_package(package)

import torch
import numpy as np
import pandas as pd
from transformers import XLMTokenizer, XLMModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from torch import nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model_name = "xlm-mlm-100-1280"
tokenizer = XLMTokenizer.from_pretrained(model_name)
xlm_model = XLMModel.from_pretrained(model_name).to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/2.35k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/5.72M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/2.97M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/41.9k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.14G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.14G [00:00<?, ?B/s]

In [6]:
def read_data(file_path):
    texts = []
    labels = []

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                if '\t' in line:
                    parts = line.split('\t')
                    if len(parts) >= 2:
                        text = parts[0]
                        try:
                            sentiment = int(parts[1])
                            texts.append(text)
                            labels.append(sentiment)
                            continue
                        except (ValueError, IndexError):
                            pass

                parts = line.split()
                if parts and len(parts) >= 2:
                    try:
                        label = int(parts[-1])
                        text = ' '.join(parts[:-1])
                        texts.append(text)
                        labels.append(label)
                        continue
                    except ValueError:
                        pass

                for label_value in [-1, 0, 1]:
                    label_str = str(label_value)
                    if f"\t{label_str}\t" in line:
                        text = line.split(f"\t{label_str}\t")[0]
                        texts.append(text)
                        labels.append(label_value)
                        break
    except FileNotFoundError:
        print(f"{file_path} not found")
    except Exception as e:
        print(f"Error reading{file_path}")

    print(f"parsed {len(texts)} examples from {file_path}")
    return texts, labels

train_texts, train_labels = read_data('enma_natural_train.txt')
test_texts, test_labels = read_data('enma_natural_test.txt')

parsed 3452 examples from enma_natural_train.txt
parsed 1000 examples from enma_natural_test.txt


In [7]:
try:
    from transformers import XLMTokenizer, XLMModel
    model_name = "xlm-mlm-100-1280"
    tokenizer = XLMTokenizer.from_pretrained(model_name)
    language_model = XLMModel.from_pretrained(model_name).to(device)
except Exception as e:
    print(f" could not load")

In [8]:
def get_embeddings(texts, batch_size=8, max_samples=5000):
    if len(texts) > max_samples:
        print(f"{max_samples} from {len(texts)} total")
        texts = texts[:max_samples]

    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i:i+batch_size]

        inputs = tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=128)
        inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = language_model(**inputs)

        embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
        all_embeddings.append(embeddings)

        if (i + batch_size) % 50 == 0 or i + batch_size >= len(texts):
            print(f"Processed {min(i + batch_size, len(texts))} of {len(texts)} examples")

    if len(all_embeddings) == 0:
        return np.array([])

    return np.vstack(all_embeddings)

train_embeddings = get_embeddings(train_texts, max_samples=5000)
test_embeddings = get_embeddings(test_texts, max_samples=5000)


Processed 200 of 3452 examples
Processed 400 of 3452 examples
Processed 600 of 3452 examples
Processed 800 of 3452 examples
Processed 1000 of 3452 examples
Processed 1200 of 3452 examples
Processed 1400 of 3452 examples
Processed 1600 of 3452 examples
Processed 1800 of 3452 examples
Processed 2000 of 3452 examples
Processed 2200 of 3452 examples
Processed 2400 of 3452 examples
Processed 2600 of 3452 examples
Processed 2800 of 3452 examples
Processed 3000 of 3452 examples
Processed 3200 of 3452 examples
Processed 3400 of 3452 examples
Processed 3452 of 3452 examples
Processed 200 of 1000 examples
Processed 400 of 1000 examples
Processed 600 of 1000 examples
Processed 800 of 1000 examples
Processed 1000 of 1000 examples


In [9]:
if len(train_texts) > 5000:
    train_labels = train_labels[:5000]

if len(test_texts) > 5000:
    test_labels = test_labels[:5000]

if len(train_embeddings) == 0 or len(test_embeddings) == 0:
    raise ValueError

In [10]:
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

class SentimentClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, output_dim=3):
        super(SentimentClassifier, self).__init__()
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.classifier(x)

X_train = torch.tensor(train_embeddings, dtype=torch.float32)
y_train = torch.tensor(train_labels, dtype=torch.long) + 1
X_test = torch.tensor(test_embeddings, dtype=torch.float32)
y_test = torch.tensor(test_labels, dtype=torch.long) + 1

In [11]:
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

input_dim = train_embeddings.shape[1]
model = SentimentClassifier(input_dim).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=2e-5)

In [12]:
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    if (epoch + 1) % 2 == 0 or epoch == num_epochs - 1:
        model.eval()
        with torch.no_grad():
            X_test_device = X_test.to(device)
            outputs = model(X_test_device)
            _, predicted = torch.max(outputs, 1)
            predicted = predicted.cpu().numpy()

            predicted_original = predicted - 1
            test_labels_original = y_test.numpy() - 1

            accuracy = accuracy_score(test_labels_original, predicted_original)
            print(f"Test Accuracy: {accuracy:.4f}")
            print(classification_report(test_labels_original, predicted_original))

model.eval()
with torch.no_grad():
    X_test_device = X_test.to(device)
    outputs = model(X_test_device)
    _, predicted = torch.max(outputs, 1)
    predicted = predicted.cpu().numpy()

    predicted_original = predicted - 1
    test_labels_original = y_test.numpy() - 1

    accuracy = accuracy_score(test_labels_original, predicted_original)
    print(f"Final Test Accuracy: {accuracy:.4f}")
    print(classification_report(test_labels_original, predicted_original))


Epoch 1, Loss: 1.0042
Epoch 2, Loss: 0.9631
Test Accuracy: 0.5540
              precision    recall  f1-score   support

          -1       0.00      0.00      0.00       137
           0       0.54      0.16      0.25       334
           1       0.56      0.95      0.70       529

    accuracy                           0.55      1000
   macro avg       0.37      0.37      0.32      1000
weighted avg       0.47      0.55      0.45      1000

Epoch 3, Loss: 0.9352
Epoch 4, Loss: 0.9092
Test Accuracy: 0.5740
              precision    recall  f1-score   support

          -1       0.00      0.00      0.00       137
           0       0.53      0.31      0.39       334
           1       0.59      0.89      0.71       529

    accuracy                           0.57      1000
   macro avg       0.37      0.40      0.36      1000
weighted avg       0.49      0.57      0.50      1000

Epoch 5, Loss: 0.8954
Test Accuracy: 0.5800
              precision    recall  f1-score   support

       