# AUTOVISION 

In [None]:
# Import Data Science Libraries
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
import itertools
import random

# Import visualization libraries
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import cv2
import seaborn as sns

# Tensorflow Libraries
from tensorflow import keras
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras import layers,models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import Callback, EarlyStopping,ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras import Model
#from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ReduceLROnPlateau

from tensorflow.keras import preprocessing
# System libraries
from pathlib import Path
import os.path

# Metrics
from sklearn.metrics import classification_report, confusion_matrix

sns.set(style='darkgrid')

In [None]:
# Environment sanity check - nothing below works unless TensorFlow imports cleanly
import tensorflow as tf

print("TensorFlow Version: ", tf.__version__)

# List every physical device TensorFlow reports
print("Available devices:")
for device in tf.config.list_physical_devices():
    print(device)

# Report the GPUs specifically (empty list means none were found)
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU detected.")
else:
    print("Num GPUs Available: ", len(gpus))
    for gpu in gpus:
        print(gpu)

# Report whether this TensorFlow build was compiled with CUDA
print("Is TensorFlow built with GPU support? ", tf.test.is_built_with_cuda())


In [None]:
# SEED TO REPRODUCE RESULTS
def seed_everything(seed=42):
    """Seed Python, NumPy and TensorFlow, then install a single-threaded TF1-compat session.

    Args:
        seed: Integer seed shared by every random number generator below.
    """
    # Seed each source of randomness with the same value
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    tf.compat.v1.set_random_seed(seed)

    # One intra-op and one inter-op thread: multiple threads are a
    # potential source of non-reproducible results.
    single_thread_config = tf.compat.v1.ConfigProto(
        intra_op_parallelism_threads=1,
        inter_op_parallelism_threads=1,
    )

    # Register a session built on the default graph with that configuration
    tf.compat.v1.keras.backend.set_session(
        tf.compat.v1.Session(
            graph=tf.compat.v1.get_default_graph(),
            config=single_thread_config,
        )
    )

seed_everything()

In [None]:
#!wget https://raw.githubusercontent.com/mrdbourke/tensorflow-deep-learning/main/extras/helper_functions.py

# Import series of helper functions
from helper_functions import create_tensorboard_callback, plot_loss_curves, unzip_data, compare_historys, walk_through_dir, pred_and_plot

In [None]:
# Training configuration: batch size, input resolution and mixed-precision policy
from tensorflow.keras.mixed_precision import set_global_policy

BATCH_SIZE = 16
TARGET_SIZE = (224, 224)

# Switch Keras to the 'mixed_float16' policy for faster training
set_global_policy('mixed_float16')

In [None]:
# Root directory of the image dataset (machine-specific absolute path).
# The label of each image is later taken from the name of its parent folder.
dataset = r"C:\Users\joshu\OneDrive\Desktop\bunchatest\DatasetCarmodel"
# Helper imported from helper_functions; presumably prints a summary of the
# directory tree - confirm against helper_functions.py
walk_through_dir(dataset)


In [None]:
# Convert image paths and labels into a DataFrame
def convert_path_to_df(dataset, extensions=("jpg", "png")):
    """Build a DataFrame of image file paths and their labels.

    Every file under *dataset* (searched recursively) whose name matches
    ``*.<ext>`` for one of *extensions* becomes one row. The label of a file
    is the name of the directory that directly contains it.

    Args:
        dataset: Root directory to search (string or ``Path``).
        extensions: File extensions to collect, without the leading dot.
            Files are listed grouped by extension, in the order given.

    Returns:
        DataFrame with two columns: ``Filepath`` (paths as strings) and
        ``Label``. Empty (zero rows) when nothing matches.
    """
    image_dir = Path(dataset)

    filepaths = []
    for extension in extensions:
        filepaths.extend(image_dir.glob(f'**/*.{extension}'))

    return pd.DataFrame(
        {
            'Filepath': [str(filepath) for filepath in filepaths],
            # the immediate parent folder name is the class label
            'Label': [filepath.parent.name for filepath in filepaths],
        },
        columns=['Filepath', 'Label'],
    )
# One row per image found under `dataset`: columns 'Filepath' and 'Label'
image_df = convert_path_to_df(dataset)

