In [33]:
import os
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
import pytesseract
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import time

In [2]:
# Dataset path
# Given relative to the notebook's working directory. It is handed to
# TobaccoDataset below, which treats every sub-directory as one class.
dataset_dir = 'Tobacco3482-10'

In [3]:
# Image preprocessing pipeline applied to every sample:
#   1. resize to 224x224,
#   2. convert the PIL image to a tensor,
#   3. normalize each RGB channel with the mean/std listed below
#      (presumably the usual ImageNet statistics, matching the ImageNet
#      weights loaded into the VGG16 backbone later on).
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
]
transform = transforms.Compose(_preprocessing_steps)

In [4]:
# Custom dataset exposing a class-per-folder image tree to PyTorch
class TobaccoDataset(Dataset):
    """Image dataset laid out as ``img_dir/<class_name>/<image>.jpg``.

    Every sub-directory of ``img_dir`` is one class. Class names are sorted
    so that the name -> index mapping is deterministic across runs and
    machines (``os.listdir`` makes no ordering guarantee).

    Attributes:
        img_dir: Root directory that was scanned.
        transform: Optional callable applied to each RGB PIL image.
        classes: Sorted list of class (sub-directory) names.
        class_to_idx: Mapping from class name to its integer label.
        files: List of ``(image_path, class_name)`` tuples, grouped by class
            and sorted by file name inside each class.
    """

    def __init__(self, img_dir, transform=None):
        """Scan ``img_dir`` and index every ``.jpg`` file found one level down.

        Args:
            img_dir: Directory whose sub-directories are the classes.
            transform: Optional callable applied to each loaded image.
        """
        self.img_dir = img_dir
        self.transform = transform
        # Sorted: label indices must not depend on filesystem enumeration order.
        self.classes = sorted(
            d for d in os.listdir(img_dir) if os.path.isdir(os.path.join(img_dir, d))
        )
        # Pre-computed lookup replaces a linear `classes.index(...)` per sample.
        self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}
        self.files = []
        for label in self.classes:
            class_dir = os.path.join(img_dir, label)
            self.files.extend(
                (os.path.join(class_dir, file), label)
                for file in sorted(os.listdir(class_dir))
                # Case-insensitive so that e.g. "scan.JPG" is not silently dropped.
                if file.lower().endswith('.jpg')
            )

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            A tuple ``(image, label_index, image_path)`` where ``image`` is the
            RGB image after ``transform`` (if any), ``label_index`` is the
            position of the sample's class in ``classes`` and ``image_path``
            is the file the image was read from.
        """
        img_path, label = self.files[idx]
        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image, self.class_to_idx[label], img_path


In [5]:
# Build the dataset and split it 80/20 into train/validation subsets
dataset = TobaccoDataset(dataset_dir, transform=transform)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size  # remainder, so the two sizes always sum to len(dataset)

# Seed the split explicitly: the OCR texts and TF-IDF features computed in the
# following cells are tied to this exact partition, so an unseeded split would
# give a different train/validation membership on every kernel restart.
# (Reproducibility also requires the dataset to enumerate files in a stable order.)
SPLIT_SEED = 42
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

In [6]:
# Batched loaders over the two subsets.
# The training loader shuffles its samples; the validation loader keeps dataset order.
BATCH_SIZE = 32
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)

For the next section, make sure Tesseract is installed. On macOS with Homebrew: `brew install tesseract`

In [7]:
# OCR helper: extracts the raw text of an image (TF-IDF vectorization happens in a later cell)
def extract_text_from_image(image_path):
    """Run OCR on one image and return the recognized text.

    Args:
        image_path: Path of the image file, forwarded unchanged to
            ``pytesseract.image_to_string``.

    Returns:
        Whatever ``pytesseract.image_to_string`` returns for that image.
    """
    recognized_text = pytesseract.image_to_string(image_path)
    return recognized_text

In [8]:
# OCR every image of each split, in subset order (row k <-> k-th sample of the subset).
# The paths are read straight from the dataset's file index through the subset
# indices: iterating the subsets themselves would decode, resize and normalize
# every image only to throw the pixels away and keep the path.
train_texts = [extract_text_from_image(dataset.files[i][0]) for i in train_dataset.indices]
val_texts = [extract_text_from_image(dataset.files[i][0]) for i in val_dataset.indices]

In [9]:
# TF-IDF features of the OCR texts, limited to 1000 terms.
# The vectorizer is fitted on the training texts only; the validation texts are
# projected with that same fitted vectorizer.
vectorizer = TfidfVectorizer(max_features=1000)
_train_tfidf_sparse = vectorizer.fit_transform(train_texts)
_val_tfidf_sparse = vectorizer.transform(val_texts)

# Dense arrays, one row per document
train_text_features = _train_tfidf_sparse.toarray()
val_text_features = _val_tfidf_sparse.toarray()

In [10]:
# Rebind both feature matrices to float32 tensors so they can be fed to the network
train_text_features, val_text_features = (
    torch.tensor(features, dtype=torch.float32)
    for features in (train_text_features, val_text_features)
)

In [24]:
# Define the CombinedNN model
class CombinedNN(nn.Module):
    """Two-branch classifier fusing VGG16 image features with a text feature vector.

    The visual branch runs the image through VGG16 (with its last classifier
    layer removed) and a 4096 -> 512 -> 256 -> 128 MLP. The textual branch runs
    the text features through a ``text_input_dim`` -> 512 -> 256 -> 128 MLP.
    Both 128-d embeddings are concatenated and mapped to ``num_classes`` logits.

    Attribute names are part of the checkpoint format (``state_dict`` keys) and
    must not be renamed.

    Args:
        num_classes: Number of output logits.
        dropout_rate: Dropout probability used after every hidden layer.
        text_input_dim: Width of the text feature vector. The default of 1000
            matches a TF-IDF vectorizer built with ``max_features=1000``.
        vgg_weights: Forwarded to ``models.vgg16(weights=...)``. Pass ``None``
            to skip loading pretrained weights, e.g. when a full checkpoint is
            going to be loaded right afterwards.
    """

    def __init__(self, num_classes, dropout_rate=0.5, text_input_dim=1000, vgg_weights="IMAGENET1K_V1"):
        super().__init__()

        # VGG16 to extract visual features
        self.vgg16 = models.vgg16(weights=vgg_weights)
        # Remove the last classifier layer so the backbone feeds its
        # penultimate activations (consumed as 4096-d by visual_fc1) to our head.
        self.vgg16.classifier = nn.Sequential(*list(self.vgg16.classifier.children())[:-1])

        # NN for visual features
        self.visual_fc1 = nn.Linear(4096, 512)
        self.visual_dropout1 = nn.Dropout(p=dropout_rate)
        self.visual_fc2 = nn.Linear(512, 256)
        self.visual_dropout2 = nn.Dropout(p=dropout_rate)
        self.visual_fc3 = nn.Linear(256, 128)
        self.visual_dropout3 = nn.Dropout(p=dropout_rate)

        # NN for textual features
        self.text_fc1 = nn.Linear(text_input_dim, 512)
        self.text_dropout1 = nn.Dropout(p=dropout_rate)
        self.text_fc2 = nn.Linear(512, 256)
        self.text_dropout2 = nn.Dropout(p=dropout_rate)
        self.text_fc3 = nn.Linear(256, 128)
        self.text_dropout3 = nn.Dropout(p=dropout_rate)

        # Concatenation and final layer
        self.final_fc = nn.Linear(128 + 128, num_classes)

    def forward(self, images, texts):
        """Compute class logits for a batch.

        Args:
            images: Image batch fed to the VGG16 backbone.
            texts: Text feature batch whose second dimension is ``text_input_dim``.

        Returns:
            Tensor of unnormalized class scores, one row per sample.
        """
        # Visual features
        x1 = self.vgg16(images)
        x1 = torch.relu(self.visual_fc1(x1))
        x1 = self.visual_dropout1(x1)
        x1 = torch.relu(self.visual_fc2(x1))
        x1 = self.visual_dropout2(x1)
        x1 = torch.relu(self.visual_fc3(x1))
        x1 = self.visual_dropout3(x1)

        # Textual features
        x2 = torch.relu(self.text_fc1(texts))
        x2 = self.text_dropout1(x2)
        x2 = torch.relu(self.text_fc2(x2))
        x2 = self.text_dropout2(x2)
        x2 = torch.relu(self.text_fc3(x2))
        x2 = self.text_dropout3(x2)

        # Concatenation (along the feature dimension) and final layer
        combined = torch.cat((x1, x2), dim=1)
        output = self.final_fc(combined)

        return output

# One output logit per class folder discovered by the dataset
num_classes = len(dataset.classes)

# Instantiate the network
model = CombinedNN(num_classes=num_classes)

# Loss function and optimizer used for training
LEARNING_RATE = 0.001
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=LEARNING_RATE)


In [None]:
# Move the model to the GPU if available
# `device` is reused by later cells to place input tensors on the same device as the model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare last expression: the notebook displays the repr of the returned module.
model.to(device)

In [26]:
# Custom summary function.
# Pytorch summary function does not work for CombinedNN since our model takes multiple inputs
def print_model_summary(model, input_shapes):
    """Print a per-layer table of output shapes and parameter counts.

    A forward hook is attached to every sub-module except ``nn.Sequential`` /
    ``nn.ModuleList`` containers and the model itself, then a single forward
    pass is run on random inputs (batch size 2) to record the shapes.

    Each row counts only the parameters owned directly by that module
    (``recurse=False``). Counting recursively would add a container's
    parameters once for the container and again for each of its children,
    inflating the totals. A module invoked several times in one forward pass
    is listed, and counted, once per call.

    The probe pass runs in eval mode without gradient tracking, on the device
    of the model's first parameter. The model's training flag is restored
    afterwards and the hooks are removed even if the forward pass raises.

    Args:
        model: The module to summarize. It is called as ``model(*inputs)``.
        input_shapes: One shape per positional input, without the batch
            dimension, e.g. ``[(3, 224, 224), (1000,)]``.
    """
    from collections import OrderedDict

    summary = OrderedDict()
    hooks = []

    def _shape_of(value):
        # Tensors -> list of ints; (nested) tuples/lists of tensors -> nested lists.
        if isinstance(value, torch.Tensor):
            return list(value.size())
        if isinstance(value, (list, tuple)):
            return [_shape_of(item) for item in value]
        return None

    def hook(module, inputs, output):
        own_params = list(module.parameters(recurse=False))
        m_key = "%s-%i" % (module.__class__.__name__, len(summary) + 1)
        summary[m_key] = {
            "input_shape": _shape_of(inputs[0]) if inputs else None,
            "output_shape": _shape_of(output),
            "nb_params": sum(p.numel() for p in own_params),
            "nb_trainable": sum(p.numel() for p in own_params if p.requires_grad),
        }

    def register_hook(module):
        if not isinstance(module, (nn.Sequential, nn.ModuleList)) and module is not model:
            hooks.append(module.register_forward_hook(hook))

    # Build the dummy inputs on whatever device the model currently lives on.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    inputs = [torch.rand(2, *in_shape).to(target_device) for in_shape in input_shapes]

    was_training = model.training
    model.apply(register_hook)
    model.eval()
    try:
        with torch.no_grad():
            model(*inputs)
    finally:
        for h in hooks:
            h.remove()
        model.train(was_training)

    print("----------------------------------------------------------------")
    print("        Layer (type)               Output Shape         Param #")
    print("================================================================")
    total_params = 0
    trainable_params = 0
    for layer, info in summary.items():
        print("{:>25}  {:>25} {:>15}".format(
            layer,
            str(info["output_shape"]),
            "{0:,}".format(info["nb_params"]),
        ))
        total_params += info["nb_params"]
        trainable_params += info["nb_trainable"]

    print("================================================================")
    print("Total params: {0:,}".format(total_params))
    print("Trainable params: {0:,}".format(trainable_params))
    print("Non-trainable params: {0:,}".format(total_params - trainable_params))
    print("----------------------------------------------------------------")

In [None]:
# Print model summary
# One shape per model input, without the batch dimension:
# a 3x224x224 image and a 1000-d text feature vector.
print_model_summary(model, [(3, 224, 224), (1000,)])

In [13]:
# Dummy inputs
# A single random sample per model input (batch size 1), moved to the model's
# device; they are used by the architecture-visualization cell below.
dummy_images = torch.randn(1, 3, 224, 224).to(device)
dummy_texts = torch.randn(1, 1000).to(device)

In [29]:
# Visualize the model
# NOTE(review): torchviz is imported here at first use rather than in the import cell at the top.
from torchviz import make_dot
# Forward pass on the dummy batch to obtain an output tensor to draw the graph from
output = model(dummy_images, dummy_texts)
# `params` maps parameter names to tensors, presumably so graph nodes can be labelled — confirm in torchviz docs
dot = make_dot(output, params=dict(model.named_parameters()))
# Write the graph as a PNG; the cell output below shows the returned file name
dot.render("CombinedNN_architecture", format="png")

'CombinedNN_architecture.png'

### Training

In [31]:
# Define training function
def _dataset_paths(dataset):
    """Return the image path of every sample of ``dataset``, in dataset order."""
    base = getattr(dataset, "dataset", None)
    indices = getattr(dataset, "indices", None)
    # Fast path for a Subset of a dataset exposing `files` as (path, label)
    # tuples: no image has to be decoded to learn the paths.
    if indices is not None and hasattr(base, "files"):
        return [base.files[i][0] for i in indices]
    if hasattr(dataset, "files"):
        return [entry[0] for entry in dataset.files]
    # Generic fallback: samples are (image, label, path) triples.
    return [dataset[i][2] for i in range(len(dataset))]


def _path_to_row(loader, text_features):
    """Map each image path of ``loader.dataset`` to its row in ``text_features``."""
    paths = _dataset_paths(loader.dataset)
    if len(paths) != len(text_features):
        raise ValueError(
            f"text features have {len(text_features)} rows but the dataset has {len(paths)} samples"
        )
    return {path: row for row, path in enumerate(paths)}


def _run_epoch(model, loader, text_features, path_to_row, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else only evaluate.

    Returns ``(mean_batch_loss, accuracy_percent)``.
    """
    training = optimizer is not None
    model.train(training)
    running_loss, correct, total = 0.0, 0, 0

    with torch.set_grad_enabled(training):
        for images, labels, paths in loader:
            # Look the text rows up by image path: slicing by batch position is
            # wrong as soon as the loader shuffles or the last batch is smaller.
            rows = torch.as_tensor(
                [path_to_row[p] for p in paths], dtype=torch.long, device=text_features.device
            )
            texts = text_features[rows].to(device)
            images, labels = images.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images, texts)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    mean_loss = running_loss / len(loader) if len(loader) else 0.0
    accuracy = 100 * correct / total if total else 0.0
    return mean_loss, accuracy


