## Import libraries

In [None]:
# Notebook shell commands: install detectron2 straight from its GitHub
# repository (presumably needed by the LayoutLMv2 model used below - confirm),
# then install mlflow and dagshub quietly (-q); both are imported by the
# experiment-tracking cells further down.
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'
!pip install mlflow dagshub -q

In [None]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch
import cv2

from datasets import (Array2D, Array3D, ClassLabel, Dataset, Features,
                      Sequence, Value)
from PIL import Image
from sklearn.model_selection import train_test_split 
from tqdm import tqdm
from transformers import (LayoutLMv2FeatureExtractor,
                          LayoutLMv2ForSequenceClassification,
                          LayoutLMv2Processor, LayoutLMv2Tokenizer)

### Params

In [None]:



# Image geometry used by the `features` schema (channels x height x width).
input_size = 224
chanel = 3

# Optimisation settings.
epochs = 20
lr = 5e-5

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Data Ingestion

In [None]:
import os
import cv2 as cv
import random
from sklearn.model_selection import train_test_split

def get_image_paths(path_to_subset):
    """Index every readable image stored under class-named sub-folders.

    Each immediate sub-directory of ``path_to_subset`` is treated as one
    class; its name becomes the label of every image inside it.

    Args:
        path_to_subset: Directory whose sub-directories hold the images.

    Returns:
        A pandas DataFrame with columns ``image_path`` and ``label``, one row
        per file that ``cv2.imread`` could decode.
    """
    paths = []
    labels = []

    # Sorted so the row order does not depend on file-system listing order.
    for folder in sorted(os.listdir(path_to_subset)):
        folder_path = os.path.join(path_to_subset, folder)

        # Stray files next to the class folders would make os.listdir raise.
        if not os.path.isdir(folder_path):
            continue

        for image in sorted(os.listdir(folder_path)):
            path_to_image = os.path.join(folder_path, image)

            # cv2.imread returns None when it cannot decode the file,
            # which filters out corrupt or non-image entries.
            if cv2.imread(path_to_image) is not None:
                paths.append(path_to_image)
                labels.append(folder)

    return pd.DataFrame.from_dict({'image_path': paths, 'label': labels})

In [None]:

# Index every readable training image together with its folder-name label.
df=get_image_paths("/kaggle/input/text-document-images/train")

In [None]:
# Preview the first rows of the image_path / label table.
df.head()

## Train-Test Split


In [None]:


# Hold out 20% of the images for evaluation. The fixed seed keeps the split,
# and therefore every metric computed from it, reproducible between runs.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

print(f"Train Len:: {len(train_df)}\tTest Len:: {len(test_df)}")

In [None]:

# Number of training images per class label.
train_df.label.value_counts()

In [None]:

# Number of held-out test images per class label.
test_df.label.value_counts()

In [None]:


# Wrap both pandas splits as `datasets.Dataset` objects so they can be
# encoded with `Dataset.map` below.
train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

## Tokenizer

In [None]:


# Sort the distinct class names: the iteration order of a set of strings can
# differ from one Python process to the next, which would silently change the
# integer id assigned to each class between runs.
labels = sorted(set(df["label"]))
labels

In [None]:

# Two-way lookup between integer class ids and class names, both derived from
# the position of each name in `labels`.
id2label = dict(enumerate(labels))
label2id = {name: idx for idx, name in id2label.items()}
print(label2id)
print(id2label)

In [None]:


# Build the LayoutLMv2 processor from a default feature extractor and the
# pretrained "microsoft/layoutlmv2-base-uncased" tokenizer.
feature_extractor = LayoutLMv2FeatureExtractor()
tokenizer = LayoutLMv2Tokenizer.from_pretrained("microsoft/layoutlmv2-base-uncased")
processor = LayoutLMv2Processor(feature_extractor, tokenizer)