In [None]:
# Detect and collect corrupted images (.jpg and .png, the same types that are loaded for training)
import PIL
from pathlib import Path
from PIL import Image, UnidentifiedImageError

# Scan the same dataset root that the DataFrame was built from
corrupted_images = []
for pattern in ("*.jpg", "*.png"):
    for img_p in Path(dataset).rglob(pattern):
        try:
            # Image.open only reads the header lazily; verify() checks the
            # file contents, and the context manager closes the file handle.
            with Image.open(img_p) as img:
                img.verify()
        except (UnidentifiedImageError, OSError, SyntaxError):
            print(f"Corrupted image found: {img_p}")
            corrupted_images.append(img_p)


In [None]:
# Bar chart of how many images each label has

label_counts = image_df['Label'].value_counts()

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(
    x=label_counts.index,
    y=label_counts.values,
    alpha=0.8,
    palette='rocket',
    ax=ax,
)
ax.set_title('Distribution of Labels in Image Dataset', fontsize=16)
ax.set_xlabel('Label', fontsize=14)
ax.set_ylabel('Count', fontsize=14)
# Tilt the class names so they do not overlap
plt.setp(ax.get_xticklabels(), rotation=45)
plt.show()


In [None]:
# Show a 4x4 grid of randomly chosen dataset images, each titled with its label
random_index = np.random.randint(0, len(image_df), 16)
fig, axes = plt.subplots(
    nrows=4,
    ncols=4,
    figsize=(10, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)

for ax, row in zip(axes.flat, random_index):
    ax.imshow(plt.imread(image_df.Filepath[row]))
    ax.set_title(image_df.Label[row])
plt.tight_layout()
plt.show()

In [None]:
# Generate ELA image using OpenCV for model input
def compute_ela_cv(path, quality):
    """Return an Error Level Analysis (ELA) image for the file at *path*.

    The image is re-encoded as JPEG at the given *quality*; the per-pixel
    absolute difference between the original and the re-encoded copy is then
    multiplied by a fixed scale factor.

    Args:
        path: Path of the image to analyse, as accepted by ``cv2.imread``.
        quality: JPEG quality used for the re-encoded copy.

    Returns:
        uint8 array with the amplified difference, saturated at 255.

    Raises:
        OSError: If the image, or its re-encoded copy, cannot be read.
    """
    import os
    import tempfile

    SCALE = 15
    orig_img = cv2.imread(path)
    if orig_img is None:
        # cv2.imread signals failure by returning None instead of raising
        raise OSError(f"Could not read image: {path}")
    orig_img = cv2.cvtColor(orig_img, cv2.COLOR_BGR2RGB)

    # Use a unique temp file and always remove it, rather than leaving a
    # fixed-name file behind in the working directory.
    handle, temp_filename = tempfile.mkstemp(suffix='.jpeg')
    os.close(handle)
    try:
        cv2.imwrite(temp_filename, orig_img, [cv2.IMWRITE_JPEG_QUALITY, quality])

        # read compressed image
        compressed_img = cv2.imread(temp_filename)
    finally:
        os.remove(temp_filename)
    if compressed_img is None:
        raise OSError(f"Could not re-encode image: {path}")

    # get absolute difference between both images and multiply by scale;
    # widen first so large differences saturate at 255 instead of wrapping
    # around in uint8 arithmetic
    diff = cv2.absdiff(orig_img, compressed_img).astype(np.uint16) * SCALE
    return np.clip(diff, 0, 255).astype(np.uint8)

# Generate ELA image using PIL for visualization
def convert_to_ela_image(path, quality):
    """Return a brightness-normalised ELA image built with PIL.

    The image at *path* is converted to RGB, re-encoded as JPEG at *quality*,
    and the difference between the two is brightened so that its largest
    channel value maps to 255.

    Args:
        path: Path of the image to analyse.
        quality: JPEG quality used for the re-encoded copy.

    Returns:
        PIL image holding the enhanced difference.
    """
    # Imported locally: these PIL submodules are not imported at file level
    import io
    from PIL import Image, ImageChops, ImageEnhance

    with Image.open(path) as source:
        image = source.convert('RGB')

    # Re-encode in memory so no temporary file is left on disk
    buffer = io.BytesIO()
    image.save(buffer, 'JPEG', quality=quality)
    buffer.seek(0)
    with Image.open(buffer) as temp_image:
        ela_image = ImageChops.difference(image, temp_image)

    # getextrema() yields one (min, max) pair per band
    extrema = ela_image.getextrema()
    max_diff = max(high for _, high in extrema)
    if max_diff == 0:
        # identical images: avoid dividing by zero below
        max_diff = 1

    scale = 255.0 / max_diff
    ela_image = ImageEnhance.Brightness(ela_image).enhance(scale)

    return ela_image

# Pick one image path at random from a directory tree
def random_sample(path, extension="jpg"):
    """Return a randomly chosen file below *path*, or ``None`` if there is none.

    Args:
        path: Directory to search recursively.
        extension: File extension to match, without the leading dot.

    Returns:
        The chosen file path as a POSIX-style string, or ``None`` when no
        file with that extension exists.
    """
    candidates = list(Path(path).rglob(f'*.{extension}'))
    if candidates:
        return random.choice(candidates).as_posix()
    return None


In [None]:
# Display a random sample from the dataset next to its ELA images at decreasing JPEG quality
p = random_sample('C:/Users/joshu/OneDrive/Desktop/Car.com-Image-Scraper/efficientdataOfficial/train/W205')
if p is None:
    # random_sample returns None when the directory holds no matching file
    raise FileNotFoundError("No .jpg image found in the sample directory")
orig = cv2.imread(p)
orig = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB) / 255.0
init_val = 100
columns = 3
rows = 3

