In [1]:
import fitz
import os

def convert_pdfs_to_images(pdf_dir, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    for filename in os.listdir(pdf_dir):
        if filename.endswith(".pdf"):
            pdf_path = os.path.join(pdf_dir, filename)
            doc = fitz.open(pdf_path)
            
            for i in range(len(doc)):
                page = doc.load_page(i)
                pix = page.get_pixmap(dpi=200)
                image_filename = f"{filename}_page_{i}.png"
                image_path = os.path.join(output_dir, image_filename)
                pix.save(image_path)

            doc.close()

# Convert PDFs to images
convert_pdfs_to_images('data-v1/bankruptcy', 'data-v1/bankruptcy_images')
convert_pdfs_to_images('data-v1/non-bankruptcy', 'data-v1/non-bankruptcy_images')


In [2]:
import os
from sklearn.model_selection import train_test_split

bankruptcy_images = [os.path.join('data-v1/bankruptcy_images', f) for f in os.listdir('data-v1/bankruptcy_images')]

non_bankruptcy_images = [os.path.join('data-v1/non-bankruptcy_images', f) 
                   for f in os.listdir('data-v1/non-bankruptcy_images')]

bankruptcy_labels = [0] * len(bankruptcy_images)
non_bankruptcy_labels = [1] * len(non_bankruptcy_images)

images = bankruptcy_images + non_bankruptcy_images
labels = bankruptcy_labels + non_bankruptcy_labels

train_images, test_images, train_labels, test_labels = train_test_split(
    images, labels, test_size=0.2, stratify=labels, random_state=42
)

print(f"Training data: {len(train_images)} images")
print(f"Testing data: {len(test_images)} images")

Training data: 296 images
Testing data: 75 images


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Dataset class for image data
class ImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

In [4]:
# Define transforms for training and validation
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to a fixed size
    transforms.RandomHorizontalFlip(),  # Randomly flip images
    transforms.RandomRotation(10),  # Randomly rotate images
    transforms.ToTensor(),  # Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create datasets
train_dataset = ImageDataset(train_images, train_labels, transform=train_transform)
val_dataset = ImageDataset(test_images, test_labels, transform=val_transform)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)


In [7]:
model = models.resnet18(pretrained=True)

model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 1),
    nn.Sigmoid()  # Output probabilities for binary classification
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim

class EarlyStopping:
    def __init__(self, patience=5, delta=0.0, checkpoint_path="best_model.pth"):
        self.patience = patience
        self.delta = delta
        self.checkpoint_path = checkpoint_path
        self.best_loss = float('inf')
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
            torch.save(model.state_dict(), self.checkpoint_path)
        else:
            self.counter += 1
            if self.counter >= self.patience:
                print(f"Early stopping triggered. Stopping training.")
                self.early_stop = True

def train_model_with_early_stopping(model, train_loader, val_loader, criterion, optimizer, 
                                    device, num_epochs=20, patience=5):
    early_stopping = EarlyStopping(patience=patience, checkpoint_path="best_model.pth")

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device, dtype=torch.float32)
            optimizer.zero_grad()
            outputs = model(images).view(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        val_loss = 0
        val_correct = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device, dtype=torch.float32)
                outputs = model(images).view(-1)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                preds = (outputs > 0.5).float()
                val_correct += (preds == labels).sum().item()

        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        val_acc = val_correct / len(val_dataset)

        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")

        early_stopping(avg_val_loss, model)
        if early_stopping.early_stop:
            print("Stopping training early.")
            break

    model.load_state_dict(torch.load("best_model.pth"))
    print("Loaded best model weights.")

    return model

model = models.resnet18(pretrained=True)
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 1),
    nn.Sigmoid()
)
model = model.to(device)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

best_model = train_model_with_early_stopping(model, train_loader, val_loader, 
                                             criterion, optimizer, device, 
                                             num_epochs=20, patience=5)

Epoch [1/20]
Train Loss: 0.4383
Val Loss: 0.0267, Val Acc: 0.9867
Epoch [2/20]
Train Loss: 0.2726
Val Loss: 0.0631, Val Acc: 0.9867
Epoch [3/20]
Train Loss: 0.1521
Val Loss: 0.0914, Val Acc: 0.9467
Epoch [4/20]
Train Loss: 0.1358
Val Loss: 0.2064, Val Acc: 0.9733
Epoch [5/20]
Train Loss: 0.1362
Val Loss: 0.1239, Val Acc: 0.9467
Epoch [6/20]
Train Loss: 0.1301
Val Loss: 0.1653, Val Acc: 0.9067
Early stopping triggered. Stopping training.
Stopping training early.
Loaded best model weights.


  model.load_state_dict(torch.load("best_model.pth"))


In [9]:
def evaluate_model(model, val_loader, device):
    model.eval()
    correct = 0
    total = 0
    val_loss = 0.0
    criterion = nn.BCELoss()

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device, dtype=torch.float32)
            outputs = model(inputs).view(-1)

            loss = criterion(outputs, labels)
            val_loss += loss.item()

            preds = (outputs > 0.5).float()

            total += labels.size(0)
            correct += (preds == labels).sum().item()

    avg_val_loss = val_loss / len(val_loader)
    accuracy = 100 * correct / total
    print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {accuracy:.2f}%")

evaluate_model(best_model, val_loader, device)


Validation Loss: 0.0267, Validation Accuracy: 98.67%


In [10]:
import os
import shutil
import fitz  # PyMuPDF
from PIL import Image
import torch
from torchvision import transforms

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def pdf_to_image(pdf_path, output_image_path, dpi=200):
    pdf_document = fitz.open(pdf_path)
    page = pdf_document[0]
    mat = fitz.Matrix(dpi / 72, dpi / 72)  # Scale by DPI
    pix = page.get_pixmap(matrix=mat)
    pix.save(output_image_path)  # Save as image
    pdf_document.close()

def preprocess_image(image_path):
    image = Image.open(image_path).convert('RGB')
    return transform(image).unsqueeze(0)

def test_model(model, image_path):
    model.eval()
    image_tensor = preprocess_image(image_path)
    with torch.no_grad():
        output = model(image_tensor).view(-1)
        predicted = (output > 0.5).float()
        return 'Bankruptcy' if predicted.item() == 0 else 'Non-Bankruptcy'

def classify_and_sort_files(input_dir, model):
    bankruptcy_dir = os.path.join(input_dir, 'bankruptcy')
    non_bankruptcy_dir = os.path.join(input_dir, 'non-bankruptcy')

    os.makedirs(bankruptcy_dir, exist_ok=True)
    os.makedirs(non_bankruptcy_dir, exist_ok=True)

    for filename in os.listdir(input_dir):
        if filename.endswith('.pdf'):
            pdf_path = os.path.join(input_dir, filename)

            image_filename = f"{filename}.png"
            image_path = os.path.join(input_dir, image_filename)
            pdf_to_image(pdf_path, image_path)

            result = test_model(model, image_path)

            target_dir = bankruptcy_dir if result == 'Bankruptcy' else non_bankruptcy_dir
            shutil.move(pdf_path, os.path.join(target_dir, filename))

            os.remove(image_path)

state_dict = torch.load('best_model.pth', map_location=torch.device('cpu'))
model.load_state_dict(state_dict)

classify_and_sort_files('data-v1/documents', model)


  state_dict = torch.load('best_model.pth', map_location=torch.device('cpu'))