# we need to define custom features
# This schema is handed to `Dataset.map(..., features=features)` below, so it
# fixes the column names, dtypes and array shapes of the encoded datasets.
features = Features({
    # (chanel, input_size, input_size) -> (3, 224, 224) with the params above
    'image': Array3D(dtype="int64", shape=(chanel, input_size, input_size)),
    'input_ids': Sequence(feature=Value(dtype='int64')),
    'attention_mask': Sequence(Value(dtype='int64')),
    'token_type_ids': Sequence(Value(dtype='int64')),
    # one 4-value box per position, 512 positions per example
    'bbox': Array2D(dtype="int64", shape=(512, 4)),
    'labels': ClassLabel(num_classes=len(labels), names=labels),
})

In [None]:

def preprocess_data(examples):
    """Encode a batch of image paths and labels for LayoutLMv2.

    Args:
        examples: Batch mapping with an ``image_path`` list and a ``label``
            list, as supplied by ``Dataset.map(..., batched=True)``.

    Returns:
        The processor output for the batch, with an added ``labels`` entry
        holding the integer id of each example's label.
    """
    images = []
    for path in examples['image_path']:
        # convert() returns an independent, fully loaded image, so the file
        # handle can be released immediately instead of leaking one per image.
        with Image.open(path) as img:
            images.append(img.convert("RGB"))

    encoded_inputs = processor(images, padding="max_length", truncation=True)

    # Map class names to the integer ids expected by the ClassLabel feature.
    encoded_inputs["labels"] = [label2id[label] for label in examples["label"]]

    return encoded_inputs


In [None]:


def _encode_split(split):
    """Run `preprocess_data` over one split and expose it as torch tensors on `device`."""
    encoded = split.map(
        preprocess_data,
        remove_columns=split.column_names,
        features=features,
        batched=True,
        batch_size=2,
    )
    encoded.set_format(type="torch", device=device)
    return encoded


print("\nEncoding Dataset")
train_encoded_data = _encode_split(train_dataset)
test_encoded_data = _encode_split(test_dataset)

In [None]:
# Data loaders over the encoded splits, 8 examples per batch. Only the
# training loader shuffles; the test loader keeps the dataset order.
train_dataloader = torch.utils.data.DataLoader(train_encoded_data, batch_size=8, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_encoded_data, batch_size=8)

## Show sample 

In [None]:

# Path of the first indexed image (row label 0 of the full table).
df['image_path'][0]

In [None]:
from PIL import Image, ImageDraw, ImageFont

# Open one sample document and convert it to RGB; `image` is left as the
# cell's last expression so the notebook renders it.
image = Image.open(df['image_path'][10])
image = image.convert("RGB")
image

In [None]:
# Encode the single sample image with the processor, asking for PyTorch tensors.
encoded_inputs = processor(image, return_tensors="pt")

In [None]:

# Print the name and tensor shape of every entry the processor returned.
for k,v in encoded_inputs.items():
  print(k, v.shape)

In [None]:

# Decode the sample's token ids back to text to inspect what was tokenized.
processor.tokenizer.decode(encoded_inputs.input_ids.squeeze().tolist())

## Model training

### Model

In [None]:

# Load the pretrained LayoutLMv2 backbone with a classification head sized to
# the number of classes. Passing the label maps stores them in the model
# config, so a checkpoint written by `save_pretrained` carries its own class
# names instead of relying on in-memory dictionaries.
model = LayoutLMv2ForSequenceClassification.from_pretrained(
    "microsoft/layoutlmv2-base-uncased",
    num_labels=len(labels),
    id2label=id2label,
    label2id=label2id,
)
model.to(device)

## mlflow setup

In [None]:
import dagshub
import mlflow

# Initialise the DagsHub integration for this repository with MLflow enabled,
# then point MLflow at the repository's tracking endpoint.
dagshub.init(repo_owner='kaushigihanml', repo_name='document_classification', mlflow=True)
mlflow.set_tracking_uri("https://dagshub.com/kaushigihanml/document_classification.mlflow")

### Training 

In [None]:

# Class names in the insertion order of `label2id`.
class_names = list(label2id.keys())
class_names

