# Masters Thesis Data Science in Action

Changing annotations
Open the JSON file with the annotations. The annotations are grouped by image ID, and for every ID a set is built containing all the unique classes present in that image according to the annotations. The result is a dictionary with the image IDs as keys and the sets of labels as values.

In [None]:
# Mount Google Drive at /content/drive; every dataset path used below lives under this mount point
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import json
annotation_file_path = "/content/drive/MyDrive/Thesis BePLiv2/BePLi_dataset_v2/plastic_coco/annotation/all_plastic_coco.json"

# Read the COCO-style annotation file into `data`. The encoding is stated explicitly so
# the result does not depend on the platform's default text encoding.
with open(annotation_file_path, "r", encoding="utf-8") as annotation_file:
    data = json.load(annotation_file)

In [None]:
categories = data["categories"]
annotations = data["annotations"]

# Lookup table: category id -> category name
category_id_to_name = {entry["id"]: entry["name"] for entry in categories}

# image id -> set of the distinct category names annotated on that image
image_labels = {}

for annotation in annotations:
    image_id = annotation["image_id"]
    category_id = annotation["category_id"]

    # The resolved name is also written back onto the annotation itself;
    # ids that are missing from the lookup table become "Unknown".
    category_name = category_id_to_name.get(category_id, "Unknown")
    annotation["category_name"] = category_name

    image_labels.setdefault(image_id, set()).add(category_name)

In [None]:
# Print the label sets of (at most) the first ten images as a sanity check
count = 0

for count, (image_id, labels) in enumerate(image_labels.items(), start=1):
    print(f"Image ID: {image_id}, Labels: {labels}")
    if count == 10:
        break

# Opening the images and creating a dataframe
The images folder is opened and the image IDs are linked to the image filenames so that the labels from the image_labels dict can be assigned to the correct image. The dataframe stores the file paths rather than the images themselves, since this takes up less memory than loading all images into memory. The images can still easily be opened and fed to deep learning models, and they can still be resized to a uniform size before being fed into a neural network. This ensures that all images have the same dimensions, which is often necessary for neural networks to process them efficiently. Data augmentation also remains possible when the data is stored this way.

In [None]:
import os
import re
import pandas as pd

folder_path = '/content/drive/MyDrive/Thesis BePLiv2/BePLi_dataset_v2/plastic_coco/images/original_images/'
# os.listdir returns entries in arbitrary order. Sorting fixes the row order of the
# dataframe built from this list, so seeded operations on it are reproducible.
image_files = sorted(os.listdir(folder_path))

# Function to extract image IDs from filenames
def extract_image_id(filename):
    """Return the run of digits at the start of *filename* as an int.

    Returns None when the filename does not begin with a digit.
    """
    leading_digits = re.match(r'(\d+)', filename)
    if leading_digits is None:
        return None
    return int(leading_digits.group(1))

# Extract the image ID of every listed file and drop the entries for which
# extract_image_id found no leading digits (it returns None for those)
image_ids = [
    image_id
    for image_id in map(extract_image_id, image_files)
    if image_id is not None
]

# Function to remove "Fragment" and "Others" labels
def remove_fragment_others(labels):
    """Return the labels as a set, without "fragment" and "others" (matched case-insensitively)."""
    excluded = ("fragment", "others")
    return {label for label in labels if label.lower() not in excluded}

# `image_ids` only contains the IDs of files whose name starts with digits, so the
# filename columns must be restricted to the same files (in the same order).
# Using the unfiltered `image_files` would give columns of different lengths as soon
# as one file name has no leading digits.
matched_files = [filename for filename in image_files if extract_image_id(filename) is not None]

# Label set per image with "Fragment"/"Others" removed; images without any annotation get the placeholder label "X"
labels = [remove_fragment_others(image_labels.get(image_id, {"X"})) for image_id in image_ids]

data = {
    "Image_ID": image_ids,
    "Filename": matched_files,
    "Label": labels,
    # Only the path is stored; the image itself is loaded later when needed
    "Filepath": [os.path.join(folder_path, filename) for filename in matched_files],
}

df = pd.DataFrame(data)

df.head()


In [None]:
import matplotlib.pyplot as plt
import random

# Define the number of images to display
num_images_to_display = 8

# Create a larger subplot grid
fig, axes = plt.subplots(2, 4, figsize=(16, 10))
axes = axes.flatten()

# Switch all axes off up front so grid cells that receive no image stay blank
for ax in axes:
    ax.axis('off')

# Select random indices from the dataframe. The sample is capped by the number of rows
# (random.sample raises ValueError when asked for more items than exist) and by the
# number of grid cells (so axes[i] below can never run out of range).
sample_size = min(num_images_to_display, len(df), len(axes))
random_indices = random.sample(range(len(df)), sample_size)

# Iterate over random indices and display images
for i, idx in enumerate(random_indices):
    filepath = df.iloc[idx]['Filepath']
    labels = df.iloc[idx]['Label']

    # Load and display the image
    img = plt.imread(filepath)
    axes[i].imshow(img)
    axes[i].axis('off')

    # Display labels underneath each other, below the image (axes coordinates, y < 0)
    axes[i].text(0.5, -0.15, 'Labels:', fontsize=10, ha='center', transform=axes[i].transAxes)
    for j, label in enumerate(labels):
        axes[i].text(0.5, -0.2 - 0.05 * (j + 1), label, fontsize=10, ha='center', transform=axes[i].transAxes)

plt.tight_layout()
plt.show()

# Check class frequencies

In [None]:
from collections import Counter

# One entry per (image, label) pair across the whole dataframe;
# anything that is not a string is counted under 'unknown'
all_labels = [
    label if isinstance(label, str) else 'unknown'
    for image_label_set in df['Label']
    for label in image_label_set
]

# How often each distinct label occurs
label_counts = Counter(all_labels)

# Bar chart of the class frequencies
plt.figure(figsize=(10, 6))
plt.bar(list(label_counts.keys()), list(label_counts.values()))
plt.xlabel('Classes')
plt.ylabel('Frequency')
plt.title('Class Frequency')
plt.xticks(rotation=45, ha='right')  # slanted tick labels so long class names stay readable
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
import cv2
import numpy as np

# Function to calculate image statistics
def calculate_image_statistics(image_paths):
    """Compute size, mean colour and grayscale histogram for each image.

    Args:
        image_paths: Iterable of image file paths.

    Returns:
        A list with one dict per image that could be read, with the keys
        'Size' (width * height), 'Mean Color (BGR)' (mean per channel) and
        'Histogram' (flattened 256-bin histogram of the grayscale image).
        Paths that cannot be read are skipped with a printed warning, so
        the list can be shorter than the input.
    """
    image_data = []
    for path in image_paths:
        # Read the image; cv2.imread signals failure by returning None rather than raising
        image = cv2.imread(path)
        if image is None:
            print(f"Warning: could not read image, skipping: {path}")
            continue

        # Calculate image dimensions (width, height)
        height, width, _ = image.shape
        # Calculate image size (in pixels)
        size = width * height
        # Mean pixel value per colour channel (averaged over rows and columns)
        mean_color = np.mean(image, axis=(0, 1))
        # Calculate pixel intensity distribution (histogram) on the grayscale image
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        hist = cv2.calcHist([gray_image], [0], None, [256], [0, 256])

        # Append image statistics to list
        image_data.append({
            'Size': size,
            'Mean Color (BGR)': mean_color,
            'Histogram': hist.flatten()
        })
    return image_data

