In [None]:
!pip install datasets

from datasets import load_dataset

# Load the dataset

dataset = load_dataset('not-lain/sroie')

In [None]:
# Show the overall structure of the dataset (splits and available fields).
print(dataset)

# Fetch the first training example once and reuse it for both views below.
first_example = dataset['train'][0]

# Which keys does a single example contain?
print(first_example.keys())

# The example itself, to see the actual data behind those keys.
print(first_example)

In [None]:
from PIL import Image
from transformers import LayoutLMv3Processor
import numpy as np
import torch
from datasets import Dataset

# Release cached, currently unused CUDA memory before the heavy work starts.
torch.cuda.empty_cache()

# Build the LayoutLMv3 processor. With apply_ocr=True the processor extracts
# the words and boxes from each image itself, so only images are passed to it.
# NOTE(review): presumably this needs a working Tesseract/pytesseract install - confirm.
processor = LayoutLMv3Processor.from_pretrained(
    "microsoft/layoutlmv3-base",
    apply_ocr=True,
)

def preprocess_data(examples):
    """Encode a batch of document images for LayoutLMv3 token classification.

    Args:
        examples: Mapping with an ``'images'`` entry (a sequence of PIL images
            or NumPy arrays) and, optionally, a ``'labels'`` entry.

    Returns:
        The processor output (padded to max length, truncated, PyTorch tensors)
        with an extra ``'labels'`` tensor.

    Raises:
        TypeError: If an image is neither a ``numpy.ndarray`` nor a PIL image.
    """
    def convert_image(img):
        """Return ``img`` as an RGB PIL image."""
        if isinstance(img, np.ndarray):
            if img.ndim == 2:
                # Single-channel array: replicate it into three channels.
                img = np.stack([img] * 3, axis=-1)
            return Image.fromarray(img).convert("RGB")
        elif isinstance(img, Image.Image):
            return img.convert("RGB")
        else:
            raise TypeError("Unsupported image type")

    images = [convert_image(img) for img in examples['images']]
    encoded_inputs = processor(images, padding="max_length", truncation=True, return_tensors="pt")

    labels = examples.get('labels')
    if labels is None:
        # No labels in the batch: fall back to all-zero placeholders. They must
        # be token-level, i.e. shaped like input_ids; one value per image gives
        # a (batch,) tensor that the token-classification loss rejects.
        placeholder = torch.zeros_like(encoded_inputs['input_ids'])
        # Padding positions get -100, the ignore index of the cross-entropy loss.
        placeholder[encoded_inputs['attention_mask'] == 0] = -100
        encoded_inputs['labels'] = placeholder
    else:
        # NOTE(review): provided labels are used as-is; they are assumed to be
        # already aligned with the tokenized sequence - confirm against the dataset.
        encoded_inputs['labels'] = torch.tensor(labels)

    return encoded_inputs

def process_dataset_in_batches(dataset, batch_size=8):
    """Process the dataset in small batches to save memory.

    Args:
        dataset: Sliceable dataset split; each slice is passed to
            ``preprocess_data``.
        batch_size: Number of examples encoded per call (must be positive).

    Returns:
        A ``Dataset`` built from the concatenated encodings of all batches.
        An empty input yields an empty ``Dataset``.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    total = len(dataset)
    if total == 0:
        # Nothing to encode; avoids indexing into an empty list of batches below.
        return Dataset.from_dict({})

    # Stepping by batch_size covers the final, possibly shorter, batch as well.
    all_batches = [
        preprocess_data(dataset[start:start + batch_size])
        for start in range(0, total, batch_size)
    ]

    # Combine all the processed batches into a single dictionary
    combined_batch = {key: torch.cat([b[key] for b in all_batches], dim=0) for key in all_batches[0]}

    # Dataset.from_dict is fed plain Python lists, hence the tolist() conversion.
    return Dataset.from_dict({k: v.tolist() for k, v in combined_batch.items()})

# Encode both splits (train first, then test), eight examples at a time.
train_dataset, test_dataset = (
    process_dataset_in_batches(dataset[split], batch_size=8)
    for split in ('train', 'test')
)

In [None]:
from torch.utils.data import DataLoader
from transformers import LayoutLMv3ForTokenClassification, AdamW
from tqdm import tqdm

# Create dataloaders from the processed datasets
def create_dataloader(dataset, batch_size=8, shuffle=True):
    """Wrap ``dataset`` in a PyTorch ``DataLoader``.

    Args:
        dataset: Map-style dataset. Objects exposing ``with_format`` (such as a
            Hugging Face ``Dataset``) are switched to the ``"torch"`` format first.
        batch_size: Number of examples per batch.
        shuffle: Whether to reshuffle the data every epoch (default ``True``,
            matching the previous behaviour).

    Returns:
        A ``DataLoader`` over ``dataset``.
    """
    if hasattr(dataset, "with_format"):
        # Without this the dataset yields plain Python lists, which the default
        # collate function turns into lists of per-position tensors instead of
        # one batched tensor per field - and those have no `.to(device)`.
        dataset = dataset.with_format("torch")
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

train_dataloader = create_dataloader(train_dataset, batch_size=8)
test_dataloader = create_dataloader(test_dataset, batch_size=8)

# Work out the number of classes. A dataset built with Dataset.from_dict from
# plain lists carries integer features, not a ClassLabel, so `.feature.names`
# is not available; use class names when present, otherwise the label ids.
label_feature = train_dataset.features["labels"]
label_names = getattr(getattr(label_feature, "feature", label_feature), "names", None)
if label_names is not None:
    num_labels = len(label_names)
else:
    label_ids = np.asarray(list(train_dataset["labels"])).ravel()
    label_ids = label_ids[label_ids >= 0]  # skip ignore-index entries such as -100
    num_labels = int(label_ids.max()) + 1 if label_ids.size else 1

# Initialize the model
model = LayoutLMv3ForTokenClassification.from_pretrained("microsoft/layoutlmv3-base", num_labels=num_labels)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Set up the optimizer
# NOTE(review): the AdamW exported by transformers is deprecated in favour of
# torch.optim.AdamW - consider switching the import when upgrading transformers.
optimizer = AdamW(model.parameters(), lr=5e-5)

# Training loop
def train(model, dataloader, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Every batch is moved to ``device``, passed through ``model``, and the
    resulting loss is back-propagated before the optimizer takes a step.

    Returns:
        The sum of the batch losses divided by the number of batches.
    """
    model.train()
    batch_losses = []
    for batch in tqdm(dataloader, desc="Training"):
        optimizer.zero_grad()

        # Move every field of the batch onto the target device.
        inputs = {}
        for name, tensor in batch.items():
            inputs[name] = tensor.to(device)

        loss = model(**inputs).loss
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
    return sum(batch_losses) / len(dataloader)

