## Import

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel
import pandas as pd
import numpy as np

## Constructor

In [2]:
class LogicalReasoningModel(nn.Module):
    def __init__(self, model_name, num_labels):
        super(LogicalReasoningModel, self).__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.classifier = nn.Sequential(
            nn.Linear(self.model.config.hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_labels)
        )

    def forward(self, context, question, options):
        # Prepare input for the transformer model
        inputs = [
            self.tokenizer(
                f"{context} [SEP] {question} [SEP] {option}",
                truncation=True,
                padding="max_length",
                max_length=512,
                return_tensors="pt"
            ) for option in options
        ]

        # Process each option and aggregate
        outputs = []
        for input_data in inputs:
            input_data = {k: v.to(self.model.device) for k, v in input_data.items()}
            output = self.model(**input_data).last_hidden_state[:, 0, :]  # CLS token output
            outputs.append(output)

        # Stack outputs and classify
        logits = torch.stack([self.classifier(output) for output in outputs], dim=1)
        return logits

class LogicalReasoningDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length, is_train=True):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.is_train = is_train

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        row = self.data.iloc[index]
        context = row['context']
        question = row['question']
        options = eval(row['answers'])  # Convert string representation to list
        label = row['label'] if self.is_train else -1

        return context, question, options, label


## Training Configuration

In [4]:
# Training configuration
MODEL_NAME = "bert-base-uncased"
NUM_LABELS = 4
BATCH_SIZE = 16
EPOCHS = 10
LEARNING_RATE = 2e-5

def load_data(file_path):
    return pd.read_csv(file_path)

train_file_path = "train.csv"
test_file_path = "test.csv"
submission_file_path = "sample_submission.csv"

train_data = load_data(train_file_path)
test_data = load_data(test_file_path)
submission_template = load_data(submission_file_path)

train_dataset = LogicalReasoningDataset(train_data, AutoTokenizer.from_pretrained(MODEL_NAME), max_length=512, is_train=True)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Initialise
model = LogicalReasoningModel(model_name=MODEL_NAME, num_labels=NUM_LABELS)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimiser and loss
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

## Training loop

In [5]:

# Training loop
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for batch in train_loader:
        context, question, options, labels = batch
        labels = labels.to(device)

        logits = model(context[0], question[0], options[0])  # Process first example in batch

        optimizer.zero_grad()
        loss = criterion(logits.view(-1, NUM_LABELS), labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {total_loss / len(train_loader)}")

# Save model
torch.save(model.state_dict(), "logical_reasoning_model.pth")


KeyboardInterrupt: 

## Predict

In [None]:

# Prediction on test set
def predict_test_data(model, test_data, tokenizer, device):
    model.eval()
    test_dataset = LogicalReasoningDataset(test_data, tokenizer, max_length=512, is_train=False)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    predictions = []
    with torch.no_grad():
        for context, question, options, _ in test_loader:
            logits = model(context[0], question[0], options[0])
            probabilities = torch.softmax(logits, dim=-1)
            best_option = torch.argmax(probabilities, dim=-1).item()
            predictions.append(best_option)

    return predictions

# Generate predictions
predictions = predict_test_data(model, test_data, AutoTokenizer.from_pretrained(MODEL_NAME), device)

# Create submission file
submission_template['label'] = predictions
submission_template.to_csv("submission.csv", index=False)
print("Submission file saved as submission.csv")


In [3]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import pandas as pd
from sklearn.metrics import accuracy_score
import random

class ReasoningDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length, augment=False):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.augment = augment

    def __len__(self):
        return len(self.data)

    def augment_data(self, context, question):
        # Simple augmentation: shuffle words in context and question
        if self.augment:
            context_words = context.split()
            question_words = question.split()
            random.shuffle(context_words)
            random.shuffle(question_words)
            context = " ".join(context_words)
            question = " ".join(question_words)
        return context, question

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        context = row['context']
        question = row['question']
        context, question = self.augment_data(context, question)
        input_text = f"{context} [SEP] {question}"
        encoded = self.tokenizer(
            input_text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt"
        )
        label = row['label'] if 'label' in row else -1
        return {**encoded, 'labels': torch.tensor(label, dtype=torch.long)}

# Define model
class ReasoningAwareModel(nn.Module):
    def __init__(self, model_name, num_labels):
        super(ReasoningAwareModel, self).__init__()
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        self.depth_embedding = nn.Embedding(10, self.model.config.hidden_size)  # Embedding for reasoning depth
        self.complexity_embedding = nn.Embedding(10, self.model.config.hidden_size)  # Embedding for complexity

    def forward(self, input_ids, attention_mask, depth=None, complexity=None, labels=None):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        logits = outputs.logits

        if depth is not None and complexity is not None:
            depth_features = self.depth_embedding(depth)
            complexity_features = self.complexity_embedding(complexity)
            logits += depth_features + complexity_features

        return outputs

# Load dataset
train_file_path = "train.csv"
test_file_path = "test.csv"
submission_file_path = "sample_submission.csv"

train_data = pd.read_csv(train_file_path)
test_data = pd.read_csv(test_file_path)
submission_template = pd.read_csv(submission_file_path)

# Configuration
MODEL_NAME = "microsoft/deberta-v3-large"
MAX_LENGTH = 512
BATCH_SIZE = 16
EPOCHS = 7
LEARNING_RATE = 1e-5

# Tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Datasets and Dataloaders
train_dataset = ReasoningDataset(train_data, tokenizer, MAX_LENGTH, augment=True)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = ReasoningDataset(test_data, tokenizer, MAX_LENGTH, augment=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Model
model = ReasoningAwareModel(MODEL_NAME, num_labels=4)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer and Loss
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# Training loop
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].squeeze(1).to(device)
        attention_mask = batch['attention_mask'].squeeze(1).to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {total_loss / len(train_loader)}")

# Save model
torch.save(model.state_dict(), "reasoning_model.pth")

# Prediction loop
def predict(model, dataloader):
    model.eval()
    predictions = []
    true_labels = []
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].squeeze(1).to(device)
            attention_mask = batch['attention_mask'].squeeze(1).to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs.logits, dim=-1)
            predictions.extend(preds.cpu().numpy())
            if 'labels' in batch:
                true_labels.extend(batch['labels'].cpu().numpy())
    return predictions, true_labels

# Generate predictions and calculate accuracy
predictions, true_labels = predict(model, test_loader)
if len(true_labels) > 0:
    accuracy = accuracy_score(true_labels, predictions)
    print(f"Test Accuracy: {accuracy:.4f}")

# Create submission file
submission_template['label'] = predictions
submission_template.to_csv("submission.csv", index=False)
print("Submission file saved as submission.csv")


Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-v3-large and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


OutOfMemoryError: CUDA out of memory. Tried to allocate 256.00 MiB. GPU 0 has a total capacity of 39.56 GiB of which 80.81 MiB is free. Process 15619 has 39.48 GiB memory in use. Of the allocated memory 38.70 GiB is allocated by PyTorch, and 284.27 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)