fig = plt.figure(figsize=(15, 10))
for i in range(1, columns * rows + 1):
    # quality drops by 8 per panel, starting at init_val
    quality = init_val - (i - 1) * 8
    if i == 1:
        # first panel shows the untouched original, so no ELA is computed for it
        img = orig.copy()
    else:
        img = compute_ela_cv(path=p, quality=quality)
    ax = fig.add_subplot(rows, columns, i)
    ax.title.set_text(f'q: {quality}')
    ax.imshow(img)
plt.show()

In [None]:
# Hold out 20% of the images for testing; the fixed random_state keeps the split repeatable
train_df, test_df = train_test_split(
    image_df,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)


In [None]:
# Random transformations applied to training images only
_augmentation_kwargs = dict(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    zoom_range=0.2,
    brightness_range=[0.8, 1.2],
    shear_range=0.2,
    fill_mode='nearest',
)

# Training generator: augmentation plus a 20% validation hold-out
train_generator_aug = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
    validation_split=0.2,
    **_augmentation_kwargs,
)

# Validation generator: same 20% split, no augmentation
val_generator = ImageDataGenerator(
    validation_split=0.2,
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
)

# Test generator: preprocessing only
test_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
)

In [None]:
# Settings shared by all three flows. Image size and batch size come from the
# notebook-level TARGET_SIZE / BATCH_SIZE constants instead of repeated literals.
_flow_kwargs = dict(
    x_col='Filepath',
    y_col='Label',
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE,
)

# Training data generator with augmentation
train_images = train_generator_aug.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    subset='training',  # the part of train_df not reserved by validation_split
    **_flow_kwargs,
)

# Validation data generator without augmentation
val_images = val_generator.flow_from_dataframe(
    dataframe=train_df,  # same frame as above; the subset selects the held-out part
    shuffle=False,
    seed=42,
    subset='validation',
    **_flow_kwargs,
)

# Test data generator without augmentation
test_images = test_generator.flow_from_dataframe(
    dataframe=test_df,  # rows that were split off before training
    shuffle=False,  # keep order so predictions line up with test_df rows
    **_flow_kwargs,
)

In [None]:
# Load the ImageNet-pretrained EfficientNetV2-L backbone without its classifier head;
# pooling='max' applies global max pooling to the final feature map
pretrained_model = tf.keras.applications.efficientnet_v2.EfficientNetV2L(
    weights='imagenet',
    include_top=False,
    pooling='max',
    input_shape=(224, 224, 3),
)

