## Setup

In [1]:
# Import libraries
import cv2
from glob import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle
import tensorflow as tf
# from tensorflow import keras
from tensorflow.data import Dataset # type: ignore
from tensorflow.keras import layers, mixed_precision, Sequential # type: ignore
from tensorflow.keras.applications import EfficientNetB0, EfficientNetB3 # type: ignore
from tensorflow.keras.layers import BatchNormalization, Conv2D, Dense, Dropout, Flatten, GlobalAveragePooling2D, MaxPooling2D, SpatialDropout2D # type: ignore
from tensorflow.keras.models import Model # type: ignore
# from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.utils.class_weight import compute_class_weight

2025-03-10 14:24:36.104555: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-10 14:24:36.249691: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1741638276.302724   65669 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1741638276.317657   65669 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-10 14:24:36.450048: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
gpus = tf.config.list_physical_devices('GPU')

if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

mixed_precision.set_global_policy("mixed_float16")
os.environ["TF_XLA_FLAGS"] = "--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit"

tf.config.set_logical_device_configuration(
    gpus[0],
    [tf.config.LogicalDeviceConfiguration(memory_limit=14000)]  # Adjust memory limit (MB) if needed
)


In [None]:
# print(tf.sysconfig.get_build_info())
# print(f'tensorflow version {tf.__version__}')
# print("Num GPUs Available:", len(tf.config.experimental.list_physical_devices('GPU')))
# print(tf.config.list_physical_devices('GPU'))
# print(tf.config.list_logical_devices('GPU'))

In [None]:
# !echo "Downloading files..."
# !wget -q https://github.com/byui-cse/cse450-course/raw/master/data/roadsigns/training1.zip
# !wget -q https://github.com/byui-cse/cse450-course/raw/master/data/roadsigns/training2.zip
# !wget -q https://github.com/byui-cse/cse450-course/raw/master/data/roadsigns/holdout.zip
# !wget -q https://github.com/byui-cse/cse450-course/raw/master/data/roadsigns/mini_holdout.zip
# !wget -q https://github.com/byui-cse/cse450-course/raw/master/data/roadsigns/mini_holdout_answers.csv

## Dataset Preparation

### Upscaling

In [None]:
### This has been done already, no need to do again.
# # Define paths
# input_folder = "original"
# output_folder = "upscaled"
# os.makedirs(output_folder, exist_ok=True)  # Create output directory if not exists

# # Set target resolution
# target_size = (300, 300)  # Change this as needed (e.g., 300x300 for B3)

# # Get all image files
# image_files = []
# image_files.extend(glob(os.path.join(input_folder, "**", "*.jpg"), recursive=True))

# # Process images
# for img_path in image_files:
#     # Determine relative path inside original folder
#     relative_path = os.path.relpath(img_path, input_folder)

#     # Create corresponding output path
#     output_path = os.path.join(output_folder, relative_path)

#     # Ensure the target directory exists
#     os.makedirs(os.path.dirname(output_path), exist_ok=True)

#     # Load and resize image
#     img = cv2.imread(img_path)
#     if img is None:
#         print(f"Skipping corrupted file: {img_path}")
#         continue

#     img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_LANCZOS4)

#     # Save the resized image
#     cv2.imwrite(output_path, img_resized)

# print(f"Resized {len(image_files)} images and saved them to {output_folder}")

Resized 52040 images and saved them to upscaled


### General

In [3]:
# Define paths
dataset_path = "upscaled/training"
batch_size = 32
img_size = (300, 300)

# Get class names from directory structure
class_names = sorted(os.listdir(dataset_path))
# class_names = ['Speed_20', 'Speed_30', 'Speed_50', 'Speed_60', 'Speed_70',
#                'Speed_80','Speed_Limit_Ends', 'Speed_100', 'Speed_120', 'Overtaking_Prohibited',
#                'Overtakeing_Prohibited_Trucks', 'Crossroad_Ahead', 'Priority_Road_Ahead', 'Yield', 'STOP',
#                'Entry_Forbidden', 'Trucks_Forbidden', 'No_Entry(one-way traffic)', 'Cars_Prohibited(!)', 'Left_Curve_Ahead',
#                'Right_Curve_Ahead', 'Bends_Left_Then_Right', 'Poor_Surface_Ahead', 'Slippery_Surface_Ahead', 'Road_Narrows_On_Right',
#                'Roadwork_Ahead', 'Traffic_Light_Ahead', 'Warning_Pedestrians', 'Warning_Children', 'Warning_Bikes',
#                'Uncontrolled_Crossroad', 'Deer_Crossing', 'End_Previous_Limitation', 'Turning_Right_Compulsory', 'Turning_Left_Compulsory',
#                'Ahead_Only', 'Straight_Or_Right_Mandatory', 'Straight_Or_Left_Mandatory', 'Passing_Right_Compulsory', 'Passing_Left_Compulsory',
#                'Roundabout', 'End_Overtaking_Prohibition', 'End_Overtaking_Prohibition_Trucks']
num_classes = len(class_names)