# Calculate image statistics for every file path stored in the dataframe's 'Filepath' column
image_statistics = calculate_image_statistics(df['Filepath'])

# Visualize image statistics
def visualize_image_statistics(image_statistics):
    """Plot the image-size distribution, the mean colour per channel and one intensity histogram.

    Args:
        image_statistics: Sequence of dicts with the keys 'Size',
            'Mean Color (BGR)' and 'Histogram'.

    Returns:
        None. Three figures are shown. When *image_statistics* is empty a
        message is printed and nothing is plotted.
    """
    # Without any statistics there is nothing to average and no first histogram to plot
    if len(image_statistics) == 0:
        print("No image statistics to visualize.")
        return

    # Plot size distribution
    plt.figure(figsize=(8, 6))
    plt.hist([image['Size'] for image in image_statistics], bins=30, color='skyblue')
    plt.title('Image Size Distribution')
    plt.xlabel('Size (pixels)')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.show()

    # Plot mean color distribution: the per-image channel means are averaged over all images
    mean_colors = np.array([image['Mean Color (BGR)'] for image in image_statistics])
    plt.figure(figsize=(8, 6))
    plt.bar(['B', 'G', 'R'], np.mean(mean_colors, axis=0), color=['blue', 'green', 'red'])
    plt.title('Mean Color Distribution (BGR)')
    plt.xlabel('Channel')
    plt.ylabel('Mean Value')
    plt.grid(True)
    plt.show()

    # Plot pixel intensity distribution (histogram).
    # NOTE: only the histogram of the first image is drawn, not an aggregate over all images.
    plt.figure(figsize=(8, 6))
    plt.plot(image_statistics[0]['Histogram'], color='orange')
    plt.title('Pixel Intensity Distribution')
    plt.xlabel('Pixel Value')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.show()

# Show the three plots for the statistics computed above
visualize_image_statistics(image_statistics)

In [None]:
# Save the image dataframe (IDs, filenames, label sets, file paths) to a CSV file on Drive,
# without the index column
file_path = "/content/drive/MyDrive/Thesis BePLiv2/SRQ1_nda_data.csv"
df.to_csv(file_path, index=False)

# Open the created dataframe from here and perform the necessary preprocessing before feeding it to the models

In [None]:
# Load the CSV file into a DataFrame
import os
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm
from imgaug import augmenters as iaa

# Read the dataframe back from the CSV on Drive; the following cells work on this copy
file_path = '/content/drive/MyDrive/Thesis BePLiv2/SRQ1_nda_data.csv'
SRQ1_nda_data = pd.read_csv(file_path)

In [None]:
SRQ1_nda_data.head()

In [None]:
import ast

# Function to convert set to list
def set_to_list(label_set):
    """Return the elements of *label_set* as a new list."""
    return [*label_set]

# The 'Label' column comes back from the CSV as text. First parse every cell into the
# Python object it spells out, then turn that object into a list.
parsed_label_sets = SRQ1_nda_data['Label'].apply(ast.literal_eval)
SRQ1_nda_data['Label'] = parsed_label_sets.apply(set_to_list)

In [None]:
SRQ1_nda_data.head()

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer

# The label vocabulary; the order of this list is the column order of the one-hot encoding
possible_labels = [
    'pet_bottle', 'styrene_foam', 'plastic_bag', 'other_string',
    'fishing_net', 'other_fishing_gear', 'buoy', 'rope',
    'other_container', 'box_shaped_case', 'other_bottle',
]

# One list of labels per image
labels = SRQ1_nda_data['Label']

# Binarizer restricted to the vocabulary above
mlb = MultiLabelBinarizer(classes=possible_labels)

# One row per image and one 0/1 column per class, aligned with the dataframe index
encoded_labels = mlb.fit_transform(labels)
one_hot_labels = pd.DataFrame(encoded_labels, columns=mlb.classes_, index=SRQ1_nda_data.index)

# Show the class order and the resulting encoding
print("Classes (labels):", mlb.classes_)
print("One-hot encoded labels:")
print(one_hot_labels)

In [None]:
# Store every row of the one-hot table as a plain Python list in a new
# 'one_hot_labels' column of SRQ1_nda_data
SRQ1_nda_data['one_hot_labels'] = one_hot_labels.to_numpy().tolist()

