In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16
from kerastuner.tuners import RandomSearch
import pandas as pd
import numpy as np
import cv2
import os
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt


  from kerastuner.tuners import RandomSearch


In [3]:
# Ensuring GPU acceleration is active
print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [None]:
#Defining a function that is used when loading/processing the image data
def preprocess_image(image_path, bbox):
    """Loads an image, crops it to the bounding box, resizes it, and normalizes it."""
    img = cv2.imread(image_path)
    img = img[bbox[1]:bbox[3], bbox[0]:bbox[2]]
    img = cv2.resize(img, (224, 224))
    img = img / 255.0
    return img

In [None]:
# 1. Load and preprocess the training data

# Load the CSV files
df_train = pd.read_csv("Data/annotations_train_grouped.csv")
df_grouped = pd.read_csv("Data/class_names_manufacturer_country_region_grouped.csv")

# Create a dictionary mapping class indices to class names
class_names = {index: row["manufacturer"] for index, row in df_grouped.iterrows()}

# File paths to save the preprocessed data
x_train_path = "Data/x_train.npy"
y_train_path = "Data/y_train.npy"

# Check if preprocessed data exists
if os.path.exists(x_train_path) and os.path.exists(y_train_path):
    # Load the preprocessed data
    x_train = np.load(x_train_path)
    y_train = np.load(y_train_path)
else:
    # Load and preprocess the training data
    x_train = []
    y_train = []
    for i, row in df_train.iterrows():
        image_path = f"Data/cars_train/{row['fname'].split('/')[-1]}"
        bbox = [row['bbox_x1'], row['bbox_y1'], row['bbox_x2'], row['bbox_y2']]
        x_train.append(preprocess_image(image_path, bbox))
        y_train.append(row["class"])

    # Convert the data to NumPy arrays
    x_train = np.array(x_train)
    y_train = np.array(y_train)

    # Save the preprocessed data
    np.save(x_train_path, x_train)
    np.save(y_train_path, y_train)

# Split the training data into training and validation sets
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42
)

In [None]:
#2. Defining multiple models to explore various approaches

def create_model(
    num_conv_layers=3,
    filters_per_layer=64,
    optimizer="adam",
    learning_rate=0.001,
    dropout_rate=0.5,
    use_early_stopping=True,
):
    """Creates and compiles a CNN model with the specified parameters."""

    model = keras.Sequential()
    model.add(keras.Input(shape=(224, 224, 3)))

    for i in range(num_conv_layers):
        model.add(
            layers.Conv2D(
                filters_per_layer * 2**i, kernel_size=(3, 3), activation="relu"
            )
        )
        model.add(layers.MaxPooling2D(pool_size=(2, 2)))

    model.add(layers.Flatten())
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(49, activation="softmax"))

    if optimizer == "adam":
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    elif optimizer == "rmsprop":
        optimizer = keras.optimizers.RMSprop(learning_rate=learning_rate)
    elif optimizer == "sgd":
        optimizer = keras.optimizers.SGD(learning_rate=learning_rate)

    model.compile(
        loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]
    )

    return model

# Create an ImageDataGenerator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
)

# Define the models with different parameters
models = [
    {
        "name": "model_1",
        "num_conv_layers": 3,
        "filters_per_layer": 32,
        "optimizer": "adam",
        "learning_rate": 0.001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_2",
        "num_conv_layers": 4,
        "filters_per_layer": 32,
        "optimizer": "adam",
        "learning_rate": 0.0001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_3",
        "num_conv_layers": 3,
        "filters_per_layer": 64,
        "optimizer": "rmsprop",
        "learning_rate": 0.001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_4",
        "num_conv_layers": 4,
        "filters_per_layer": 64,
        "optimizer": "rmsprop",
        "learning_rate": 0.0001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_5",
        "num_conv_layers": 3,
        "filters_per_layer": 32,
        "optimizer": "sgd",
        "learning_rate": 0.001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_6",
        "num_conv_layers": 4,
        "filters_per_layer": 32,
        "optimizer": "sgd",
        "learning_rate": 0.0001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 50,
    },
    {
        "name": "model_7",
        "num_conv_layers": 3,
        "filters_per_layer": 64,
        "optimizer": "adam",
        "learning_rate": 0.001,
        "dropout_rate": 0.25,
        "use_early_stopping": False,
        "epochs": 50,
    },
    {
        "name": "model_8",
        "num_conv_layers": 4,
        "filters_per_layer": 64,
        "optimizer": "rmsprop",
        "learning_rate": 0.0001,
        "dropout_rate": 0.25,
        "use_early_stopping": False,
        "epochs": 50,
    },
    {
        "name": "model_9",
        "num_conv_layers": 5,
        "filters_per_layer": 32,
        "optimizer": "sgd",
        "learning_rate": 0.001,
        "dropout_rate": 0.25,
        "use_early_stopping": False,
        "epochs": 50,
    },
    {
        "name": "model_10",
        "num_conv_layers": 4,
        "filters_per_layer": 16,
        "optimizer": "adam",
        "learning_rate": 0.0001,
        "dropout_rate": 0.5,
        "use_early_stopping": True,
        "epochs": 100,
    },
]