# Get all image file paths
image_paths = tf.io.gfile.glob(os.path.join(dataset_path, "*", "*.jpg"))  # Adjust extension if needed


In [4]:
def load_and_preprocess_image(image_path):
    try:
        image = tf.io.read_file(image_path)
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.image.resize(image, img_size)
        image = tf.cast(image, tf.float32)
        # Normalize to [-1, 1]
        image = image / 255.0
        return image
    except Exception as e:
        tf.print("Error processing", image_path, ":", e)
        # Return a tensor of zeros (or use tf.data.experimental.ignore_errors() downstream)
        return tf.zeros([*img_size, 3], dtype=tf.float32)

def get_label(image_path):
    parts = tf.strings.split(image_path, os.sep)
    label_str = parts[-2]  # Extract folder name (e.g., "0")
    
    try:
        label = tf.strings.to_number(label_str, out_type=tf.int32)  # Convert safely to integer
    except:
        print(f"Error converting label: {label_str}")
        label = tf.constant(-1, dtype=tf.int32)  # Assign -1 if an error occurs
    
    return label

def adjust_brightness(image, min_brightness=0.4, max_brightness=1.0):
    grayscale = tf.image.rgb_to_grayscale(image)
    mean_brightness = tf.reduce_mean(grayscale)

    brightness_factor = tf.maximum(min_brightness / mean_brightness, 1.0)  # Ensures valid scaling
    adjusted_image = tf.clip_by_value(image * brightness_factor, 0.0, max_brightness)

    return adjusted_image

class F1Score(tf.keras.metrics.Metric):
    def __init__(self, name='f1_score', **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        precision = self.precision.result()
        recall = self.recall.result()
        return 2 * ((precision * recall) / (precision + recall + tf.keras.backend.epsilon()))

    def reset_states(self):
        self.precision.reset_states()
        self.recall.reset_states()


# def augment(image_path, label):
#     image = load_and_preprocess_image(image_path)  # Your function that loads & resizes images
#     image = data_augmentation(image)  # Apply augmentation
#     return image, label


In [5]:
data_augmentation = Sequential([
    layers.Lambda(lambda x: adjust_brightness(x)),
    layers.Rescaling(1./255),  # Normalize to [0,1]
    layers.RandomFlip("horizontal"),  # Random horizontal flips
    layers.RandomRotation(0.2),  # Rotate by up to 10%
    layers.RandomZoom(0.3),  # Random zoom
    layers.RandomContrast(0.3),  # Adjust contrast slightly
])

I0000 00:00:1741638291.661299   65669 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14000 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4080 SUPER, pci bus id: 0000:01:00.0, compute capability: 8.9


In [13]:
# # print(get_label(image_paths[0]))  # Test the label extraction
# for image in image_paths:
#     img = cv2.imread(image)
#     print("corrupted" if img is None else "", sep='')  # If True, the image is corrupted

# print(load_and_preprocess_image(image_paths[0]))  # Test the image loading and preprocessing

In [6]:
auto = tf.data.AUTOTUNE
# Convert file paths into a tf.data.Dataset
dataset = tf.data.Dataset.from_tensor_slices(image_paths)
# Apply preprocessing and labeling functions
dataset = dataset.map(lambda x: (load_and_preprocess_image(x), get_label(x)), num_parallel_calls=auto)

# Shuffle and split dataset
dataset = dataset.shuffle(buffer_size=1024, seed=42)

train_size = int(0.8 * len(image_paths))  # 80% for training
train_ds = dataset.take(train_size).batch(batch_size).prefetch(auto)
val_ds = dataset.skip(train_size).batch(batch_size).prefetch(auto)

In [None]:
# # Compute class weights
# class_labels = np.array([get_label(img_path).numpy() for img_path in image_paths])
# class_weights = compute_class_weight(class_weight="balanced", classes=np.unique(class_labels), y=class_labels)
# class_weight_dict = dict(enumerate(class_weights))
# pickle.dump(class_weight_dict, open("class_weight_dict.pkl", "wb"))

In [7]:
class_weight_dict = pickle.load(open("class_weight_dict.pkl", "rb"))
# class_weight_dict = {int(k): float(v) for k, v in class_weight_dict.items()}

In [9]:
for k, v in class_weight_dict.items():
    print(f"Class {type(k)}: {type(v)}")
    break

Class <class 'int'>: <class 'numpy.float64'>


## Model Generation

In [46]:
inputs = tf.keras.Input(shape=(*img_size, 3))

# Apply in-model data augmentation
x = data_augmentation(inputs)

''' EficientNetB0
# Load EfficientNet and attach it
base_model = EfficientNetB0(weights='imagenet', include_top=False, input_tensor=x)
# base_model.trainable = False

x = SpatialDropout2D(0.5)(base_model.output)
x = GlobalAveragePooling2D()(x)
x = Dropout(0.5)(x)
x = Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
x = Dropout(0.5)(x)
outputs = Dense(num_classes, activation='softmax')(x)

'''

''' Custom model'''
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)