def train_model(model, train_loader, val_loader, train_text_features, val_text_features, criterion, optimizer, num_epochs=20, patience=3):
    """Train ``model`` on image + text features with early stopping.

    Both loaders must yield ``(images, labels, image_paths)`` batches. Row ``k``
    of each text feature tensor must belong to sample ``k`` of the matching
    ``loader.dataset``. Text rows are matched to images by path, so shuffled
    loaders and a smaller last batch are handled correctly.

    Inputs are moved to the device of the model's first parameter.

    Args:
        model: Module called as ``model(images, texts)``.
        train_loader: Loader over the training samples.
        val_loader: Loader over the validation samples.
        train_text_features: Tensor with one row per training sample.
        val_text_features: Tensor with one row per validation sample.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        num_epochs: Maximum number of epochs.
        patience: Stop once the validation loss has not improved for this many
            consecutive epochs.

    Returns:
        ``(train_loss_list, val_loss_list, accuracy_list)`` with one entry per
        completed epoch; ``accuracy_list`` holds validation accuracy in percent.

    Raises:
        ValueError: If a text feature tensor does not have one row per sample
            of its loader's dataset.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    train_rows = _path_to_row(train_loader, train_text_features)
    val_rows = _path_to_row(val_loader, val_text_features)

    train_loss_list = []
    val_loss_list = []
    accuracy_list = []
    best_val_loss = float('inf')
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        start_time = time.time()

        train_loss, train_accuracy = _run_epoch(
            model, train_loader, train_text_features, train_rows, criterion, device, optimizer
        )
        val_loss, val_accuracy = _run_epoch(
            model, val_loader, val_text_features, val_rows, criterion, device
        )

        epoch_time = time.time() - start_time

        train_loss_list.append(train_loss)
        val_loss_list.append(val_loss)
        accuracy_list.append(val_accuracy)

        print(f'Epoch [{epoch + 1}/{num_epochs}], '
              f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, '
              f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%, '
              f'Time: {epoch_time:.2f}s')

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print("Early stopping triggered")
            break

    return train_loss_list, val_loss_list, accuracy_list

In [None]:
# Train for at most 30 epochs; train_model stops early after 3 epochs
# without an improvement of the validation loss.
training_history = train_model(
    model,
    train_loader,
    val_loader,
    train_text_features,
    val_text_features,
    criterion,
    optimizer,
    num_epochs=30,
    patience=3,
)
train_loss, validation_loss, accuracy = training_history

In [None]:
# Persist only the model's state_dict (not the whole module object); the file
# is written relative to the notebook's working directory.
model_save_path = "CombinedNN.pth"
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

In [None]:
# Two stacked panels: losses on top, validation accuracy below (one tick per epoch)
fig, (loss_ax, acc_ax) = plt.subplots(2, 1, figsize=(12, 10))

# Top panel: training vs. validation loss
loss_ax.plot(train_loss, label='Training Loss')
loss_ax.plot(validation_loss, label='Validation Loss')
loss_ax.set_title('Training vs. Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.set_xticks(range(len(train_loss)))

# Bottom panel: validation accuracy
acc_ax.plot(accuracy, label='Validation Accuracy')
acc_ax.set_title('Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()
acc_ax.set_xticks(range(len(accuracy)))

fig.tight_layout()
plt.show()

### Test
Contrary to our expectations, the model does not yield optimal results. We therefore decided not to evaluate its performance on the test set, but we left the evaluation code below for reference.

In [None]:
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, confusion_matrix, classification_report

In [None]:
# Directory paths
# The test-set location is machine specific. It can be overridden with the
# REPLY_DATASET_DIR environment variable; when that is unset, the original
# author's local path is used unchanged.
test_dir = os.environ.get(
    "REPLY_DATASET_DIR",
    "/Users/simone/Desktop/Luiss /*Machine Learning/Reply project/Document-classification/Reply_dataset",
)

In [None]:
# Test-set preprocessing: resize to 224x224, convert to a tensor, normalize per channel.
# The steps and constants are the same as in the training `transform` defined earlier.
_TEST_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_TEST_CHANNEL_STD = [0.229, 0.224, 0.225]

_test_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_TEST_CHANNEL_MEAN, std=_TEST_CHANNEL_STD),
]
test_transforms = transforms.Compose(_test_preprocessing_steps)

In [None]:
# Data generator for the test set
# NOTE(review): the class indices assigned by ImageFolder must match the label
# indices used at training time (dataset.classes) — confirm that both use the
# same class names in the same order.
test_dataset = datasets.ImageFolder(test_dir, transform=test_transforms)
# shuffle=False: batches follow the dataset's own sample order.
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Load your trained model
model_load_path = "CombinedNN.pth"
# The architecture must match the checkpoint, so the number of test classes
# has to equal the number of classes the model was trained with.
model = CombinedNN(num_classes=len(test_dataset.classes)).to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU machine can also be restored on a CPU-only one.
state_dict = torch.load(model_load_path, map_location=device)
model.load_state_dict(state_dict)
model.eval()  # Set the model to evaluation mode
print(f"Model loaded from {model_load_path}")

In [None]:
# Loss function and zeroed accumulators for the test-set evaluation
criterion = nn.CrossEntropyLoss()

# Running sum of batch losses, and correct / seen sample counters
test_loss, correct, total = 0.0, 0, 0

# Ground-truth and predicted labels collected over all batches
y_true, y_pred = [], []

In [None]:
# Evaluate the model
# CombinedNN.forward takes (images, texts), so the test images need text
# features too: OCR them and project the texts with the TF-IDF vectorizer that
# was fitted on the training texts. ImageFolder.samples lists (path, class_index)
# in dataset order, which is the order the unshuffled test_loader yields.
test_texts = [extract_text_from_image(path) for path, _ in test_dataset.samples]
test_text_features = torch.tensor(vectorizer.transform(test_texts).toarray(), dtype=torch.float32)

# Reset the accumulators here so that re-running this cell does not double count.
test_loss, correct, total = 0.0, 0, 0
y_true, y_pred = [], []
offset = 0  # row of test_text_features matching the first sample of the current batch

with torch.no_grad():
    for images, labels in test_loader:
        batch_size = labels.size(0)
        # Advance by the actual batch size: the last batch may be smaller.
        texts = test_text_features[offset:offset + batch_size].to(device)
        offset += batch_size
        images, labels = images.to(device), labels.to(device)

        outputs = model(images, texts)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

test_loss /= len(test_loader)
test_acc = 100 * correct / total

print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

In [None]:
# Summary metrics over the collected test predictions.
# F1 and precision use the 'weighted' average over classes.
# NOTE: `accuracy` is rebound here from the per-epoch validation list returned
# by train_model to a single float.
_weighted = {"average": "weighted"}
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, **_weighted)
precision = precision_score(y_true, y_pred, **_weighted)
conf_matrix = confusion_matrix(y_true, y_pred)
class_report = classification_report(y_true, y_pred, target_names=test_dataset.classes)

print(
    f"Accuracy: {accuracy} \n \n"
    f"F1 Score: {f1}\n \n"
    f"Precision: {precision}\n \n"
    f"Confusion Matrix:\n {conf_matrix}\n \n"
    f"Classification Report:\n {class_report}"
)

In [None]:
# Confusion matrix as a heat map: rows are true labels, columns are predicted labels
fig, ax = plt.subplots(figsize=(10, 8))
heatmap = ax.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
ax.set_title('Confusion Matrix')
fig.colorbar(heatmap, ax=ax)

# One tick per class, labelled with the class name
tick_marks = np.arange(len(test_dataset.classes))
ax.set_xticks(tick_marks)
ax.set_xticklabels(test_dataset.classes, rotation=45)
ax.set_yticks(tick_marks)
ax.set_yticklabels(test_dataset.classes)

ax.set_ylabel('True label')
ax.set_xlabel('Predicted label')
plt.show()