In [None]:
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
from PIL import Image
import torch
import json
from tqdm.notebook import tqdm
from torch.utils.data import Dataset, DataLoader
from heapq import nlargest
import random

In [None]:
class CarClassifier(nn.Module):
    def __init__(self, image_shape, conv_layers, num_classes):
        super(CarClassifier, self).__init__()

        self.convs = nn.ModuleList(
            [nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)] +
            [nn.Conv2d(in_channels=32 * (2**n), out_channels=64 * (2**n), kernel_size=3, padding=1) for n in range(conv_layers - 1)]
        )

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.flattened_size = (2**conv_layers) * image_shape[0] * image_shape[1]

        self.fc_layers = nn.ModuleList([
            nn.Linear(self.flattened_size, 512),
            nn.Linear(512, num_classes)
        ])

    def forward(self, x):
        for i, conv in enumerate(self.convs):
            x = F.relu(conv(x))
            if i == 0 or i == 2:
                x = self.pool(x)

        x = x.reshape(-1, self.flattened_size)

        x = F.relu(self.fc_layers[0](x))
        x = self.fc_layers[1](x)
        return x
    
    def get_num_conv_layers(self):
        return len(self.convs)

In [None]:
class DynamicCarClassifier(nn.Module):
    def __init__(self, image_shape, conv_layers, num_classes, conv_kernel_size, pool_kernel_size):
        super(CarClassifier, self).__init__()

        self.convs = nn.ModuleList()
        in_channels = 3
        for n in range(conv_layers):
            out_channels = 32 * (2 ** n)
            self.convs.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=conv_kernel_size, padding=1))
            in_channels = out_channels  # Update in_channels for the next layer

        # Same pooling layer used for all conv layers
        self.pool = nn.MaxPool2d(kernel_size=pool_kernel_size, stride=2)

        # Adjust flattened_size calculation based on the consistent kernel size
        self.flattened_size = self.calculate_flattened_size(image_shape, conv_layers, conv_kernel_size, pool_kernel_size)

        self.fc_layers = nn.ModuleList([
            nn.Linear(self.flattened_size, 512),
            nn.Linear(512, num_classes)
        ])

    def forward(self, x):
        for i, conv in enumerate(self.convs):
            x = F.relu(conv(x))
            if i == 0 or i == 2:
                x = self.pool(x)

        x = x.view(-1, self.flattened_size)

        x = F.relu(self.fc_layers[0](x))
        x = self.fc_layers[1](x)
        return x

    def calculate_flattened_size(self, image_shape, conv_layers, conv_kernel_size, pool_kernel_size):
        height, width = image_shape

        for i in range(conv_layers):
            # Conv layer with padding=1 keeps size same
            # Apply pooling size reduction after specific layers (1st and 3rd in this case)
            if i == 0 or i == 2:  # Pooling after 1st and 3rd layers
                height = (height - pool_kernel_size) // 2 + 1
                width = (width - pool_kernel_size) // 2 + 1

            if height <= 0 or width <= 0:
                raise ValueError("Feature map size is too small after convolution and pooling layers.")

        last_layer_channels = 32 * (2 ** (conv_layers - 1))
        return height * width * last_layer_channels

    def get_num_conv_layers(self):
        return len(self.convs)

In [None]:
class VariedKernelSizeCarClassifier(nn.Module):
    def __init__(self):
        super(VariedKernelSizeCarClassifier, self).__init__()

        # Convolutional Layers
        self.conv1 = nn.Conv2d(3, 96, kernel_size=14, stride=4, padding=7)
        self.conv2 = nn.Conv2d(96, 256, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(256, 384, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(384, 480, kernel_size=3, padding=1)

        # Max Pooling Layer
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2)

        # Calculate the size of the flattened features for the fully connected layer
        self.num_flat_features = self._get_conv_output((3, 100, 100))

        # Fully Connected Layer
        self.fc1 = nn.Linear(self.num_flat_features, 4096)
        self.fc2 = nn.Linear(4096, 200)  # Output layer for 200 classes

        # Dropout
        self.dropout = nn.Dropout(0.5)

    def _get_conv_output(self, shape):
        # Helper function to calculate the size of the flattened features
        batch_size = 1
        input = torch.autograd.Variable(torch.rand(batch_size, *shape))
        output_feat = self._forward_features(input)
        n_size = output_feat.data.view(batch_size, -1).size(1)
        return n_size

    def _forward_features(self, x):
        # Helper function to pass a dummy input through the conv layers
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        return x

    def forward(self, x):
        # Forward pass through the network
        x = self._forward_features(x)
        x = x.view(x.size(0), -1)  # Flatten the output for the fully connected layers
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)  # Output layer for 200 classes
        return x