# Evaluation loop
def evaluate(model, dataloader, device):
    """Compute the mean batch loss of ``model`` over ``dataloader``.

    The model is put in eval mode and gradients are disabled for the pass.

    Returns:
        The sum of the batch losses divided by the number of batches.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            on_device = {name: tensor.to(device) for name, tensor in batch.items()}
            batch_losses.append(model(**on_device).loss.item())
    return sum(batch_losses) / len(dataloader)

# Train and evaluate the model
num_epochs = 3
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    # One training pass first, then a loss measurement on the test loader.
    train_loss, val_loss = (
        train(model, train_dataloader, optimizer, device),
        evaluate(model, test_dataloader, device),
    )
    print(f"Training Loss: {train_loss:.4f}")
    print(f"Validation Loss: {val_loss:.4f}")

# Persist the fine-tuned weights and the processor so they can be reloaded later.
model.save_pretrained("path_to_save_model")
processor.save_pretrained("path_to_save_processor")

# Load the model (if needed later)
# model = LayoutLMv3ForTokenClassification.from_pretrained("path_to_save_model")
# processor = LayoutLMv3Processor.from_pretrained("path_to_save_processor")

In [None]:
from sklearn.metrics import classification_report
import numpy as np

def evaluate_with_metrics(model, dataloader, device):
    """Print a classification report of ``model``'s predictions on ``dataloader``.

    Positions outside the attention mask, and positions whose label is -100,
    are left out of the report. Nothing is returned.
    """
    model.eval()
    kept_preds, kept_labels = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            inputs = {name: tensor.to(device) for name, tensor in batch.items()}
            logits = model(**inputs).logits

            # Overwrite everything outside the attention mask with -100.
            attended = inputs["attention_mask"].cpu().numpy() == 1
            predictions = np.where(attended, logits.argmax(dim=-1).cpu().numpy(), -100)
            labels = np.where(attended, inputs["labels"].cpu().numpy(), -100)

            # Keep only the positions whose label is a real class id.
            scored = labels != -100
            kept_preds.extend(predictions[scored])
            kept_labels.extend(labels[scored])

    print(classification_report(kept_labels, kept_preds))
    
# Evaluate on the test set: prints a classification report for the batches
# served by test_dataloader (the function itself returns nothing).
evaluate_with_metrics(model, test_dataloader, device)

In [None]:
from transformers import LayoutLMv3ForTokenClassification, LayoutLMv3Processor
import torch
from PIL import Image

# Reload the saved processor and model from disk and place the model on the
# GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
processor = LayoutLMv3Processor.from_pretrained("path_to_save_processor")
model = LayoutLMv3ForTokenClassification.from_pretrained("path_to_save_model").to(device)

def predict_on_new_image(image_path):
    """Predict a label id for every token of the document image at ``image_path``.

    Args:
        image_path: Path of the image file to open.

    Returns:
        A list of ``(token, predicted_label_id)`` pairs covering the whole
        encoded sequence. The input is padded to max length, so padding
        positions are included.
    """
    # Open inside a context manager so the file handle is released right away;
    # convert() returns an independent copy that outlives the `with` block.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB")

    encoded_inputs = processor([image], padding="max_length", truncation=True, return_tensors="pt")
    inputs = {key: value.to(device) for key, value in encoded_inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        # Highest-scoring class per token.
        predictions = torch.argmax(logits, dim=-1)

    # Only one image was passed in, so take the first (and only) batch row.
    predicted_labels = predictions.cpu().numpy()[0]
    tokens = processor.tokenizer.convert_ids_to_tokens(encoded_inputs['input_ids'][0].cpu().numpy())

    return list(zip(tokens, predicted_labels))

# Example inference: prints the (token, predicted label id) pairs returned by
# predict_on_new_image.
# NOTE(review): "path_to_new_image.png" looks like a placeholder - point it at a real file.
predictions = predict_on_new_image("path_to_new_image.png")
print(predictions)