# Preview the dataframe with the new column
SRQ1_nda_data.head()

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the data as the test set, then split 20% of the remainder off as validation set
trainval_df, test_df = train_test_split(SRQ1_nda_data, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(trainval_df, test_size=0.2, random_state=42)

# Check the class distribution in each split.
# 'Label' holds lists, which are unhashable, so value_counts() cannot be applied to the
# column directly; explode() first yields one row per individual label. The results are
# printed explicitly because a notebook cell only displays its final expression.
for split_name, split_df in (("training", train_df), ("validation", val_df), ("test", test_df)):
    print(f"Class distribution in the {split_name} set:")
    print(split_df['Label'].explode().value_counts())

In [None]:
import cv2

# Function to load and preprocess images
def load_and_preprocess_images(image_paths, target_size=(224, 224)):
    """Load every image and resize it to *target_size*.

    Args:
        image_paths: Iterable of image file paths.
        target_size: Size passed to ``cv2.resize`` for every image.

    Returns:
        A NumPy array stacking the resized images in input order.

    Raises:
        ValueError: If an image cannot be read. Skipping it silently would
            shift the images against any labels the caller keeps in the
            same order.
    """
    images = []
    for path in image_paths:
        # cv2.imread returns None instead of raising when the file cannot be read
        image = cv2.imread(path)
        if image is None:
            raise ValueError(f"Could not read image: {path}")
        # Resize image and add it to the list
        images.append(cv2.resize(image, target_size))
    return np.array(images)

# File paths of the combined train+validation split and of the test split
trainval_image_paths, test_image_paths = trainval_df['Filepath'], test_df['Filepath']

# Matching one-hot label vectors, as plain lists
trainval_labels = trainval_df['one_hot_labels'].tolist()
test_labels = test_df['one_hot_labels'].tolist()

# Read and resize the images of both splits
trainval_images = load_and_preprocess_images(trainval_image_paths)
test_images = load_and_preprocess_images(test_image_paths)

In [None]:
# Convert the pixel values to float32 and divide by 255 to bring them into the range 0-1.
# Note that `train_images` is built from the combined train+validation images.
train_images = trainval_images.astype('float32') / 255.0
test_images = test_images.astype('float32') / 255.0

In [None]:
# Convert the label lists to NumPy arrays.
# Note: `train_labels_array` is built from the combined train+validation labels
# (`trainval_labels`); no separate validation array is created here.
train_labels_array = np.array(trainval_labels)

test_labels_array = np.array(test_labels)

# Alternative Approach, VGG-16 + XGBoost

In [None]:
import xgboost as xgb
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model

# Step 1: Feature Extraction with VGG16
def extract_vgg16_features(images, model=None):
    """Run *images* through VGG16 without its classifier head and return the output.

    Args:
        images: Batch of images accepted by ``model.predict``.
        model: Optional, already constructed feature extractor. When omitted
            a new ImageNet-weighted VGG16 (``include_top=False``) is built
            for this call, which matches the previous behaviour.

    Returns:
        Whatever ``model.predict(images)`` returns.
    """
    if model is None:
        model = VGG16(weights='imagenet', include_top=False)
    return model.predict(images)

# Step 2: Train XGBoost Model
# Build the extractor once and reuse it for both splits, instead of constructing the
# network again for every call
vgg16_feature_extractor = VGG16(weights='imagenet', include_top=False)

# Extract VGG16 features for training and testing data
X_train_features = extract_vgg16_features(train_images, model=vgg16_feature_extractor)
X_test_features = extract_vgg16_features(test_images, model=vgg16_feature_extractor)

In [None]:
# Collapse everything after the sample axis into one vector per sample,
# giving 2D (samples x features) matrices for training and test data
X_train_features_flat = X_train_features.reshape(len(X_train_features), -1)
X_test_features_flat = X_test_features.reshape(len(X_test_features), -1)

In [None]:
# Convert labels to a format suitable for XGBoost
y_train_xgb = train_labels_array.astype(int)
y_test_xgb = test_labels_array.astype(int)

# Booster parameters shared by all per-label models
params = {
    'objective': 'binary:logistic',  # each label is its own binary classification problem
    'eval_metric': 'logloss',
}

# Number of boosting rounds. This is an argument of xgb.train, not a booster parameter,
# so it is kept out of `params`.
num_boost_round = 100

# Train one model per label using the one-vs-rest strategy
num_labels = y_train_xgb.shape[1]
models = []
for label_idx in range(num_labels):
    y_label = y_train_xgb[:, label_idx]

    # Class imbalance ratio for the current label: negatives / positives.
    # A label with no positive sample would divide by zero, so it falls back to 1.0.
    num_positive = int(np.sum(y_label))
    num_negative = len(y_label) - num_positive
    scale_pos_weight = num_negative / num_positive if num_positive > 0 else 1.0

    # Create DMatrix for the current label
    dtrain_label = xgb.DMatrix(X_train_features_flat, label=y_label)

    # xgb.train takes a single parameter dict. Passing `params` positionally and again
    # as a keyword raises TypeError, so the per-label weight is merged into one dict.
    label_params = {**params, 'scale_pos_weight': scale_pos_weight}
    model = xgb.train(label_params, dtrain_label, num_boost_round=num_boost_round, verbose_eval=False)

    models.append(model)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, hamming_loss, multilabel_confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Function to evaluate performance
def evaluate_performance(y_true, y_pred):
    """Compute the multilabel evaluation metrics for the given predictions.

    Returns a 7-tuple: accuracy, per-label precision, per-label recall,
    per-label F1, macro-averaged F1, Hamming loss and the multilabel
    confusion matrices.
    """
    accuracy = accuracy_score(y_true, y_pred)

    # average=None keeps one score per label instead of a single aggregate
    precision, recall, f1 = (
        metric(y_true, y_pred, average=None)
        for metric in (precision_score, recall_score, f1_score)
    )

    return (
        accuracy,
        precision,
        recall,
        f1,
        f1_score(y_true, y_pred, average='macro'),
        hamming_loss(y_true, y_pred),
        multilabel_confusion_matrix(y_true, y_pred),
    )

In [None]:
# Step 3: Evaluate Model
# The test features are the same for every per-label model, so the DMatrix is built once
# instead of once per loop iteration
dtest_label = xgb.DMatrix(X_test_features_flat)

# Predict probabilities on the test set for each label (one column per label model)
y_pred_proba = np.zeros((len(X_test_features_flat), num_labels))
for label_idx, model in enumerate(models):
    y_pred_proba[:, label_idx] = model.predict(dtest_label)

# Convert predicted probabilities to binary predictions
threshold = 0.5  # Adjust threshold as needed
y_pred_binary = (y_pred_proba > threshold).astype(int)

# Evaluate model performance using provided evaluation function
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(y_test_xgb, y_pred_binary)

# Print F1-score per label
labels = list(mlb.classes_)
print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# Plot one 2x2 confusion matrix per label in a 4x4 grid of subplots
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

# Training the VGG-16 Baseline


In [None]:
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten
from sklearn.preprocessing import MultiLabelBinarizer

# VGG-16 convolutional base without its original classifier head
vgg16_base = VGG16(include_top=False, input_shape=(224, 224, 3))

# Freeze the convolutional base so that only the new layers below are trained
for layer in vgg16_base.layers:
    layer.trainable = False

# New classification head on top of the base output:
# flatten -> 128-unit ReLU layer -> 11 sigmoid outputs.
# The output layer uses sigmoid (not softmax), paired with binary cross-entropy below.
flat1 = Flatten()(vgg16_base.output)
class1 = Dense(128, activation='relu')(flat1)
output = Dense(11, activation='sigmoid')(class1)

# Full model: base input -> new head
vgg16_model = Model(inputs=vgg16_base.inputs, outputs=output)

# Compile the model
vgg16_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Fit the model on the training images
history = vgg16_model.fit(train_images, train_labels_array, epochs=25, verbose=1)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, hamming_loss, multilabel_confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import MultiLabelBinarizer

# Function to evaluate performance
def evaluate_performance(y_true, y_pred):
    """Compute the multilabel evaluation metrics for the given predictions.

    Returns a 7-tuple: accuracy, per-label precision, per-label recall,
    per-label F1, macro-averaged F1, Hamming loss and the multilabel
    confusion matrices.
    """
    accuracy = accuracy_score(y_true, y_pred)

    # average=None keeps one score per label instead of a single aggregate
    precision, recall, f1 = (
        metric(y_true, y_pred, average=None)
        for metric in (precision_score, recall_score, f1_score)
    )

    return (
        accuracy,
        precision,
        recall,
        f1,
        f1_score(y_true, y_pred, average='macro'),
        hamming_loss(y_true, y_pred),
        multilabel_confusion_matrix(y_true, y_pred),
    )

In [None]:
# Per-class probabilities from the model's sigmoid outputs
y_pred = vgg16_model.predict(test_images)

# A class counts as predicted when its probability is above 0.5
y_pred_binary = (y_pred > 0.5).astype(int)

y_true = test_labels_array

# Evaluate performance
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(y_true, y_pred_binary)

# F1-score per label, in the class order of the binarizer
labels = list(mlb.classes_)
print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Macro F1-score followed by the remaining metrics
for metric_caption, metric_value in (
    ("Macro F1-score:", macro_f1),
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("Hamming Loss:", h_loss),
):
    print(metric_caption, metric_value)

# One confusion matrix heatmap per label, laid out on a 4x4 subplot grid
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

# ViT Creation

In [None]:
from transformers import ViTModel

# Load a pretrained ViT model
# NOTE: this binds the global name `model` to the bare ViT backbone. The setup cell further
# down builds its optimizer from `model.parameters()`, i.e. from whatever `model` refers to
# at that moment.
model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')


In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [None]:
# Define your dataset class
class CustomDataset(Dataset):
    """Pair up images and labels by index, optionally transforming each image on access."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels = images, labels
        self.transform = transform

    def __len__(self):
        # The dataset size is defined by the number of images
        return len(self.images)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Check if GPU is available and select the appropriate device.
# Falls back to the CPU instead of assuming that CUDA exists.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Transformation applied to every image served by the dataset below (convert to tensor only)
transform = transforms.Compose([
    transforms.ToTensor()
])

num_epochs = 15

# Create datasets and dataloaders
train_dataset = CustomDataset(train_images, train_labels_array, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Convert test images to PyTorch tensor
test_images_tensor = torch.from_numpy(test_images).float()
# Move the test images tensor to the selected device
test_images_tensor = test_images_tensor.to(device)

# Define loss function and optimizer.
# BCEWithLogitsLoss expects raw logits, not probabilities.
criterion = nn.BCEWithLogitsLoss()
# NOTE(review): this optimizer is tied to the parameters of whatever `model` refers to when
# this cell runs. A model that is created afterwards is not covered by it and needs an
# optimizer built from its own parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

## Experimental model 1 -- VGG-16 + Transformer

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from transformers import ViTModel

# Define a hybrid model combining VGG16 and ViT
class HybridVGGModel(nn.Module):
    """Classify images from concatenated VGG16 and ViT features.

    The same input batch goes through both branches. The VGG16 branch is
    globally average-pooled and projected by a linear layer; the ViT branch
    contributes the first token of its last hidden state. The two vectors
    are concatenated and mapped to ``num_classes`` raw scores.
    """

    def __init__(self, vgg_features_dim, vit_hidden_dim, num_classes):
        super().__init__()
        # VGG16 feature layers without the last child module
        backbone = models.vgg16(pretrained=True)
        feature_layers = list(backbone.features.children())
        self.vgg16 = nn.Sequential(*feature_layers[:-1])
        # Global average pooling: one value per VGG16 channel
        self.avgpool_vgg = nn.AdaptiveAvgPool2d((1, 1))
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

        # Projection of the 512 pooled VGG16 channels to vgg_features_dim
        self.linear_vgg = nn.Linear(512, vgg_features_dim)

        # Projection for the 768-dimensional ViT features; created here but not applied in forward()
        self.linear_vit = nn.Linear(768, vgg_features_dim)

        # Classification head over the concatenated VGG16 and ViT features
        self.classifier = nn.Linear(vgg_features_dim + vit_hidden_dim, num_classes)

    def forward(self, x):
        batch_size = x.size(0)

        # VGG16 branch: convolutional features -> global average pool -> flat vector -> projection
        pooled = self.avgpool_vgg(self.vgg16(x))
        vgg_features = self.linear_vgg(pooled.view(batch_size, -1))

        # ViT branch: first token of the last hidden state
        vit_output = self.vit(x)['last_hidden_state'][:, 0, :]

        # Concatenate both feature vectors per sample and classify
        return self.classifier(torch.cat((vgg_features, vit_output), dim=1))



# Dimensions used to build the hybrid model
vgg_features_dim = 512  # size of the projected VGG16 feature vector
vit_hidden_dim = 768  # size of the ViT hidden state
num_classes = 11  # number of output classes (adjust as needed)

# Instantiate the hybrid model
model = HybridVGGModel(vgg_features_dim, vit_hidden_dim, num_classes)

# Move model to device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Freeze both pretrained backbones; the linear layers and the classifier stay trainable
for frozen_backbone in (model.vgg16, model.vit):
    for param in frozen_backbone.parameters():
        param.requires_grad = False

# Use the hybrid model for training, fine-tuning, or inference

In [None]:
# The optimizer has to be built from the parameters of the model trained in this loop.
# An optimizer created from a different model's parameters would leave this model's
# weights untouched. Only parameters that are still trainable are passed on.
optimizer = torch.optim.Adam(
    [param for param in model.parameters() if param.requires_grad], lr=1e-4
)

#Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        # BCEWithLogitsLoss needs float targets
        labels = labels.to(device).float()

        optimizer.zero_grad()  # Zero the gradients

        outputs = model(images)  # Forward pass (raw logits)
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        # loss is the batch mean, so weight it by the batch size for a per-sample average below
        running_loss += loss.item() * images.size(0)

    # Compute average training loss for the epoch
    epoch_loss = running_loss / len(train_loader.dataset)

    # Print training loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}")

In [None]:
# Reorder the test batch from channels-last (N, H, W, C) to channels-first (N, C, H, W).
# The check on the last axis keeps the cell idempotent: once the channel axis has been
# moved, the last axis is no longer of size 3 and a re-run leaves the tensor alone.
if test_images_tensor.shape[-1] == 3:
    test_images_tensor = test_images_tensor.permute(0, 3, 1, 2)

In [None]:
# After training the model
# Remove the names `train_loader` and `optimizer` from the notebook namespace.
# NOTE: any later cell that trains another model needs its own `train_loader` and `optimizer`.
del train_loader, optimizer

# Release cached GPU memory that PyTorch holds but no longer uses
torch.cuda.empty_cache()

# The model can now be used for inference without the loader and optimizer objects

In [None]:
# During inference, obtain the raw logits from the model's output
with torch.no_grad():
    model.eval()
    outputs = model(test_images_tensor)

# The model emits raw logits (it is trained with BCEWithLogitsLoss), so they are mapped to
# probabilities with a sigmoid before the probability threshold is applied
threshold = 0.5
probabilities = torch.sigmoid(outputs)
binary_predictions = (probabilities > threshold).int()
# Move the tensor to the CPU and convert it to a NumPy array for the sklearn metrics
binary_predictions_cpu = binary_predictions.cpu().numpy()

# Evaluate performance (accuracy, precision, recall, F1-score, ...)
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(test_labels_array, binary_predictions_cpu)

# Print or visualize evaluation metrics as needed
labels = list(mlb.classes_)

print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# Plot one confusion matrix per label in a 4x4 grid of subplots
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

## Experimental model 2 -- ResNet-50 + Transformer

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from transformers import ViTModel

class HybridResNetModel(nn.Module):
    """Classify images from concatenated ResNet-50 and ViT features.

    The same input batch goes through both branches. The ResNet-50 branch is
    globally average-pooled and projected by a linear layer; the ViT branch
    contributes the first token of its last hidden state. The two vectors
    are concatenated and mapped to ``num_classes`` raw scores.
    """

    def __init__(self, resnet_features_dim, vit_hidden_dim, num_classes):
        super().__init__()
        # ResNet-50 without its last two child modules (avgpool and fc)
        backbone = models.resnet50(pretrained=True)
        trunk_layers = list(backbone.children())[:-2]
        self.resnet = nn.Sequential(*trunk_layers)

        # Global average pooling: one value per ResNet channel
        self.avgpool_resnet = nn.AdaptiveAvgPool2d((1, 1))

        # Projection of the 2048 pooled ResNet channels to resnet_features_dim
        self.linear_resnet = nn.Linear(2048, resnet_features_dim)

        # Pretrained ViT branch
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

        # Classification head over the concatenated ResNet and ViT features
        self.classifier = nn.Linear(resnet_features_dim + vit_hidden_dim, num_classes)

    def forward(self, x):
        batch_size = x.size(0)

        # ResNet branch: convolutional features -> global average pool -> flat vector -> projection
        pooled = self.avgpool_resnet(self.resnet(x))
        resnet_features = self.linear_resnet(pooled.view(batch_size, -1))

        # ViT branch: first token of the last hidden state
        vit_output = self.vit(x)['last_hidden_state'][:, 0, :]

        # Concatenate both feature vectors per sample and classify
        return self.classifier(torch.cat((resnet_features, vit_output), dim=1))

# Dimensions used to build the hybrid ResNet model
resnet_features_dim = 2048  # size of the projected ResNet-50 feature vector
vit_hidden_dim = 768  # size of the ViT hidden state
num_classes = 11  # number of output classes (adjust as needed)

# Instantiate the hybrid ResNet model
resnet_model = HybridResNetModel(resnet_features_dim, vit_hidden_dim, num_classes)

# Move model to device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet_model.to(device)

# Freeze both pretrained backbones; the linear layer and the classifier stay trainable
for frozen_backbone in (resnet_model.resnet, resnet_model.vit):
    for param in frozen_backbone.parameters():
        param.requires_grad = False

# Use the hybrid ResNet model for training, fine-tuning, or inference

In [None]:
# This loop needs a data loader and an optimizer of its own. The objects used for the
# previous experiment are deleted after that run, and an optimizer only updates the
# parameters it was constructed with, so it has to be built from `resnet_model`.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
optimizer = torch.optim.Adam(
    [param for param in resnet_model.parameters() if param.requires_grad], lr=1e-4
)

#Training loop
for epoch in range(num_epochs):
    resnet_model.train()  # Set model to training mode
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        # BCEWithLogitsLoss needs float targets
        labels = labels.to(device).float()

        optimizer.zero_grad()  # Zero the gradients

        outputs = resnet_model(images)  # Forward pass (raw logits)
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        # loss is the batch mean, so weight it by the batch size for a per-sample average below
        running_loss += loss.item() * images.size(0)

    # Compute average training loss for the epoch
    epoch_loss = running_loss / len(train_loader.dataset)

    # Print training loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}")

In [None]:
# During inference, obtain the raw logits from the model's output
with torch.no_grad():
    resnet_model.eval()
    outputs = resnet_model(test_images_tensor)

# The model emits raw logits (it is trained with BCEWithLogitsLoss), so they are mapped to
# probabilities with a sigmoid before the probability threshold is applied
threshold = 0.5
probabilities = torch.sigmoid(outputs)
binary_predictions_resnet = (probabilities > threshold).int()
# Move the tensor to the CPU and convert it to a NumPy array for the sklearn metrics
binary_predictions_resnet_cpu = binary_predictions_resnet.cpu().numpy()

# Evaluate performance (accuracy, precision, recall, F1-score, ...)
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(test_labels_array, binary_predictions_resnet_cpu)

# Print or visualize evaluation metrics as needed
labels = list(mlb.classes_)

print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# Plot one confusion matrix per label in a 4x4 grid of subplots
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

## Experimental model 3 -- DenseNet-169 + Transformer

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from transformers import ViTModel

# Define a hybrid model combining DenseNet169 and ViT
class HybridDenseModel(nn.Module):
    """Two-branch classifier combining DenseNet-169 features with a ViT embedding.

    The input batch is fed to both branches. The DenseNet feature maps are
    globally average-pooled, flattened and projected by a linear layer; from
    the ViT the first token of ``last_hidden_state`` is taken. Both vectors
    are concatenated and passed to a single linear classifier that returns
    one raw score (logit) per class.

    Args:
        densenet_features_dim: Output width of the DenseNet projection layer.
        vit_hidden_dim: Width of the ViT embedding that is concatenated to the
            DenseNet vector; used only to size the classifier input.
        num_classes: Number of output scores.
    """

    def __init__(self, densenet_features_dim, vit_hidden_dim, num_classes):
        super().__init__()

        # Pretrained DenseNet-169 feature extractor; the last child of
        # `features` is dropped and the classification layer is not used.
        backbone = models.densenet169(pretrained=True)
        feature_layers = list(backbone.features.children())
        self.densenet = nn.Sequential(*feature_layers[:-1])

        # Global average pooling collapses each feature map to a single value
        self.avgpool_densenet = nn.AdaptiveAvgPool2d((1, 1))

        # Pretrained Vision Transformer branch
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

        # Projection of the 1664 pooled DenseNet channels
        self.linear_densenet = nn.Linear(1664, densenet_features_dim)

        # Optional projection of the 768-wide ViT embedding; it is created here
        # but not applied in forward()
        self.linear_vit = nn.Linear(768, densenet_features_dim)

        # Classification head over the concatenated DenseNet + ViT vector
        self.classifier = nn.Linear(densenet_features_dim + vit_hidden_dim, num_classes)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        batch_size = x.size(0)

        # CNN branch: feature maps -> pooled -> flat vector -> projection
        pooled_maps = self.avgpool_densenet(self.densenet(x))
        cnn_embedding = self.linear_densenet(pooled_maps.view(batch_size, -1))

        # Transformer branch: embedding of the first token
        token_embedding = self.vit(x)['last_hidden_state'][:, 0, :]

        fused = torch.cat((cnn_embedding, token_embedding), dim=1)
        return self.classifier(fused)

# Instantiate the hybrid model
densenet_features_dim = 1664  # Width of the DenseNet projection (equal to the 1664 pooled DenseNet channels)
vit_hidden_dim = 768  # Width of the ViT embedding concatenated in the model
num_classes = 11  # Number of output classes (adjust as needed)

dense_model = HybridDenseModel(densenet_features_dim, vit_hidden_dim, num_classes)

# Move the hybrid model itself to the device; use the GPU when one is
# available and fall back to the CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dense_model.to(device)

# Freeze the pretrained DenseNet and ViT branches so that only the newly
# added linear layers keep requires_grad=True
for frozen_branch in (dense_model.densenet, dense_model.vit):
    for param in frozen_branch.parameters():
        param.requires_grad = False

# Use the hybrid model for training, fine-tuning, or inference

In [None]:
# Training loop
dense_model.to(device)

# Build the optimizer from dense_model's own trainable parameters. No
# optimizer is created for dense_model between its definition and this loop,
# so stepping a previously created optimizer would leave dense_model's
# weights untouched. The learning rate matches the 1e-4 used for the other
# hybrid model in this notebook.
optimizer = torch.optim.Adam(
    [p for p in dense_model.parameters() if p.requires_grad],
    lr=1e-4,
)

for epoch in range(num_epochs):
    dense_model.train()  # Set model to training mode
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        # The loss is computed against float targets
        labels = labels.to(device).float()

        optimizer.zero_grad()  # Zero the gradients

        outputs = dense_model(images)  # Forward pass (raw logits)
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        # Weight the batch loss by the batch size so the epoch average is
        # taken per sample rather than per batch
        running_loss += loss.item() * images.size(0)

    # Compute average training loss for the epoch
    epoch_loss = running_loss / len(train_loader.dataset)

    # Print training loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}")


In [None]:
# Inference on the full test set; the model's outputs are raw logits
dense_model.eval()
with torch.no_grad():
    outputs = dense_model(test_images_tensor)

# Convert the logits to probabilities before thresholding. Comparing raw
# logits against 0.5 would be equivalent to a probability cut-off of
# sigmoid(0.5) ~= 0.62 instead of the intended 0.5.
threshold = 0.5
probabilities = torch.sigmoid(outputs)
binary_predictions_dense = (probabilities > threshold).int()
# Move the predictions to the CPU so they can be consumed by the metric functions
binary_predictions_dense_cpu = binary_predictions_dense.cpu()

# Multilabel evaluation with the notebook's evaluate_performance helper
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(test_labels_array, binary_predictions_dense_cpu)

# Class names in the column order produced by the label binarizer
labels = list(mlb.classes_)

print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# One 2x2 confusion matrix per label, laid out on a 4x4 grid
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

# Data Augmentation for improving robustness and generalizability

In [None]:
import cv2
import numpy as np

# Define your augmentation functions
def adjust_contrast(image, alpha=2.0, beta=-20):
    """Return a contrast- and brightness-adjusted version of ``image``.

    The image is passed through ``cv2.convertScaleAbs`` with gain ``alpha``
    and offset ``beta``.

    Args:
        image: Input image array accepted by OpenCV.
        alpha: Multiplicative gain (contrast). Defaults to 2.0.
        beta: Additive offset (brightness). Defaults to -20, i.e. darker.

    Returns:
        The adjusted image as returned by ``cv2.convertScaleAbs``.
    """
    return cv2.convertScaleAbs(image, alpha=alpha, beta=beta)

def apply_filter(image, kernel_size=(5, 5), sigma=0):
    """Return a Gaussian-blurred version of ``image``.

    Args:
        image: Input image array accepted by OpenCV.
        kernel_size: (width, height) of the Gaussian kernel. Defaults to (5, 5).
        sigma: Standard deviation passed to ``cv2.GaussianBlur`` as ``sigmaX``.
            The default of 0 lets OpenCV derive it from the kernel size.

    Returns:
        The blurred image as returned by ``cv2.GaussianBlur``.
    """
    return cv2.GaussianBlur(image, kernel_size, sigma)

# Augment images and labels
augmented_images = []
augmented_labels = []

for image, label in zip(train_images, train_labels_array):
    # Images whose label vector is all zeros are not augmented
    if np.all(label == 0):
        continue

    # First variant: contrast/brightness adjustment of the original image
    image_contrast = adjust_contrast(image)

    # Second variant: the blur is applied on top of the contrast-adjusted
    # image, not on the original
    image_filtered = apply_filter(image_contrast)

    # Both variants keep the label vector of the source image
    augmented_images.extend([image_contrast, image_filtered])
    augmented_labels.extend([label, label])

# Convert lists to numpy arrays
if augmented_images:
    augmented_images = np.array(augmented_images)
    augmented_labels = np.array(augmented_labels)
else:
    # Nothing was augmented: np.array([]) would have shape (0,) and could not
    # be concatenated with the N-dimensional originals, so build empty arrays
    # that keep the trailing dimensions and dtypes of the originals.
    augmented_images = np.empty((0,) + train_images.shape[1:], dtype=train_images.dtype)
    augmented_labels = np.empty((0,) + train_labels_array.shape[1:], dtype=train_labels_array.dtype)

# Extend original arrays (originals first, augmented samples appended)
train_images_extended = np.concatenate((train_images, augmented_images))
train_labels_extended = np.concatenate((train_labels_array, augmented_labels))


In [None]:
# Size of the first axis (number of images) before and after augmentation
num_instances_original, *_ = train_images.shape
num_instances_extended, *_ = train_images_extended.shape

print("Number of instances in original train_images:", num_instances_original)
print("Number of instances in extended train_images:", num_instances_extended)

In [None]:
# Number of label rows in the original label array
num_labels_original = train_labels_array.shape[0]

# Number of label rows after the augmented labels were appended
num_labels_extended = train_labels_extended.shape[0]

# These counts describe the label arrays, so the messages name them
# (they previously claimed to report train_images)
print("Number of instances in original train_labels_array:", num_labels_original)
print("Number of instances in extended train_labels_extended:", num_labels_extended)

# Retrain and evaluate best performing model on augmented dataset

VGG-16 Baseline

In [None]:
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten
from sklearn.preprocessing import MultiLabelBinarizer

# VGG-16 convolutional base without its original classification head,
# configured for 224x224 RGB inputs
vgg16_base = VGG16(include_top=False, input_shape=(224, 224, 3))

# Keep every layer of the convolutional base fixed during training
for base_layer in vgg16_base.layers:
    base_layer.trainable = False

# New classification head on top of the last base layer:
# flatten -> 128-unit ReLU layer -> 11 sigmoid units (one per label)
flat1 = Flatten()(vgg16_base.layers[-1].output)
class1 = Dense(128, activation='relu')(flat1)
output = Dense(11, activation='sigmoid')(class1)

# Full model: base inputs through to the new sigmoid output layer
vgg16_model = Model(inputs=vgg16_base.inputs, outputs=output)

# Binary cross-entropy is applied to each of the 11 sigmoid outputs
vgg16_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train on the augmented training set
history = vgg16_model.fit(
    train_images_extended,
    train_labels_extended,
    epochs=25,
    verbose=1,
)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, hamming_loss, multilabel_confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import MultiLabelBinarizer

# Function to evaluate performance
def evaluate_performance(y_true, y_pred):
    """Compute the multilabel evaluation metrics used throughout the notebook.

    Args:
        y_true: Ground-truth binary label indicator matrix.
        y_pred: Predicted binary label indicator matrix.

    Returns:
        A 7-tuple ``(accuracy, precision, recall, f1, macro_f1, h_loss, mcm)``
        where ``precision``, ``recall`` and ``f1`` are computed with
        ``average=None`` (one value per label), ``macro_f1`` uses
        ``average='macro'``, ``h_loss`` is the Hamming loss and ``mcm`` is the
        result of ``multilabel_confusion_matrix``.
    """
    # The tuple elements are evaluated left to right, in the order they are returned
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, average=None),
        recall_score(y_true, y_pred, average=None),
        f1_score(y_true, y_pred, average=None),
        f1_score(y_true, y_pred, average='macro'),
        hamming_loss(y_true, y_pred),
        multilabel_confusion_matrix(y_true, y_pred),
    )

In [None]:
# Ground truth for the test set
y_true = test_labels_array

# Per-label probabilities from the VGG-16 model, thresholded at 0.5
y_pred = vgg16_model.predict(test_images)
y_pred_binary = (y_pred > 0.5).astype(int)

# Evaluate performance
(
    accuracy,
    precision,
    recall,
    f1,
    macro_f1,
    h_loss,
    mcm,
) = evaluate_performance(y_true, y_pred_binary)

# Class names as stored on the label binarizer
labels = list(mlb.classes_)

print("F1-score per label:")
for label_name, label_f1 in zip(labels, f1):
    print(f"{label_name}: {label_f1}")

# Aggregate metrics, printed in a fixed order
for metric_title, metric_value in (
    ("Macro F1-score:", macro_f1),
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("Hamming Loss:", h_loss),
):
    print(metric_title, metric_value)

# One heatmap per label on a 4x4 grid of subplots
plt.figure(figsize=(15, 10))
for position, label_name in enumerate(labels, start=1):
    plt.subplot(4, 4, position)
    sns.heatmap(data=mcm[position - 1], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label_name}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

ResNet-50 + ViT

In [None]:
# Define your dataset class
class CustomDataset(Dataset):
    """Dataset over parallel, index-aligned collections of images and labels.

    Args:
        images: Indexable collection of images.
        labels: Indexable collection of labels, aligned with ``images``.
        transform: Optional callable applied to an image when it is fetched.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels = images, labels
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at ``idx``; the image is transformed if a transform is set."""
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Use the GPU when one is available and fall back to the CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Image transformation shared by the training and test images (convert to tensor only)
transform = transforms.Compose([
    transforms.ToTensor()
])

num_epochs = 15

# Create datasets and dataloaders
train_dataset = CustomDataset(train_images_extended, train_labels_extended, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Build the test tensor with the same transform that the training images go
# through, so both share one layout and value scaling. A plain
# torch.from_numpy(test_images) would keep the array's original axis order
# and skip whatever ToTensor does to the training images.
test_images_tensor = torch.stack([transform(image) for image in test_images]).float()
# Move the test images tensor to the same device as the model
test_images_tensor = test_images_tensor.to(device)

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
# NOTE(review): `model` is not defined in this cell. An optimizer only updates
# the parameters it was constructed with, so confirm that `model` is the
# network that is actually trained with this optimizer.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from transformers import ViTModel

class HybridResNetModel(nn.Module):
    """Two-branch classifier combining ResNet-50 features with a ViT embedding.

    The input batch is fed to both branches. The ResNet feature maps are
    globally average-pooled, flattened and projected by a linear layer; from
    the ViT the first token of ``last_hidden_state`` is taken. Both vectors
    are concatenated and passed to a single linear classifier that returns
    one raw score (logit) per class.

    Args:
        resnet_features_dim: Output width of the ResNet projection layer.
        vit_hidden_dim: Width of the ViT embedding that is concatenated to the
            ResNet vector; used only to size the classifier input.
        num_classes: Number of output scores.
    """

    def __init__(self, resnet_features_dim, vit_hidden_dim, num_classes):
        super().__init__()

        # Pretrained ResNet-50 with its last two children removed
        # (the pooling and fully connected layers)
        backbone = models.resnet50(pretrained=True)
        backbone_layers = list(backbone.children())
        self.resnet = nn.Sequential(*backbone_layers[:-2])

        # Global average pooling collapses each feature map to a single value
        self.avgpool_resnet = nn.AdaptiveAvgPool2d((1, 1))

        # Projection of the 2048 pooled ResNet channels
        self.linear_resnet = nn.Linear(2048, resnet_features_dim)

        # Pretrained Vision Transformer branch
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

        # Classification head over the concatenated ResNet + ViT vector
        self.classifier = nn.Linear(resnet_features_dim + vit_hidden_dim, num_classes)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        batch_size = x.size(0)

        # CNN branch: feature maps -> pooled -> flat vector -> projection
        pooled_maps = self.avgpool_resnet(self.resnet(x))
        cnn_embedding = self.linear_resnet(pooled_maps.view(batch_size, -1))

        # Transformer branch: embedding of the first token
        token_embedding = self.vit(x)['last_hidden_state'][:, 0, :]

        fused = torch.cat((cnn_embedding, token_embedding), dim=1)
        return self.classifier(fused)

# Hyper-parameters of the hybrid ResNet model
resnet_features_dim = 2048  # Width of the ResNet projection (equal to the 2048 pooled ResNet channels)
vit_hidden_dim = 768  # Width of the ViT embedding concatenated in the model
num_classes = 11  # Number of output classes (adjust as needed)

resnet_model = HybridResNetModel(resnet_features_dim, vit_hidden_dim, num_classes)

# Place the model on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet_model.to(device)

# Freeze both pretrained branches so that only the newly added linear layers
# keep requires_grad=True
for frozen_branch in (resnet_model.resnet, resnet_model.vit):
    for param in frozen_branch.parameters():
        param.requires_grad = False

# Use the hybrid ResNet model for training, fine-tuning, or inference

In [None]:
# Training loop

# Build the optimizer from resnet_model's own trainable parameters. The
# optimizer created earlier in this section is constructed from
# `model.parameters()`, i.e. not from resnet_model, so stepping it would
# leave resnet_model's weights untouched. The learning rate is kept at 1e-4.
optimizer = torch.optim.Adam(
    [p for p in resnet_model.parameters() if p.requires_grad],
    lr=1e-4,
)

for epoch in range(num_epochs):
    resnet_model.train()  # Set model to training mode
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        # The loss is computed against float targets
        labels = labels.to(device).float()

        optimizer.zero_grad()  # Zero the gradients

        outputs = resnet_model(images)  # Forward pass (raw logits)
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        # Weight the batch loss by the batch size so the epoch average is
        # taken per sample rather than per batch
        running_loss += loss.item() * images.size(0)

    # Compute average training loss for the epoch
    epoch_loss = running_loss / len(train_loader.dataset)

    # Print training loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}")

In [None]:
# Inference on the full test set; the model's outputs are raw logits
resnet_model.eval()
with torch.no_grad():
    outputs = resnet_model(test_images_tensor)

# The model is trained with BCEWithLogitsLoss, so its outputs are unbounded
# logits. Convert them to probabilities before thresholding: comparing raw
# logits against 0.5 would be equivalent to a probability cut-off of
# sigmoid(0.5) ~= 0.62 instead of the intended 0.5.
threshold = 0.5
probabilities = torch.sigmoid(outputs)
binary_predictions_resnet = (probabilities > threshold).int()
# Move the predictions to the CPU so they can be consumed by the metric functions
binary_predictions_resnet_cpu = binary_predictions_resnet.cpu()

# Multilabel evaluation with the notebook's evaluate_performance helper
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(test_labels_array, binary_predictions_resnet_cpu)

# Class names in the column order produced by the label binarizer
labels = list(mlb.classes_)

print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# One 2x2 confusion matrix per label, laid out on a 4x4 grid
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()

VGG-16 + XGBoost

In [None]:
import xgboost as xgb
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model

# Step 1: Feature Extraction with VGG16
def extract_vgg16_features(images):
    """Return the VGG16 convolutional-base outputs for ``images``.

    A VGG16 network with ImageNet weights and without its top classification
    layers is built on every call, wrapped in a ``Model`` from its input to
    its output, and used to predict on ``images``.

    Args:
        images: Batch of images accepted by ``Model.predict``.

    Returns:
        The feature array produced by ``predict``.
    """
    backbone = VGG16(weights='imagenet', include_top=False)
    extractor = Model(inputs=backbone.input, outputs=backbone.output)
    return extractor.predict(images)

# Step 2: Train XGBoost Model
# Inputs for XGBoost: VGG16 features of the augmented training images first,
# then of the test images
X_train_features, X_test_features = (
    extract_vgg16_features(image_set)
    for image_set in (train_images_extended, test_images)
)

In [None]:
# Collapse every per-image feature block into a single row: (samples, features)
X_train_features_flat = X_train_features.reshape(len(X_train_features), -1)

# Same reshaping for the test features
X_test_features_flat = X_test_features.reshape(len(X_test_features), -1)

In [None]:
# Convert labels to a format suitable for XGBoost
y_train_xgb = train_labels_extended.astype(int)
y_test_xgb = test_labels_array.astype(int)

# Settings shared by every per-label model
params = {
    'objective': 'binary:logistic',  # Use binary:logistic for binary classification
    'eval_metric': 'logloss',  # Use logloss for binary classification
    'num_round': 100  # Number of boosting rounds
}

# 'num_round' is this notebook's own key: it is passed to xgb.train as
# num_boost_round and therefore kept out of the dict of booster parameters
booster_params = {key: value for key, value in params.items() if key != 'num_round'}

# Train one model per label using the one-vs-rest strategy
num_labels = y_train_xgb.shape[1]  # one column per label
# NOTE(review): `models` (and `model` below) reuse names that other cells of
# this notebook use for torchvision.models and for networks; the names are
# kept because the evaluation cell iterates over `models`.
models = []
for label_idx in range(num_labels):
    label_column = y_train_xgb[:, label_idx]

    # Class imbalance ratio for the current label: negatives / positives.
    # A label without any positive sample would divide by zero, so the
    # neutral weight 1.0 is used in that case.
    num_positive = np.sum(label_column)
    num_negative = len(label_column) - num_positive
    class_imbalance_ratio = float(num_negative / num_positive) if num_positive > 0 else 1.0

    # Set scale_pos_weight to balance the classes
    scale_pos_weight = class_imbalance_ratio

    # Create DMatrix for the current label
    dtrain_label = xgb.DMatrix(X_train_features_flat, label=label_column)

    # Merge the shared settings with the per-label weight into ONE params
    # dict; passing `params` both positionally and as a keyword raises a
    # TypeError
    label_params = {**booster_params, 'scale_pos_weight': scale_pos_weight}
    model = xgb.train(label_params, dtrain_label, num_boost_round=params['num_round'], verbose_eval=False)

    models.append(model)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, hamming_loss, multilabel_confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Function to evaluate performance
def evaluate_performance(y_true, y_pred):
    """Compute the multilabel evaluation metrics for a set of predictions.

    Args:
        y_true: Ground-truth binary label indicator matrix.
        y_pred: Predicted binary label indicator matrix.

    Returns:
        A 7-tuple ``(accuracy, precision, recall, f1, macro_f1, h_loss, mcm)``
        where ``precision``, ``recall`` and ``f1`` are computed with
        ``average=None`` (one value per label), ``macro_f1`` uses
        ``average='macro'``, ``h_loss`` is the Hamming loss and ``mcm`` is the
        result of ``multilabel_confusion_matrix``.
    """
    # The tuple elements are evaluated left to right, in the order they are returned
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, average=None),
        recall_score(y_true, y_pred, average=None),
        f1_score(y_true, y_pred, average=None),
        f1_score(y_true, y_pred, average='macro'),
        hamming_loss(y_true, y_pred),
        multilabel_confusion_matrix(y_true, y_pred),
    )

In [None]:
# Step 3: Evaluate Model
# The test features are identical for every label, so the DMatrix is built
# once instead of once per model
dtest_label = xgb.DMatrix(X_test_features_flat)

# Predict probabilities on the test set, one column per label-specific model
y_pred_proba = np.zeros((len(X_test_features), num_labels))
for label_idx, model in enumerate(models):
    y_pred_proba[:, label_idx] = model.predict(dtest_label)

# Convert predicted probabilities to binary predictions
threshold = 0.5  # Adjust threshold as needed
y_pred_binary = (y_pred_proba > threshold).astype(int)

# Evaluate model performance using provided evaluation function
accuracy, precision, recall, f1, macro_f1, h_loss, mcm = evaluate_performance(y_test_xgb, y_pred_binary)

# Print F1-score per label
labels = list(mlb.classes_)
print("F1-score per label:")
for label, f1_score_label in zip(labels, f1):
    print(f"{label}: {f1_score_label}")

# Print macro F1-score
print("Macro F1-score:", macro_f1)

# Print other metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("Hamming Loss:", h_loss)

# One 2x2 confusion matrix per label, laid out on a 4x4 grid
plt.figure(figsize=(15, 10))
for i, label in enumerate(labels):
    plt.subplot(4, 4, i + 1)
    sns.heatmap(data=mcm[i], annot=True, cmap='Blues', fmt='d')
    plt.title(f'Confusion Matrix - {label}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
plt.tight_layout()
plt.show()