# Freeze the backbone for the first training phase
pretrained_model.trainable = False

In [None]:
# Checkpoint callback: writes the whole model (not only weights) to a .h5 file,
# keeping just the version with the best val_accuracy
checkpoint_path = "car_model_full_model_checkpoint_outofscope_.h5"
checkpoint_callback = ModelCheckpoint(
    checkpoint_path,
    monitor="val_accuracy",
    save_best_only=True,
    save_weights_only=False,
)


In [None]:
# EarlyStopping callback: stop once val_loss has not improved for 10 epochs,
# then restore the weights from the best epoch
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True,
)

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# ---- Model: frozen backbone + small dense classification head ----
inputs = pretrained_model.input
# training=False keeps the backbone in inference mode even after it is unfrozen
x = pretrained_model(inputs, training=False)

x = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(x)
x = Dropout(0.5)(x)
x = Dense(256, activation='relu', kernel_regularizer=l2(0.01))(x)
x = Dropout(0.5)(x)

# One output unit per class found by the training generator (instead of a
# hard-coded count). The softmax layer is forced to float32 because the
# notebook enables the 'mixed_float16' policy, and float16 outputs are
# numerically less stable.
num_classes = len(train_images.class_indices)
outputs = Dense(num_classes, activation='softmax', dtype='float32')(x)

# ---- Class weights to counter label imbalance ----
# Get unique classes
classes = np.unique(train_df['Label'])

# Compute class weights
class_weights = compute_class_weight('balanced', classes=classes, y=train_df['Label'])

# Create dictionary mapping class names to weights
name_weight_dict = dict(zip(classes, class_weights))

# Create dictionary mapping class indices to weights (Keras expects indices)
class_weight_dict = {train_images.class_indices[class_name]: weight
                     for class_name, weight in name_weight_dict.items()}
model = Model(inputs=inputs, outputs=outputs)

# Compile the model
model.compile(
    optimizer=Adam(1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# ---- Callbacks for training ----
# Save the entire model whenever val_accuracy reaches a new best
checkpoint_path = "car_model_full_model_checkpoint_outofscope_.h5"
checkpoint_callback = ModelCheckpoint(checkpoint_path,
                                      save_weights_only=False,  # Save the entire model
                                      monitor="val_accuracy",
                                      save_best_only=True)

# Stop after 10 epochs without val_loss improvement and restore the best weights
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True
)

# Multiply the learning rate by 0.2 after 3 epochs without val_loss improvement
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    min_lr=1e-7,
    verbose=1
)

callbacks_list = [
    early_stopping,
    create_tensorboard_callback("training_logs", "car_model_classification_v2"),
    checkpoint_callback,
    reduce_lr
]

# ---- Phase 1: train only the new head ----
initial_epochs = 20

history = model.fit(
    train_images,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    epochs=initial_epochs,
    callbacks=callbacks_list,
    class_weight=class_weight_dict
)

# ---- Phase 2: fine-tune the upper part of the backbone ----
pretrained_model.trainable = True

fine_tune_at = 300  # Adjust based on the model architecture

# Keep every backbone layer below index fine_tune_at frozen
for layer in pretrained_model.layers[:fine_tune_at]:
    layer.trainable = False

# Recompile the model with a lower learning rate (required after changing `trainable`)
model.compile(
    optimizer=Adam(1e-5),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Continue Training
fine_tune_epochs = 300
total_epochs = initial_epochs + fine_tune_epochs

history_fine = model.fit(
    train_images,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    epochs=total_epochs,
    # resume epoch numbering where phase 1 actually stopped
    initial_epoch=history.epoch[-1] + 1,
    callbacks=callbacks_list,
    # same class weighting as phase 1, so both phases optimise the same objective
    class_weight=class_weight_dict
)
# Save the entire model
#model_save_path = "car_model_classification_full_model"
#model.save(model_save_path)
#print(f"Full model saved to {model_save_path}")

In [None]:
# Save the model object as it stands after training to a single .h5 file.
# NOTE(review): this is a different file from the ModelCheckpoint output,
# and the next cell loads the checkpoint file, not this one.
model_save_path = "car_model_classification_vidkoutofscope.h5"
model.save(model_save_path)
print(f"Model saved to {model_save_path}")


In [None]:
from tensorflow.keras.models import load_model
# Path to the checkpoint file written by the ModelCheckpoint callback
# (configured there with monitor="val_accuracy", save_best_only=True)
model_path = "car_model_full_model_checkpoint_outofscope_.h5"

# Replace the in-memory `model` with the one loaded from that file;
# every later cell uses this loaded model
model = load_model(model_path)

In [None]:
from tensorflow.keras.models import load_model

# Score the model on the held-out test generator; results holds [loss, accuracy]
results = model.evaluate(test_images, verbose=0)
test_loss, test_accuracy = results[0], results[1]

print("    Test Loss: {:.5f}".format(test_loss))
print("Test Accuracy: {:.2f}%".format(test_accuracy * 100))

In [None]:
# Plot accuracy and loss curves recorded in `history` (the first training phase)
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(accuracy))