In [None]:
class CarDataset(Dataset):
    def __init__(self, file_paths, image_size, csv_file):
        self.file_paths = file_paths
        self.image_size = image_size
        self.idx_df = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        split = file_path.split("\\")[1].split("_")[:-1]

        if len(split) > 2:
            split = [split[0], "_".join(split[1:len(split)])]

        try:
            image = Image.open(file_path).resize(self.image_size)
            image = np.asarray(image).transpose((2, 0, 1))  # Change HWC to CHW format
            label = self.idx_df.loc[(self.idx_df['make'] == split[0]) & (self.idx_df['model'] == split[1]), 'idx'].values[0]
            return torch.from_numpy(image).float(), label
        
        except Exception as e:
            print(f"skipping image of make {split[0]} and model {split[1]}")
            # Return a placeholder or zero tensor
            placeholder_image = torch.zeros((3, *self.image_size))  # Assuming 3 channels
            placeholder_label = -1  # Assuming -1 indicates an invalid label
            return placeholder_image, placeholder_label

In [None]:
def train(model, optimizer, criterion, image_size, training_data, validation_data, batch_size=32, epochs=20, patience=5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Create data loaders
    train_loader = DataLoader(CarDataset(training_data, tuple(image_size), "idx.csv"), batch_size=batch_size, shuffle=True)

    for epoch in tqdm(range(epochs)):
        # Training phase
        model.train()
        for data_batch_tensor, labels in tqdm(train_loader):
            if -1 in labels:
                valid_mask = labels != -1

                # Skip the batch if all labels are invalid
                if not valid_mask.any():
                    print("skipping batch")
                    continue

                # Filter out invalid data and labels
                data_batch_tensor = data_batch_tensor[valid_mask]
                labels = labels[valid_mask]

            data_batch_tensor, labels = data_batch_tensor.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(data_batch_tensor).float()
            labels = labels.long()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        if epoch != 0  and epoch % 20 == 0:
            try:
                torch.save(model, f"model_conv_{model.get_num_conv_layers()}_epcohs_{epoch}.pth")
            except:
                torch.save(model, f"model_epcohs_{epoch}.pth")
            print(f"epoch {epoch} complete and checkpoint saved.")

    print('Training complete.')

In [None]:
def get_folder_size(folder_path):
    total_size = 0
    for root, dirs, files in os.walk(folder_path):
        total_size += sum(os.path.getsize(os.path.join(root, name)) for name in files)
    return total_size

images_path = "VMMRdb"

if os.path.isfile("split.json") and os.path.isfile("folder.json"):
    with open("split.json", "r") as f:
        data = json.load(f)

    train_files = data["train"]
    validation_files = data["validate"]
    test_files = data["test"]

    with open("folder.json", "r") as f:
        folders = json.load(f)

else:
    subdirs = [os.path.join(images_path, d) for d in os.listdir(images_path) if os.path.isdir(os.path.join(images_path, d))]

    # Calculate folder sizes
    folder_sizes = {folder: get_folder_size(folder) for folder in subdirs}

    # Get the top N largest folders
    largest = nlargest(1000, folder_sizes, key=folder_sizes.get)

    # Correct way to generate filenames list
    filenames = []
    folders = []
    for folder in largest:
        folder_split = folder.split("_")[:-1]
        folder_split[0] = folder_split[0].split("\\")[-1]
    
        if len(folder_split) > 2:
            folder_split = [folder_split[0], "_".join(folder_split[1:len(folder_split)])]

        folders.append(folder_split)

        for filename in os.listdir(folder):
            if os.path.isfile(os.path.join(folder, filename)):
                filenames.append(os.path.join(folder, filename))

    random.shuffle(filenames)
    train_files, validation_files, test_files = np.split(filenames, [int(len(filenames)*0.8), int(len(filenames)*0.9)])

    with open("split.json", "w") as f:
        json.dump(
            {
                "train": train_files.tolist(),
                "validate": validation_files.tolist(),
                "test": test_files.tolist()
            },
            f, indent=4
        )

    with open("folder.json", "w") as f:
        json.dump(folders, f, indent=4)

In [None]:
images_path = "VMMRdb"

image_counts_df = pd.DataFrame(columns=["make", "model", "year", "count"])
images_df = pd.DataFrame(columns=["filename", "make", "model", "year"])

for folder in os.listdir(images_path):
    folder_split = folder.split("_")
    
    if len(folder_split) > 3:
        folder_split = [folder_split[0], "_".join(folder_split[1:len(folder_split) - 1]), folder_split[len(folder_split) - 1]]

    folder_path = os.path.join(images_path, folder)

    count = len(os.listdir(folder_path))
    image_counts_df.loc[len(image_counts_df)] = folder_split + [count]

    data = [
        {
            "filename": os.path.join(images_path, folder, filename),
            "make": folder_split[0],
            "model": folder_split[1],
            "year": folder_split[2]
        }
        for filename in os.listdir(folder_path)
    ]

    images_df = pd.concat([images_df, pd.DataFrame(data)], ignore_index=True)

idx_df = pd.read_csv("idx.csv")

images_df = pd.merge(images_df, idx_df, on=['make', 'model'], how='left')

image_counts_df.to_csv("image_counts.csv")
images_df.to_csv("image_info.csv")

In [None]:
def filter_dataframe(df, make_model_pairs):
    # Convert the list of lists into a set of tuples for faster lookup
    make_model_set = set(tuple(pair) for pair in make_model_pairs)

    # Define a function to check if the row's make-model pair is in the set
    def is_in_pairs(row):
        return (row['make'], row['model']) in make_model_set

    # Apply the function to each row and filter the dataframe
    filtered_df = df[df.apply(is_in_pairs, axis=1)]
    return filtered_df


df = pd.read_csv("image_counts.csv").drop_duplicates(["make", "model"])[["make", "model"]]
df = filter_dataframe(df, folders).reset_index()
df["idx"] = df.index

df.to_csv("idx.csv")

num_classes = len(df)

image_shape = [100, 100]

for conv_layers in range(4,8):

    model = CarClassifier(image_shape, conv_layers, num_classes).to("cuda")

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    criterion = nn.CrossEntropyLoss()

    train(
        model,
        optimizer,
        criterion,
        image_shape,
        train_files,
        validation_files,
        batch_size=128,
        epochs=100
    )

    torch.save(model, f"model_conv{conv_layers}.pth")

In [None]:
del model
torch.cuda.empty_cache()
conv_layers = 4

model = DynamicCarClassifier(image_shape, conv_layers, num_classes, 3, 3).to("cuda")

optimizer = optim.Adam(model.parameters(), lr=0.001)

criterion = nn.CrossEntropyLoss()

train(
    model,
    optimizer,
    criterion,
    image_shape,
    train_files,
    validation_files,
    batch_size=128,
    epochs=100
)

torch.save(model, f"model_conv{conv_layers}_kernel3.pth")

In [None]:
del model
torch.cuda.empty_cache()

model = VariedKernelSizeCarClassifier().to("cuda")

optimizer = optim.Adam(model.parameters(), lr=0.001)

criterion = nn.CrossEntropyLoss()

train(
    model,
    optimizer,
    criterion,
    image_shape,
    train_files,
    validation_files,
    batch_size=128,
    epochs=100
)

torch.save(model, f"model_conv{conv_layers}_vary.pth")

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

def filter_dataframe(df, make_model_pairs):
    # Convert the list of lists into a set of tuples for faster lookup
    make_model_set = set(tuple(pair) for pair in make_model_pairs)

    # Define a function to check if the row's make-model pair is in the set
    def is_in_pairs(row):
        return (row['make'], row['model']) in make_model_set

    # Apply the function to each row and filter the dataframe
    filtered_df = df[df.apply(is_in_pairs, axis=1)]
    return filtered_df


df = pd.read_csv("image_counts.csv").drop_duplicates(["make", "model"])[["make", "model"]]
df = filter_dataframe(df, folders).reset_index()
df["idx"] = df.index

df.to_csv("idx.csv")

num_classes = len(df)

image_shape = [100, 100]

train_loader = DataLoader(CarDataset(test_files, tuple(image_shape), "idx.csv"), batch_size=128)

metrics = {}

for file in os.listdir(os.getcwd()):
    if file.endswith(".pth"):
        model = torch.load(file).to("cuda")
        model.eval()

        all_preds = []
        all_labels = []

        for data_batch_tensor, labels in tqdm(train_loader):
            if -1 in labels:
                valid_mask = labels != -1

                if not valid_mask.any():
                    print("skipping batch")
                    continue

                data_batch_tensor = data_batch_tensor[valid_mask]
                labels = labels[valid_mask]

            # Move data and labels to the same device as the model
            data_batch_tensor = data_batch_tensor.to("cuda")
            labels = labels.to("cuda")

            with torch.no_grad():
                outputs = model(data_batch_tensor)
                _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        # Calculating the metrics
        precision = precision_score(all_labels, all_preds, average='weighted')
        recall = recall_score(all_labels, all_preds, average='weighted')
        f1 = f1_score(all_labels, all_preds, average='weighted')
        accuracy = accuracy_score(all_labels, all_preds)

        metrics[file] = {
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "accuracy": accuracy
        }
    
with open("metrics.json", "w") as f:
    json.dump(metrics, f, indent=4)

In [None]:
import matplotlib.pyplot as plt

def display_images_with_labels(image_label_list, num_cols=2):
    num_images = len(image_label_list)
    rows = num_images // num_cols + int(num_images % num_cols > 0)
    fig, axes = plt.subplots(rows, num_cols, figsize=(10, 5 * rows))

    for i, (image_tensor, label) in enumerate(image_label_list):
        image = image_tensor.numpy().transpose((1, 2, 0))
        
        # Normalize if the image is not in the [0, 1] range
        if image.max() > 1.0:
            image = image / 255.0

        row, col = divmod(i, num_cols)
        ax = axes[row, col] if num_images > num_cols else axes[col]
        ax.imshow(image)
        ax.set_title(f"Label: {label}")
        ax.axis('off')

    # Hide unused subplots
    for j in range(i + 1, rows * num_cols):
        row, col = divmod(j, num_cols)
        if num_images > num_cols:
            axes[row, col].axis('off')
        else:
            axes[col].axis('off')

    plt.tight_layout()
    plt.show()


def filter_dataframe(df, make_model_pairs):
    # Convert the list of lists into a set of tuples for faster lookup
    make_model_set = set(tuple(pair) for pair in make_model_pairs)

    # Define a function to check if the row's make-model pair is in the set
    def is_in_pairs(row):
        return (row['make'], row['model']) in make_model_set

    # Apply the function to each row and filter the dataframe
    filtered_df = df[df.apply(is_in_pairs, axis=1)]
    return filtered_df


df = pd.read_csv("image_counts.csv").drop_duplicates(["make", "model"])[["make", "model"]]
df = filter_dataframe(df, folders).reset_index()
df["idx"] = df.index

df.to_csv("idx.csv")

num_classes = len(df)

image_shape = [100, 100]

test_loader = DataLoader(CarDataset(test_files, tuple(image_shape), "idx.csv"), batch_size=128)

model = torch.load("model_conv6.pth").to("cuda")
model.eval()

found = []
count = 0
for data_batch_tensor, labels in test_loader:
    if len(found) >= 4:
        break

    if -1 in labels:
        valid_mask = labels != -1

        if not valid_mask.any():
            print("skipping batch")
            continue

        data_batch_tensor = data_batch_tensor[valid_mask]
        labels = labels[valid_mask]

    # Move data and labels to the same device as the model
    data_batch_tensor = data_batch_tensor.to("cuda")
    labels = labels.to("cuda")

    with torch.no_grad():
        outputs = model(data_batch_tensor)
        _, preds = torch.max(outputs, 1)

    for i, (label, pred) in enumerate(zip(labels, preds)):
        if label == pred:
            idx_df = pd.read_csv("idx.csv")

            # Get the make and model for the current label
            make, model = idx_df.loc[idx_df["idx"] == label.item(), ["make", "model"]].values[0]

            # Concatenate make and model into a single string
            label_str = f"{make} {model}"

            found.append((data_batch_tensor[i].cpu(), label_str))

            if len(found) >= 4:
                break

        count += 1

display_images_with_labels(found)
print(f"Found after {count} images.")