In [None]:

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
import mlflow


def eval_prediction(test_dataloader,model):
    """Run the model over a dataloader and collect predicted and true class ids.

    The model is switched to evaluation mode for the duration of the call, so
    layers such as dropout do not randomise the predictions. Its previous
    train/eval mode is restored afterwards.

    Args:
        test_dataloader: Iterable of batches providing ``input_ids``, ``bbox``,
            ``image``, ``attention_mask``, ``token_type_ids`` and ``labels``.
        model: Model called with those tensors; its output must expose ``logits``.

    Returns:
        A tuple ``(predicted_ids, true_ids)`` of two lists of Python ints, in
        dataloader order.
    """
    predict_label = []
    true_label = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for batch in tqdm(test_dataloader, desc="Evaluating"):
                input_ids = batch['input_ids'].to(device)
                bbox = batch['bbox'].to(device)
                image = batch['image'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                token_type_ids = batch['token_type_ids'].to(device)
                labels = batch['labels'].to(device)

                # forward pass
                outputs = model(input_ids=input_ids, bbox=bbox, image=image,
                                attention_mask=attention_mask,
                                token_type_ids=token_type_ids, labels=labels)
                predictions = outputs.logits.argmax(-1)

                # One bulk transfer per batch instead of one .item() per element.
                predict_label.extend(predictions.tolist())
                true_label.extend(labels.tolist())
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return predict_label, true_label



def evaluate(model,test_dataloader):
    """Evaluate the model and log a confusion matrix and per-class metrics to MLflow.

    Saves the confusion-matrix heatmap as ``confusion_matrix.png``, logs it as
    an MLflow artifact, and logs precision, recall and F1 for each class.

    Args:
        model: Model to evaluate.
        test_dataloader: Dataloader passed on to ``eval_prediction``.
    """
    predicted_labels, true_labels = eval_prediction(test_dataloader, model)

    # Class names and their integer ids, in matching order.
    class_names = list(label2id.keys())
    class_ids = [label2id[name] for name in class_names]

    # Passing the full id list keeps the matrix aligned with `class_names`
    # even when a class is missing from the test data or never predicted.
    cm = confusion_matrix(true_labels, predicted_labels, labels=class_ids)

    plt.figure(figsize=(8, 6))
    try:
        sns.set(font_scale=1.2)
        sns.heatmap(cm, annot=True, cmap='Blues', fmt='g',
                    xticklabels=class_names, yticklabels=class_names)

        plt.title("Confusion Matrix")
        plt.ylabel("True Label")
        plt.xlabel("Predicted Label")
        plt.savefig("confusion_matrix.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("confusion_matrix.png")
    finally:
        # Release the figure even if saving or logging fails.
        plt.close()

    # Without `labels=`, a class absent from the data would make the number
    # of classes disagree with `target_names` and raise.
    report = classification_report(
        true_labels,
        predicted_labels,
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
    )

    # Log per-class metrics
    for class_name in class_names:
        if class_name in report:
            class_report = report[class_name]
            mlflow.log_metrics(
                {
                    f"{class_name}_precision": class_report["precision"],
                    f"{class_name}_recall": class_report["recall"],
                    f"{class_name}_f1": class_report["f1-score"],
                }
            )

In [None]:
 # Run the evaluation loop once on the not-yet-fine-tuned model.
 # NOTE(review): this line carries a stray leading space - confirm the cell
 # still runs if the notebook is exported as a plain Python script.
 eval_prediction(test_dataloader,model)

## Training

In [None]:
import mlflow
from torch.optim import AdamW
from tqdm.notebook import tqdm

# Start MLflow run
# Select the MLflow experiment that will hold this run.
mlflow.set_experiment("Layout_model")

# Persist the class-name -> id mapping so it can be logged next to the model.
with open("label2id.txt", "w", encoding="utf-8") as f:
    for key, value in label2id.items():
        f.write(f"{key}: {value}\n")

# Use the shared `lr` parameter so the optimizer and the logged
# hyperparameter cannot drift apart.
optimizer = AdamW(model.parameters(), lr=lr)

global_step = 0

# The context manager closes the MLflow run even if training raises, so a
# failed attempt does not leave an active run behind.
with mlflow.start_run(run_name="layoutlmv2", nested=True):
    # Log hyperparameters
    mlflow.log_params({
        "learning_rate": lr,
        "num_train_epochs": epochs,
        "optimizer": "AdamW"
    })

    model.train()
    for epoch in range(epochs):
        print("Epoch:", epoch)
        running_loss = 0.0
        correct = 0
        total_samples = 0

        for batch in tqdm(train_dataloader):
            outputs = model(**batch)
            loss = outputs.loss

            # `loss` is a per-batch mean; weight it by the batch size so that
            # dividing by the sample count below gives the true mean loss.
            batch_size = batch['labels'].size(0)
            running_loss += loss.item() * batch_size
            predictions = outputs.logits.argmax(-1)
            correct += (predictions == batch['labels']).sum().item()
            total_samples += batch_size

            # Backward pass and optimization step
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            global_step += 1

            # Log the batch-level loss every 100 optimizer steps.
            if global_step % 100 == 0:
                mlflow.log_metric("batch_loss", loss.item(), step=global_step)

        epoch_loss = running_loss / total_samples
        accuracy = 100 * correct / total_samples

        print(f"Loss: {epoch_loss:.4f}")
        print(f"Training accuracy: {accuracy:.2f}%")

        # Log epoch metrics
        mlflow.log_metric("epoch_loss", epoch_loss, step=epoch)
        mlflow.log_metric("epoch_accuracy", accuracy, step=epoch)

    # Evaluate with dropout disabled, then save and log the fine-tuned model
    # together with its label mapping.
    model.eval()
    evaluate(model=model,test_dataloader=test_dataloader)
    model.save_pretrained("model.pth")
    mlflow.log_artifact("/kaggle/working/model.pth")
    mlflow.log_artifact("/kaggle/working/label2id.txt")


## Inference

In [None]:

from PIL import Image, ImageDraw, ImageFont
from transformers import LayoutLMv2ForSequenceClassification
from transformers import (LayoutLMv2FeatureExtractor,
                          LayoutLMv2ForSequenceClassification,
                          LayoutLMv2Processor, LayoutLMv2Tokenizer)


# Rebuild the processor and reload the fine-tuned checkpoint written by the
# training cell. Any loading error propagates unchanged.
feature_extractor = LayoutLMv2FeatureExtractor()
tokenizer = LayoutLMv2Tokenizer.from_pretrained("microsoft/layoutlmv2-base-uncased")
processor = LayoutLMv2Processor(feature_extractor, tokenizer)
load_model = LayoutLMv2ForSequenceClassification.from_pretrained("/kaggle/working/model.pth")

# Move the *reloaded* model (not the in-memory training model) to the target
# device and make sure it is in inference mode.
load_model.to(device)
load_model.eval()


def layout_model_prediction(img_path):
    """Classify one document image with the reloaded model.

    Args:
        img_path: Path of the image file to classify.

    Returns:
        The predicted class name, looked up in ``id2label``. The name is also
        printed.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    load_model.to(device)
    load_model.eval()

    # convert() yields an independent image, so the file can be closed at once.
    with Image.open(img_path) as raw_image:
        image = raw_image.convert("RGB")

    # prepare image for the model
    encoded_inputs = processor(
        image,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=512,
    )

    # Inputs must sit on the same device as the model that consumes them
    # (`load_model`), not on the device of the separate training model.
    model_inputs = {k: v.to(device) for k, v in encoded_inputs.items()}

    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = load_model(**model_inputs)

    predicted_class_idx = outputs.logits.argmax(-1).item()
    predicted_label = id2label[predicted_class_idx]
    print("Predicted class:", predicted_label)

    return predicted_label
 

In [None]:



# Classify one indexed image (row label 25) with the reloaded model.
layout_model_prediction(df['image_path'][25])