# (train series, train label, validation series, validation label, title)
_curves = (
    (accuracy, 'Training accuracy', val_accuracy, 'Validation accuracy',
     'Training and validation accuracy'),
    (loss, 'Training loss', val_loss, 'Validation loss',
     'Training and validation loss'),
)

for position, (train_values, train_label, val_values, val_label, title) in enumerate(_curves):
    if position > 0:
        # the first chart uses the implicit current figure; later ones get their own
        plt.figure()
    plt.plot(epochs, train_values, 'b', label=train_label)
    plt.plot(epochs, val_values, 'r', label=val_label)
    plt.title(title)
    plt.legend()
plt.show()

In [None]:
# Predict a class index for every test image (highest-scoring output unit)
pred = np.argmax(model.predict(test_images), axis=1)

# Invert the generator's name -> index mapping, then translate indices to names
labels = {index: name for name, index in train_images.class_indices.items()}
pred = [labels[k] for k in pred]

# Display the result
print(f'The first 5 predictions: {pred[:5]}')

In [None]:
# Display 15 random test images with their true and predicted labels
# (np.random.randint excludes the upper bound, so len(test_df) covers every row)
random_index = np.random.randint(0, len(test_df), 15)
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(25, 15),
                        subplot_kw={'xticks': [], 'yticks': []})

for ax, idx in zip(axes.flat, random_index):
    true_label = test_df.Label.iloc[idx]
    predicted_label = pred[idx]
    ax.imshow(plt.imread(test_df.Filepath.iloc[idx]))
    # green title for a correct prediction, red for a wrong one
    color = "green" if true_label == predicted_label else "red"
    ax.set_title(f"True: {true_label}\nPredicted: {predicted_label}", color=color)
# tight_layout must run before show() to affect the displayed figure
plt.tight_layout()
plt.show()

In [None]:
# True labels in test_df row order, compared against the predicted class names
y_test = test_df['Label'].tolist()
print(classification_report(y_test, pred))

In [None]:
# Same report as a table: one row per class / aggregate, one column per metric
report = classification_report(y_test, pred, output_dict=True)
df = pd.DataFrame(report).T
df