x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)

x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)

x = Conv2D(256, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)

# Feature Extraction
x = GlobalAveragePooling2D()(x)  # Replaces Flatten()

# Fully Connected Layers
x = Dropout(0.5)(x)
x = Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
x = Dropout(0.5)(x)
outputs = Dense(num_classes, activation='softmax')(x)
''''''


# Define model explicitly
model = tf.keras.Model(inputs, outputs)

In [67]:
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
                loss='sparse_categorical_crossentropy',
                # metrics=['accuracy'])
                metrics=[
                    'accuracy',  
                    tf.keras.metrics.Precision(name='precision'),
                    tf.keras.metrics.Recall(name='recall')
                    # F1Score(name='f1_score')
                ])


In [48]:
tf.keras.backend.clear_session()  # Clear session to avoid clutter from old models
warmup_data = tf.convert_to_tensor(next(iter(train_ds))[0])  # Get one batch
model.predict(warmup_data)  # Run one inference step


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 999ms/step


array([[0.02325764, 0.02325537, 0.0232521 , ..., 0.02325678, 0.02325448,
        0.02325964],
       [0.02325882, 0.02325521, 0.02324994, ..., 0.02325723, 0.02325425,
        0.02326255],
       [0.02325742, 0.02325494, 0.02325243, ..., 0.0232563 , 0.02325619,
        0.02326008],
       ...,
       [0.02325719, 0.02325545, 0.02325302, ..., 0.02325636, 0.02325557,
        0.02325924],
       [0.02325695, 0.02325559, 0.02325359, ..., 0.0232564 , 0.023255  ,
        0.02325826],
       [0.02325742, 0.02325542, 0.02325278, ..., 0.02325635, 0.02325544,
        0.02325953]], dtype=float32)

In [66]:
for image, label in train_ds.take(1):  # Take a sample batch
    print("Image shape:", image.shape)  # Should be (batch_size, height, width, channels)
    print("Label shape:", label.shape)  # Should be (batch_size,) for sparse labels
    print("Label sample:", label.numpy())


Image shape: (32, 300, 300, 3)
Label shape: (32,)
Label sample: [14 14 30 30 14 14 14 30 14 14 14 14 30 30 30 14 30 14 14 14 14 14 14 30
 30 30 30 14 14 30 14 14]


In [70]:
history = model.fit(
    train_ds,
    epochs=10,  # Start small; increase if needed
    validation_data=val_ds,
    class_weight=class_weight_dict
)

Epoch 1/10


: 

In [49]:
model_filepath = "models/custom_300_take1.keras"

In [24]:
model.save(model_filepath)

In [15]:
model = tf.keras.models.load_model(model_filepath)

## Post-Gen Exploration

In [4]:
# View 9 images and their class labels
plt.figure(figsize=(10, 10))
images, labels = next(train_generator)  # Assuming train_generator is a generator
batch_size = images.shape[0]