In [None]:
# attempting to use a pre-trained model
base_model = VGG16(
    weights="imagenet", include_top=False, input_shape=(224, 224, 3)
)

# Freeze the convolutional base
base_model.trainable = False

# Create a new model on top
inputs = keras.Input(shape=(224, 224, 3))
x = base_model(inputs, training=False)
x = keras.layers.GlobalAveragePooling2D()(x)
outputs = keras.layers.Dense(49, activation="softmax")(x)
model = keras.Model(inputs, outputs)

# Compile the model
model.compile(
    loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
)

# Create an ImageDataGenerator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
)

# Implement early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5)

# Train the model with data augmentation
history = model.fit(
    datagen.flow(x_train, y_train, batch_size=32),
    epochs=50,
    validation_data=(x_val, y_val),
    callbacks=[early_stopping],
)

# Save the trained model
model.save("car_manufacturer_model_vgg16.h5")

In [4]:
#Reload the model
temp = keras.models.load_model("car_manufacturer_model_vgg16.h5")

In [5]:
temp.save("car_manufacturer_model_vgg16.keras")

In [None]:
# attempting to create a single more optimized model without relying on a pre-trained base

# More aggressive Data augmentation
datagen = ImageDataGenerator(
    rotation_range=30,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
    brightness_range=[0.8, 1.2],
    contrast_stretching=True
)

# Hyperparameter tuning
def build_model(hp):
    model = keras.Sequential()
    model.add(keras.Input(shape=(224, 224, 3)))
    for i in range(hp.Int('num_conv_layers', 2, 5)):
        model.add(layers.Conv2D(
            filters=hp.Int('filters_' + str(i), min_value=32, max_value=256, step=32),
            kernel_size=hp.Choice('kernel_size_' + str(i), values=[3, 5]),
            activation='relu'
        ))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Flatten())
    model.add(layers.Dropout(hp.Float('dropout', 0.3, 0.7, step=0.1)))
    model.add(layers.Dense(len(class_names), activation='softmax'))
    
    model.compile(
        optimizer=keras.optimizers.Adam(hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory='keras_tuner',
    project_name='car_classification'
)

tuner.search_space_summary()

tuner.search(datagen.flow(x_train, y_train, batch_size=32),
             epochs=10,
             validation_data=(x_val, y_val))

best_model = tuner.get_best_models(num_models=1)[0]
best_model.summary()

# Train the best model with the entire dataset
history = best_model.fit(
    datagen.flow(x_train, y_train, batch_size=32),
    epochs=50,
    validation_data=(x_val, y_val)
)

model.save("best_model.keras")

In [None]:
# 3. Train models

# Train the models
for model_params in models:
    print(f"Training {model_params['name']}...")
    model = create_model(
        num_conv_layers=model_params["num_conv_layers"],
        filters_per_layer=model_params["filters_per_layer"],
        optimizer=model_params["optimizer"],
        learning_rate=model_params["learning_rate"],
        dropout_rate=model_params["dropout_rate"],
        use_early_stopping=model_params["use_early_stopping"],
    )

    callbacks = []
    if model_params["use_early_stopping"]:
        early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5)
        callbacks.append(early_stopping)

    history = model.fit(
        datagen.flow(x_train, y_train, batch_size=32),
        epochs=model_params["epochs"],
        validation_data=(x_val, y_val),
        callbacks=callbacks,
    )

    # Save the trained model
    model.save(f"{model_params['name']}.keras")