In [None]:
# Create and display a labeled confusion matrix
def make_confusion_matrix(y_true, y_pred, classes=None, figsize=(15, 7), text_size=10, norm=False, savefig=False):
    """Plot a confusion matrix comparing true and predicted labels.

    Args:
        y_true: True labels.
        y_pred: Predicted labels, aligned with *y_true*.
        classes: Optional sequence of tick labels, one per class. When
            omitted (or empty), integer positions are used.
        figsize: Figure size passed to ``plt.subplots``.
        text_size: Font size of tick labels and cell text.
        norm: If True, each cell also shows its share of the row in percent.
        savefig: If True, write the figure to ``confusion_matrix.png`` in the
            current working directory.
    """
    # Create the confusion matrix
    matrix = confusion_matrix(y_true, y_pred)
    n_classes = matrix.shape[0]  # number of classes we're dealing with

    # Row-normalise; a row without any true samples stays 0 instead of
    # producing NaN through a division by zero
    row_totals = matrix.sum(axis=1)[:, np.newaxis]
    matrix_norm = np.divide(
        matrix.astype("float"),
        row_totals,
        out=np.zeros(matrix.shape, dtype=float),
        where=row_totals != 0,
    )

    # Plot the figure and make it pretty
    fig, ax = plt.subplots(figsize=figsize)
    cax = ax.matshow(matrix, cmap=plt.cm.Blues)  # darker cell == larger count
    fig.colorbar(cax)

    # Use the given class names if there are any (explicit checks so that
    # NumPy arrays work too; `if classes:` is ambiguous for arrays)
    if classes is not None and len(classes) > 0:
        labels = classes
    else:
        labels = np.arange(n_classes)

    # Label the axes
    ax.set(title="Confusion Matrix",
           xlabel="Predicted label",
           ylabel="True label",
           xticks=np.arange(n_classes),  # one axis slot per class
           yticks=np.arange(n_classes),
           xticklabels=labels,  # class names if given, otherwise ints
           yticklabels=labels)

    # Make x-axis labels appear on bottom
    ax.xaxis.set_label_position("bottom")
    ax.xaxis.tick_bottom()
    # Rotate xticks for readability & apply the font size
    plt.xticks(rotation=90, fontsize=text_size)
    plt.yticks(fontsize=text_size)

    # Cells above the midpoint of the value range get white text, others black
    threshold = (matrix.max() + matrix.min()) / 2.

    # Plot the text on each cell
    for i, j in itertools.product(range(matrix.shape[0]), range(matrix.shape[1])):
        if norm:
            cell_text = f"{matrix[i, j]} ({matrix_norm[i, j]*100:.1f}%)"
        else:
            cell_text = f"{matrix[i, j]}"
        plt.text(j, i, cell_text,
                 horizontalalignment="center",
                 color="white" if matrix[i, j] > threshold else "black",
                 size=text_size)

    # Save the figure to the current working directory
    if savefig:
        fig.savefig("confusion_matrix.png")


In [None]:
# Draw the confusion matrix for the test set, using the class names as tick labels
class_names = list(labels.values())
make_confusion_matrix(y_test, pred, classes=class_names)

In [None]:
# Read an image from disk and shape it as a one-image batch
def get_img_array(img_path, size):
    """Load the image at *img_path* and return it as an array with a leading batch axis.

    Args:
        img_path: Path of the image file.
        size: Target size passed to ``load_img`` as ``target_size``.

    Returns:
        The image array with one extra leading axis of length 1.
    """
    image_utils = tf.keras.preprocessing.image
    pil_image = image_utils.load_img(img_path, target_size=size)
    return np.expand_dims(image_utils.img_to_array(pil_image), axis=0)
