In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, Flatten, Input
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from pathlib import Path

# 1. Data Preprocessing

# Local (Windows) locations of the image folder and of the image metadata table
image_directory = r'D:\CPEN355_project\Data\DVM_noNest_test'
data = pd.read_csv(r'D:\CPEN355_project\Data\Image_table.csv')

# Parse Image_name to extract Maker, Year, Genmodel_ID
def parse_image_name(image_name):
    """Split an image file name on '$$' and pick out three of its fields.

    Args:
        image_name: File name whose fields are separated by '$$'.

    Returns:
        A ``(maker, year, genmodel_id)`` tuple taken from fields 0, 2 and 4.

    Raises:
        IndexError: If the name has fewer than five '$$'-separated fields.
    """
    fields = image_name.split('$$')
    return fields[0], fields[2], fields[4]

# Derive the Maker / Year / Genmodel_ID columns from every image name.
# The parsed tuples are collected into one DataFrame in a single step, which
# avoids constructing a separate pd.Series for each row inside ``apply``.
parsed_columns = ['Maker', 'Year', 'Genmodel_ID']
data[parsed_columns] = pd.DataFrame(
    data['Image_name'].map(parse_image_name).tolist(),
    index=data.index,  # keep row alignment with the original table
    columns=parsed_columns,
)

# Count Genmodel_ID occurrences and keep only the IDs seen more than 300 times
genmodel_counts = data['Genmodel_ID'].value_counts()
valid_genmodels = genmodel_counts[genmodel_counts > 300].index.tolist()
filtered_data = data[data['Genmodel_ID'].isin(valid_genmodels)]

# Randomly sample a fixed number of images (100 by default) per Genmodel_ID
def sample_images(df, n=100, random_state=42):
    """Return ``n`` randomly chosen rows of ``df``.

    Args:
        df: DataFrame to draw rows from (a single Genmodel_ID group when
            used with ``groupby``).
        n: Number of rows to draw without replacement. Defaults to 100.
        random_state: Seed forwarded to ``DataFrame.sample`` so the draw is
            reproducible. Defaults to 42.

    Returns:
        A DataFrame containing the ``n`` sampled rows.

    Raises:
        ValueError: Propagated from ``DataFrame.sample`` when ``df`` has
            fewer than ``n`` rows.
    """
    return df.sample(n=n, random_state=random_state)

# Sample each Genmodel_ID group and stitch the samples back together.
# Iterating over the groups (rather than ``groupby(...).apply``) keeps the
# 'Genmodel_ID' column in the result and avoids the pandas deprecation
# warning about ``apply`` operating on the grouping columns.
sampled_groups = [
    sample_images(group) for _, group in filtered_data.groupby('Genmodel_ID')
]
if sampled_groups:
    filtered_data = pd.concat(sampled_groups, ignore_index=True)
else:
    # No group to sample: keep an empty frame with the same columns
    # (pd.concat raises on an empty list).
    filtered_data = filtered_data.head(0).reset_index(drop=True)

# Add full image paths
image_root = Path(image_directory)
filtered_data['Image_path'] = filtered_data['Image_name'].map(
    lambda name: str(image_root / name))

# Filter non-existent images
def image_exists(path):
    """Report whether ``path`` points at an existing regular file.

    Args:
        path: File system path (string or path-like) to check.

    Returns:
        True if the path exists and is a file, otherwise False.
    """
    candidate = Path(path)
    return candidate.is_file()

# Keep only the rows whose image file is present on disk
exists_mask = filtered_data['Image_path'].apply(image_exists)
filtered_data = filtered_data[exists_mask].reset_index(drop=True)

# Encode labels: one LabelEncoder per target column, stored by column name so
# the fitted classes can be looked up later.
label_encoders = {}
for name in ['Maker', 'Year', 'Genmodel_ID']:
    le = LabelEncoder()
    label_encoders[name] = le
    filtered_data[name + '_enc'] = le.fit_transform(filtered_data[name])

# Prepare data
image_paths = filtered_data['Image_path']
maker_labels = filtered_data['Maker_enc'].values
year_labels = filtered_data['Year_enc'].values
genmodel_labels = filtered_data['Genmodel_ID_enc'].values

# Split into train and validation sets (80/20), stratified on Genmodel_ID so
# each model keeps the same share in both splits.
X_train_paths, X_val_paths, y_train_maker, y_val_maker, y_train_year, y_val_year, y_train_genmodel, y_val_genmodel = train_test_split(
    image_paths, maker_labels, year_labels, genmodel_labels, test_size=0.2, random_state=42, stratify=genmodel_labels)

print("data preprocessing done")

# 2. Dataset Preparation

img_size = 224
batch_size = 32

# Function to preprocess images
def preprocess_image(image_path, maker_label, year_label, genmodel_label):
    """Load one JPEG image and pair it with its three labels.

    Args:
        image_path: Path of the JPEG file to read.
        maker_label: Encoded maker label for the image.
        year_label: Encoded year label for the image.
        genmodel_label: Encoded Genmodel_ID label for the image.

    Returns:
        A tuple ``(image, labels)`` where ``image`` is the decoded 3-channel
        picture resized to ``img_size`` x ``img_size`` and run through the
        ResNet50 input preprocessing, and ``labels`` is a dict keyed by the
        model's output names.
    """
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [img_size, img_size])
    # The ImageNet ResNet50 weights used by this script expect the Keras
    # ResNet preprocessing (channel reordering and mean-centring on the
    # 0-255 range), not a plain 0-1 rescale.
    image = tf.keras.applications.resnet50.preprocess_input(image)
    return image, {
        'maker_output': maker_label,
        'year_output': year_label,
        'genmodel_output': genmodel_label
    }