In [None]:
#Save the model for future use
model.save("car_manufacturer_model.keras")

In [None]:
#Reload the model
model = keras.models.load_model("car_manufacturer_model.keras")

In [None]:
# 4. Load and preprocess the test data

# Load the CSV files
df_test = pd.read_csv("Data/annotations_test_grouped.csv")
df_grouped = pd.read_csv("Data/class_names_manufacturer_country_region_grouped.csv")

# Create a dictionary mapping class indices to class names
class_names = {index: row["manufacturer"] for index, row in df_grouped.iterrows()}

# Function to load and preprocess an image
def preprocess_image(image_path, bbox):
    """Loads an image, crops it to the bounding box, resizes it, and normalizes it."""
    img = cv2.imread(image_path)
    img = img[bbox[1]:bbox[3], bbox[0]:bbox[2]]
    img = cv2.resize(img, (224, 224))
    img = img / 255.0
    return img

# File paths to save the preprocessed data
x_test_path = "Data/x_test.npy"
y_test_path = "Data/y_test.npy"

# Check if preprocessed data exists
if os.path.exists(x_test_path) and os.path.exists(y_test_path):
    # Load the preprocessed data
    x_test = np.load(x_test_path)
    y_test = np.load(y_test_path)
else:
    # process the data
    x_test = []
    y_test = []
    for i, row in df_test.iterrows():
        image_path = f"Data/cars_test/{row['fname'].split('/')[-1]}"
        bbox = [row['bbox_x1'], row['bbox_y1'], row['bbox_x2'], row['bbox_y2']]
        x_test.append(preprocess_image(image_path, bbox))
        y_test.append(row["class"])

    # Convert the data to NumPy arrays
    x_test = np.array(x_test)
    y_test = np.array(y_test)

    # Save the preprocessed data
    np.save(x_test_path, x_test)
    np.save(y_test_path, y_test)
    
# Create a smaller test set due to an issue when running the full fat dataset for evaluation
x_test_small = x_test[:5000]
y_test_small = y_test[:5000]

In [None]:
#Testing with a smaller subset to debug an issue

try:
    # Evaluate the model on the smaller test set
    loss, accuracy = model.evaluate(x_test_small, y_test_small)
    print("Test accuracy (small set):", accuracy)

except Exception as e:
    print(f"An error occurred during evaluation: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# 4. Evaluate the model

# Evaluate the model on the test set
loss, accuracy = model.evaluate(x_test, y_test)
print("Test accuracy:", accuracy)

In [None]:
# 4. Evaluate all of the models with a slightly reduced size due to kernel crashing when I ran the entire dataset, not sure why and I couldn't fix it
model_names = [f"model_{i}.keras" for i in range(1, 11)]
results = []

for model_name in model_names:
    try:
        model = keras.models.load_model(model_name)
        loss, accuracy = model.evaluate(x_test_small, y_test_small)
        results.append({"model_name": model_name, "accuracy": accuracy})
    except Exception as e:
        print(f"An error occurred while evaluating {model_name}: {e}")

# Print the results in a formatted table
print("-" * 40)
print("Model\t\tAccuracy")
print("-" * 40)
for result in results:
    print(f"{result['model_name']}\t{result['accuracy']:.4f}")
print("-" * 40)

In [None]:
# 5. Analyze the learned features

# Extract the learned features from the last convolutional layer
feature_extractor = keras.Model(
    inputs=model.inputs, outputs=model.get_layer("conv2d_2").output
)
features = feature_extractor.predict(x_test)

# Flatten the features
features_flat = features.reshape(features.shape[0], -1)

# Perform PCA for dimensionality reduction
pca = PCA(n_components=50)
features_pca = pca.fit_transform(features_flat)

# Perform t-SNE for visualization
tsne = TSNE(n_components=2, perplexity=30, n_iter=1000)
features_tsne = tsne.fit_transform(features_pca)

# Plot the t-SNE visualization
plt.figure(figsize=(10, 10))
for i in range(len(class_names)):
    plt.scatter(
        features_tsne[y_test == i, 0],
        features_tsne[y_test == i, 1],
        label=class_names[i],
    )
plt.legend()
plt.show()