# Create Grad-CAM heatmap to visualize model focus
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for one image.

    Args:
        img_array: Model input batch; only the first item is used for the map.
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output is weighted.
            NOTE(review): the layer is looked up with ``model.get_layer``; if
            it lives inside a nested sub-model this lookup may fail - confirm.
        pred_index: Index of the output unit to explain. Defaults to the
            highest-scoring unit for the first item.

    Returns:
        NumPy array with the heatmap, clipped at 0 and scaled so that its
        maximum is 1 (all zeros when no location has a positive value).
    """
    # Model that returns both the chosen layer's activations and the predictions
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record the forward pass so the class score can be differentiated
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Gradient of the class score with respect to the layer's activations
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # Average the gradient over every axis except the last: one weight per channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight each channel of the first item's activations and sum over channels
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Keep positive evidence only, then normalise. The division is skipped
    # when the peak is 0, which would otherwise yield NaN.
    heatmap = tf.maximum(heatmap, 0)
    peak = tf.math.reduce_max(heatmap)
    if peak > 0:
        heatmap = heatmap / peak
    return heatmap.numpy()
# Overlay Grad-CAM heatmap on original image and save it
def save_and_display_gradcam(img_path, heatmap, cam_path="cam.jpg", alpha=0.4):
    """Blend a Grad-CAM heatmap over the original image and save the result.

    Args:
        img_path: Path of the original image.
        heatmap: 2-D heatmap with values in [0, 1].
        cam_path: Output file path; an existing file is overwritten.
        alpha: Weight of the coloured heatmap added to the original image.

    Returns:
        *cam_path*, the location of the saved image.
    """
    # Original image at its native size
    img = tf.keras.preprocessing.image.load_img(img_path)
    img = tf.keras.preprocessing.image.img_to_array(img)

    # Rescale the heatmap to integer colour-table indices 0..255
    heatmap = np.uint8(255 * heatmap)

    # plt.get_cmap is used because matplotlib.cm.get_cmap has been removed
    # from recent Matplotlib releases
    jet = plt.get_cmap("jet")

    # RGB values (alpha dropped) of the colour table, indexed by the heatmap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Resize the coloured heatmap to the original image's width and height
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)

    # Superimpose the heatmap on the original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)

    superimposed_img.save(cam_path)

    return cam_path
    

# Use the efficientnet_v2 helpers, matching the backbone and the data generators
preprocess_input = tf.keras.applications.efficientnet_v2.preprocess_input
decode_predictions = tf.keras.applications.efficientnet_v2.decode_predictions

last_conv_layer_name = "top_conv"
# (height, width) only: this value is passed to load_img as target_size
img_size = (224, 224)

# Remove last layer's softmax
# NOTE(review): this mutates the shared `model` in place. Predictions made
# after this point may be raw scores rather than probabilities - confirm
# before reporting them as confidences.
model.layers[-1].activation = None

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# Path to the image
image_path = r"c:\Users\joshu\Downloads\Screenshot 2024-09-20 220026.png"

# Define the target size (same as the input size of your model)
target_size = (224, 224)

# Function to preprocess and predict a single image
def classify_single_image(image_path, model, target_size, class_indices=None):
    """Classify one image, show it, and print every class score.

    Args:
        image_path: Path of the image to classify.
        model: Keras model used for the prediction.
        target_size: Size the image is resized to when loaded.
        class_indices: Optional mapping of class name -> output index.
            Defaults to ``train_images.class_indices`` so existing calls
            keep working.

    Returns:
        Dict mapping class name -> score, ordered from highest to lowest score.
    """
    # Load and preprocess the image
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=target_size)
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)  # batch of one
    img_array = tf.keras.applications.efficientnet_v2.preprocess_input(img_array)

    # Make predictions; only the first (and only) item of the batch is used
    predictions = model.predict(img_array)
    scores = predictions[0]

    # Invert the mapping so output indices can be translated to class names
    if class_indices is None:
        class_indices = train_images.class_indices
    labels = {index: name for name, index in class_indices.items()}

    # Output indices ordered from highest to lowest score
    ranking = np.argsort(scores)[::-1]

    # Plot the image
    plt.imshow(img)
    plt.title("Predictions and Confidences")
    plt.axis('off')  # Turn off axis labels
    plt.show()

    # Print every class with its score and collect them in ranked order
    ranked_scores = {}
    for class_index in ranking:
        class_label = labels[class_index]
        confidence = scores[class_index]
        print(f"Class: {class_label}, Confidence: {confidence * 100:.2f}%")
        ranked_scores[class_label] = confidence

    return ranked_scores

# Run the classification on the single image
predicted_labels_and_confidences = classify_single_image(image_path, model, target_size)
print(predicted_labels_and_confidences)


In [None]:
import json

# Build the index -> class name lookup from the training generator
labels = {index: name for name, index in train_images.class_indices.items()}

# Persist the lookup as JSON so it can be reused without the generator
with open('class_labels.json', 'w') as f:
    json.dump(labels, f)

print(labels)


In [None]:
# Grad-CAM overlays for the test images selected by `random_index`

fig, axes = plt.subplots(
    nrows=3,
    ncols=5,
    figsize=(15, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)

for ax, sample in zip(axes.flat, random_index):
    img_path = test_df.Filepath.iloc[sample]
    # Preprocess the image, build its heatmap and write the overlay to disk
    img_array = preprocess_input(get_img_array(img_path, size=img_size))
    heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
    cam_path = save_and_display_gradcam(img_path, heatmap)
    # Read the saved overlay back for display
    ax.imshow(plt.imread(cam_path))
    ax.set_title(f"True: {test_df.Label.iloc[sample]}\nPredicted: {pred[sample]}")
plt.tight_layout()
plt.show()