# Create datasets
def _decoded_dataset(paths, maker, year, genmodel):
    """Slice the path/label arrays and decode every image in parallel."""
    slices = tf.data.Dataset.from_tensor_slices((paths, maker, year, genmodel))
    return slices.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)


# Training pipeline: shuffle, then batch and prefetch
train_dataset = (
    _decoded_dataset(X_train_paths.values, y_train_maker, y_train_year, y_train_genmodel)
    .shuffle(buffer_size=1000)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

# Validation pipeline: same steps without shuffling
val_dataset = (
    _decoded_dataset(X_val_paths.values, y_val_maker, y_val_year, y_val_genmodel)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

print("dataset preparation done")

# 3. Model Construction

# Use pretrained ResNet50 model (ImageNet weights, without its classification top)
base_model = ResNet50(weights='imagenet', include_top=False, input_tensor=Input(shape=(img_size, img_size, 3)))
x = Flatten()(base_model.output)

# Add classification heads: one softmax layer per target, sized by the number
# of classes the matching LabelEncoder was fitted on.
maker_output = Dense(len(label_encoders['Maker'].classes_), activation='softmax', name='maker_output')(x)
year_output = Dense(len(label_encoders['Year'].classes_), activation='softmax', name='year_output')(x)
genmodel_output = Dense(len(label_encoders['Genmodel_ID'].classes_), activation='softmax', name='genmodel_output')(x)

model = Model(inputs=base_model.input, outputs=[maker_output, year_output, genmodel_output])

# Freeze the base model layers so only the new heads are trained
for layer in base_model.layers:
    layer.trainable = False

# Compile the model; labels are integer-encoded, hence the sparse loss
head_names = ['maker_output', 'year_output', 'genmodel_output']
model.compile(optimizer='adam',
              loss={head: 'sparse_categorical_crossentropy' for head in head_names},
              metrics={head: 'accuracy' for head in head_names})

print("model construction done")

# 4. Model Training

# Stop when validation loss has not improved for 5 epochs and roll back to
# the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Train for at most 20 epochs, validating on the held-out split each epoch
history = model.fit(train_dataset, validation_data=val_dataset, epochs=20, callbacks=[early_stopping])

print("model training done")

# 5. Model Evaluation

# Evaluate the model on the validation set
eval_results = model.evaluate(val_dataset)
print(eval_results)
print(model.metrics_names)

# Map each reported metric name to its value
metrics_dict = dict(zip(model.metrics_names, eval_results))

# Report accuracy for the three heads the model actually has. There is no
# 'engine_type' output, so looking up its accuracy would raise KeyError.
for title, head in [('Maker', 'maker_output'),
                    ('Year', 'year_output'),
                    ('Genmodel_ID', 'genmodel_output')]:
    print(f"{title} classification accuracy: {metrics_dict[head + '_accuracy']:.2f}")


# 6. Compute Precision, Recall, and F1-score

# Initialize lists to collect true labels and predictions
y_true_maker = []
y_true_year = []
y_true_genmodel = []
y_pred_maker = []
y_pred_year = []
y_pred_genmodel = []

for images, labels in val_dataset:
    # The dataset already yields batches, so run each one directly with
    # predict_on_batch instead of starting a full predict() loop per batch.
    pred_maker, pred_year, pred_genmodel = model.predict_on_batch(images)

    # Labels are integer class ids, so they are collected as-is. They are
    # read straight from the batch dict to avoid rebinding the module-level
    # maker_labels / year_labels / genmodel_labels arrays.
    y_true_maker.extend(labels['maker_output'].numpy())
    y_true_year.extend(labels['year_output'].numpy())
    y_true_genmodel.extend(labels['genmodel_output'].numpy())

    # Predictions are class probabilities, so take the most likely class
    y_pred_maker.extend(np.argmax(pred_maker, axis=1))
    y_pred_year.extend(np.argmax(pred_year, axis=1))
    y_pred_genmodel.extend(np.argmax(pred_genmodel, axis=1))

# Compute classification reports (precision, recall, F1 per class)
print("Maker classification report:")
print(classification_report(y_true_maker, y_pred_maker))

print("Year classification report:")
print(classification_report(y_true_year, y_pred_year))

print("Genmodel_ID classification report:")
print(classification_report(y_true_genmodel, y_pred_genmodel))


  filtered_data = filtered_data.groupby('Genmodel_ID').apply(sample_images).reset_index(drop=True)


data preprocessing done
dataset preparation done
model construction done
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
model training done
[2.2779171466827393, 0.0, 2.2779171466827393, 0.0, 0.0, 0.30000001192092896, 0.0]
['loss', 'maker_output_loss', 'year_output_loss', 'genmodel_output_loss', 'maker_output_accuracy', 'year_output_accuracy', 'genmodel_output_accuracy']
Maker classification accuracy: 0.00
Year classification accuracy: 0.30
Genmodel_ID classification accuracy: 0.00
Maker classification report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        20

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20

Year classification report:


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