for i in range(min(9, batch_size)):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow((images[i] * 255).astype("uint8"))
    plt.title(int(labels[i]))
    plt.axis("off")

plt.show()

NameError: name 'train_generator' is not defined

<Figure size 1000x1000 with 0 Axes>

## Mini-Holdout

In [31]:
# Path to new images
mini_path = "original/mini_holdout"  # Change this to your actual folder
mini_files = sorted([os.path.join(mini_path, f) for f in os.listdir(mini_path) if f.endswith((".jpg"))])

# Load and preprocess all new images
mini = np.array([load_and_preprocess_image(img_path) for img_path in mini_files])


In [32]:
print(model.input_shape)
print(mini.shape)

(None, 100, 100, 3)
(201, 100, 100, 3)


In [33]:
# Predict class probabilities
predictions = model.predict(mini)

# Convert probabilities to class labels
predicted_classes = np.argmax(predictions, axis=1)

[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step


In [34]:
for i in range(0, 200, 10):
    print(mini_files[i], predicted_classes[i])

original/mini_holdout/00000.jpg 37
original/mini_holdout/00010.jpg 37
original/mini_holdout/00020.jpg 37
original/mini_holdout/00030.jpg 37
original/mini_holdout/00040.jpg 37
original/mini_holdout/00050.jpg 37
original/mini_holdout/00060.jpg 37
original/mini_holdout/00070.jpg 37
original/mini_holdout/00080.jpg 37
original/mini_holdout/00090.jpg 37
original/mini_holdout/00100.jpg 37
original/mini_holdout/00110.jpg 37
original/mini_holdout/00120.jpg 37
original/mini_holdout/00130.jpg 37
original/mini_holdout/00140.jpg 37
original/mini_holdout/00150.jpg 37
original/mini_holdout/00160.jpg 37
original/mini_holdout/00170.jpg 37
original/mini_holdout/00180.jpg 37
original/mini_holdout/00190.jpg 37


In [35]:
mini_answers = pd.read_csv("mini_holdout_answers.csv")
# Create a mapping from filename → class
filename_to_label = dict(zip(mini_answers["Filename"], mini_answers["ClassId"]))

# Extract true labels in the same order as mini_files
true_labels = [filename_to_label[os.path.basename(f)] for f in mini_files]

# Convert to NumPy array (for compatibility with sklearn)
true_labels = np.array(true_labels)

In [36]:
# Compute metrics
accuracy = accuracy_score(true_labels, predicted_classes)
precision = precision_score(true_labels, predicted_classes, average='weighted')
recall = recall_score(true_labels, predicted_classes, average='weighted')
f1 = f1_score(true_labels, predicted_classes, average='weighted')

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Accuracy: 0.0000
Precision: 0.0000
Recall: 0.0000
F1 Score: 0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


## Testing the model
Once you have built and trained your model, the next step is to run the mini holdout images through it and see how well your model does at making predictions for images it has never seen before.

Since loading these images and formatting them for the model can be tricky, you may find the following code useful. This code only uses your model to predict the class label for a given image. You'll still need to compare those predictions to the "ground truth" class labels in `mini_holdout_answers.csv` to evaluate how well the model does.

Previously, you were given a file that would check your results. This time you're given the answers to the first mini holdout dataset. You'll need to compare those predictions against the "ground truth" class labels in `mini_holdout_answers.csv` to evaluate how well the model does.

Make sure to use the insights gained from the mini hold out dataset in your executive summary.


```
from tensorflow.keras.preprocessing import image_dataset_from_directory
test_dir = '/content/'

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
        test_dir,
        classes=['mini_holdout'],
        target_size=image_size,
        class_mode='sparse',
        shuffle=False)
probabilities = model.predict(test_generator)
predictions = [np.argmax(probas) for probas in probabilities]
```



##Mini Hold out Dataset


Once you feel confident, you will need to predict for the full holdout dataset using the following code, and submit your csv file:

```
from tensorflow.keras.preprocessing import image_dataset_from_directory
test_dir = '/content/'

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
        test_dir,
        classes=['holdout'],
        target_size=image_size,
        class_mode='sparse',
        shuffle=False)
probabilities = model.predict(test_generator)
predictions = [np.argmax(probas) for probas in probabilities]
```