In [1]:
import os
import math
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, Input, BatchNormalization, LeakyReLU, Conv2D, MaxPooling2D, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.experimental import AdamW
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
tf.random.set_seed(42)

In [2]:
class DataProcessor:
    def __init__(self, base_path):
        self.base_path = base_path
        self.image_size = (360, 360)
        self.label_encoder = None
    
    def apply_augmentation(self, image, bbox):
        bbox = tf.cast(bbox, tf.float32)
        
        h, w = float(self.image_size[0]), float(self.image_size[1])
        bbox_pixel = bbox * tf.constant([w, h, w, h], dtype=tf.float32)
        
        if tf.random.uniform([]) > 0.5:
            image = tf.image.flip_left_right(image)
            bbox_pixel = tf.stack([
                w - bbox_pixel[2],
                bbox_pixel[1],    
                w - bbox_pixel[0], 
                bbox_pixel[3]      
            ])
        
        if tf.random.uniform([]) > 0.5:
            angle = tf.random.uniform([], minval=-15, maxval=15) * math.pi / 180
            image = tf.keras.layers.RandomRotation(
                factor=(-15/360, 15/360),
                fill_mode='constant',
                fill_value=1.0
            )(image)
            
            center_x = (bbox_pixel[0] + bbox_pixel[2]) / 2
            center_y = (bbox_pixel[1] + bbox_pixel[3]) / 2
            width = bbox_pixel[2] - bbox_pixel[0]
            height = bbox_pixel[3] - bbox_pixel[1]
            
            cos_theta = tf.cos(angle)
            sin_theta = tf.sin(angle)
            new_center_x = (center_x - w/2) * cos_theta - (center_y - h/2) * sin_theta + w/2
            new_center_y = (center_x - w/2) * sin_theta + (center_y - h/2) * cos_theta + h/2
            
            expansion_factor = 1.2
            new_width = width * expansion_factor
            new_height = height * expansion_factor
            
            bbox_pixel = tf.stack([
                tf.clip_by_value(new_center_x - new_width/2, 0, w),
                tf.clip_by_value(new_center_y - new_height/2, 0, h),
                tf.clip_by_value(new_center_x + new_width/2, 0, w),
                tf.clip_by_value(new_center_y + new_height/2, 0, h)
            ])
        
        if tf.random.uniform([]) > 0.5:
            zoom_factor = tf.random.uniform([], minval=0.9, maxval=1.0)
            image = tf.image.central_crop(image, zoom_factor)
            image = tf.image.resize(image, self.image_size)
            
            bbox_pixel = bbox_pixel * zoom_factor
            bbox_pixel = tf.clip_by_value(bbox_pixel, 0, tf.maximum(w, h))
        
        if tf.random.uniform([]) > 0.5:
            image = tf.image.random_brightness(image, 0.2)
            image = tf.image.random_contrast(image, 0.8, 1.2)
            image = tf.image.random_saturation(image, 0.8, 1.2)
            image = tf.image.random_hue(image, 0.1)
        
        if tf.random.uniform([]) > 0.5:
            noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=0.01)
            image = tf.clip_by_value(image + noise, 0.0, 1.0)
        
        bbox = bbox_pixel / tf.constant([w, h, w, h], dtype=tf.float32)
        return image, bbox

    def load_and_preprocess_image(self, filename, label, bbox):
        try:
            img = tf.io.read_file(filename)
            img = tf.image.decode_jpeg(img, channels=3)
            img = tf.image.resize(img, self.image_size)
            img = tf.cast(img, tf.float32) / 255.0
            
            img, bbox = self.apply_augmentation(img, bbox)
            
            return img, label, bbox
        except tf.errors.NotFoundError:
            tf.print(f"File not found: {filename}")
            return None, None, None
            
    def create_dataset(self, csv_file, base_dir, is_training=True, batch_size=16):
        df = pd.read_csv(csv_file)
        
        if self.label_encoder is None:
            self.label_encoder = LabelEncoder()
            df['class'] = self.label_encoder.fit_transform(df['class'])
        else:
            df['class'] = self.label_encoder.transform(df['class'])
            
        filenames = df['filename'].apply(lambda x: os.path.join(base_dir, x)).values
        labels = df['class'].values
        labels_one_hot = tf.keras.utils.to_categorical(labels, num_classes=len(self.label_encoder.classes_))
        
        bboxes = df[['xmin', 'ymin', 'xmax', 'ymax']].values.astype(np.float32)
        
        bboxes = bboxes / np.array([self.image_size[1], self.image_size[0], 
                                   self.image_size[1], self.image_size[0]], 
                                   dtype=np.float32)
        
        dataset = tf.data.Dataset.from_tensor_slices((filenames, labels_one_hot, bboxes))
        dataset = dataset.map(
            lambda f, l, b: (self.load_and_preprocess_image(f, l, b)),
            num_parallel_calls=tf.data.AUTOTUNE
        )
        dataset = dataset.filter(lambda x, y, z: x is not None)
        
        def prepare_data(img, label, bbox):
            return img, {'class_output': label, 'bbox_output': bbox}
        
        dataset = dataset.map(prepare_data)
        
        if is_training:
            dataset = dataset.shuffle(buffer_size=1000)
        
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
        
        return dataset


In [3]:
class VisionModel:
    def __init__(self, num_classes, max_objects=10):
        self.num_classes = num_classes
        self.max_objects = max_objects  # Maximum number of objects to detect
        self.model = self._build_model()
        
    def _build_model(self):
        inputs = Input(shape=(360, 360, 3))
        
        # Base model
        base_model = VGG16(weights='imagenet', include_top=False, input_shape=(360, 360, 3))
        for layer in base_model.layers[:-4]:
            layer.trainable = False
        for layer in base_model.layers:
            if isinstance(layer, Conv2D):
                layer.kernel_initializer = tf.keras.initializers.HeNormal()
        
        x = base_model(inputs, training=True)
        
        # Modified conv_block with additional features
        def conv_block(x, filters):
            x = Conv2D(filters, (3, 3), padding='same', kernel_initializer='he_normal')(x)
            x = BatchNormalization()(x)
            x = LeakyReLU(alpha=0.1)(x)
            # Remove MaxPooling to preserve spatial information
            return x
        
        # Feature Pyramid Network (FPN)-like structure
        features = []
        x1 = conv_block(x, 256)
        features.append(x1)
        
        x2 = conv_block(x1, 256)
        features.append(x2)
        
        x3 = conv_block(x2, 256)
        features.append(x3)
        
        # Combine features from different scales
        combined_features = Concatenate()([
            GlobalAveragePooling2D()(f) for f in features
        ])
        
        def fc_block(x, units, dropout_rate=0.5):
            skip = x
            x = Dense(units)(x)
            x = BatchNormalization()(x)
            x = LeakyReLU(alpha=0.1)(x)
            x = Dropout(dropout_rate)(x)
            if skip.shape[-1] == units:
                x = x + skip
            return x
        
        x = fc_block(combined_features, 512)
        x = fc_block(x, 256)
        x = fc_block(x, 128)
        
        # Multiple object detection heads
        class_outputs = []
        bbox_outputs = []
        confidence_outputs = []
        
        for _ in range(self.max_objects):
            # Classification branch
            class_branch = Dense(64)(x)
            class_branch = LeakyReLU(alpha=0.1)(class_branch)
            class_output = Dense(self.num_classes, activation='softmax')(class_branch)
            class_outputs.append(class_output)
            
            # Bounding box branch
            bbox_branch = Dense(64)(x)
            bbox_branch = LeakyReLU(alpha=0.1)(bbox_branch)
            bbox_output = Dense(4, activation='sigmoid')(bbox_branch)  # x, y, w, h
            bbox_outputs.append(bbox_output)
            
            # Confidence branch
            conf_branch = Dense(32)(x)
            conf_branch = LeakyReLU(alpha=0.1)(conf_branch)
            conf_output = Dense(1, activation='sigmoid')(conf_branch)  # Object confidence
            confidence_outputs.append(conf_output)
        
        # Combine outputs
        class_output = Concatenate(axis=-2)(class_outputs)
        bbox_output = Concatenate(axis=-2)(bbox_outputs)
        confidence_output = Concatenate(axis=-2)(confidence_outputs)
        
        model = Model(
            inputs=inputs,
            outputs={
                'class_output': class_output,  # Shape: (batch, max_objects, num_classes)
                'bbox_output': bbox_output,    # Shape: (batch, max_objects, 4)
                'conf_output': confidence_output  # Shape: (batch, max_objects, 1)
            }
        )
        return model
        
    def compile_model(self, learning_rate=0.0001):
        optimizer = AdamW(
            learning_rate=learning_rate,
            weight_decay=0.0001,
            beta_1=0.9,
            beta_2=0.999,
            epsilon=1e-07
        )
        
        def confidence_loss(y_true, y_pred):
            # Custom loss for confidence scores
            return tf.keras.losses.binary_crossentropy(y_true, y_pred)
        
        self.model.compile(
            optimizer=optimizer,
            loss={
                'class_output': 'categorical_crossentropy',
                'bbox_output': 'huber',
                'conf_output': confidence_loss
            },
            loss_weights={
                'class_output': 1.0,
                'bbox_output': 1.0,
                'conf_output': 0.5
            },
            metrics={
                'class_output': ['accuracy'],
                'bbox_output': ['mae'],
                'conf_output': ['accuracy']
            }
        )
    
    def get_callbacks(self, model_dir='model/multi/'):
        # Same as before
        callbacks = [
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6,
                verbose=1
            ),
            EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            ),
            ModelCheckpoint(
                os.path.join(model_dir, 'model-{epoch:02d}-{val_loss:.2f}.keras'),
                monitor='val_loss',
                save_best_only=True,
                verbose=1,
                save_weights_only=False
            )
        ]
        return callbacks

In [4]:
def plot_metrics(history):
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')

    plt.subplot(1, 2, 2)
    plt.plot(history.history['class_output_accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_class_output_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(loc='upper right')
    
    plt.tight_layout()
    plt.show()


In [2]:

def train_model():
    base_path = 'dataset/'
    data_processor = DataProcessor(base_path)
    
    train_dataset = data_processor.create_dataset(
        os.path.join(base_path, 'train', '_annotations.csv'),
        os.path.join(base_path, 'train'),
        is_training=True
    )


    valid_dataset = data_processor.create_dataset(
        os.path.join(base_path, 'valid', '_annotations.csv'),
        os.path.join(base_path, 'valid'),
        is_training=False
    )
    
    model = VisionModel(num_classes=len(data_processor.label_encoder.classes_))
    model.compile_model()
    
    history = model.model.fit(
        train_dataset,
        epochs=100,
        validation_data=valid_dataset,
        callbacks=model.get_callbacks(),
        workers=4,
        use_multiprocessing=True
    )
    
    model_save_path = 'model/multi/my_model.h5'
    model.model.save(model_save_path)
    print(f'Model saved to: {model_save_path}')
    plot_metrics(history)
    return model, history, data_processor


In [6]:
model, history, data_processor = train_model()

Epoch 1/100


AttributeError: in user code:

    File "d:\testing\.testenv\Lib\site-packages\keras\engine\training.py", line 1284, in train_function  *
        return step_function(self, iterator)
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\training.py", line 1268, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\training.py", line 1249, in run_step  **
        outputs = model.train_step(data)
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\training.py", line 1055, in train_step
        return self.compute_metrics(x, y, y_pred, sample_weight)
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\training.py", line 1149, in compute_metrics
        self.compiled_metrics.update_state(y, y_pred, sample_weight)
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\compile_utils.py", line 577, in update_state
        self.build(y_pred, y_true)
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\compile_utils.py", line 483, in build
        self._metrics = tf.__internal__.nest.map_structure_up_to(
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\compile_utils.py", line 631, in _get_metric_objects
        return [self._get_metric_object(m, y_t, y_p) for m in metrics]
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\compile_utils.py", line 631, in <listcomp>
        return [self._get_metric_object(m, y_t, y_p) for m in metrics]
    File "d:\testing\.testenv\Lib\site-packages\keras\engine\compile_utils.py", line 652, in _get_metric_object
        y_t_rank = len(y_t.shape.as_list())

    AttributeError: 'NoneType' object has no attribute 'shape'


In [None]:

test_dataset = data_processor.create_dataset(
        os.path.join('dataset', 'test', '_annotations.csv'),
        os.path.join('dataset', 'test'),
        is_training=False
    )

    # Predictions on test data
class_pred, bbox_pred = model.model.predict(test_dataset)
class_pred_labels = np.argmax(class_pred, axis=1)

    # Extract true class labels from test dataset
true_class_labels = []
for images, labels in test_dataset.unbatch():
    true_class_labels.append(np.argmax(labels['class_output']))

true_class_labels = np.array(true_class_labels)

    # Classification report
print("Classification Report:")
print(classification_report(true_class_labels, class_pred_labels, target_names=data_processor.label_encoder.classes_))

  

In [None]:
  # Confusion Matrix
conf_matrix = confusion_matrix(true_class_labels, class_pred_labels)
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=data_processor.label_encoder.classes_, yticklabels=data_processor.label_encoder.classes_)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [46]:
from tensorflow.keras.models import load_model
model = load_model('model/multi/my_model.h5')

In [None]:
# Evaluate the model
results = model.evaluate(test_dataset)

# Unpack the results
test_loss, class_output_loss, bbox_output_loss, test_accuracy, top_k_categorical_accuracy, bbox_output_mae, bbox_output_mean_squared_error = results

# Print the relevant metrics
print(f'Test Accuracy: {test_accuracy}, Test Loss: {test_loss}')
print(f'Bounding Box MAE: {bbox_output_mae}, Bounding Box MSE: {bbox_output_mean_squared_error}')
print(f'Top-k Categorical Accuracy: {top_k_categorical_accuracy}')


In [None]:
base_path = 'dataset/'
data_processor = DataProcessor(base_path)
test_dataset = data_processor.create_dataset(
    os.path.join(base_path, 'test', '_annotations.csv'),
    os.path.join(base_path, 'test'),
    is_training=False
)

y_pred = model.predict(test_dataset)
print(y_pred)

In [None]:
# Load test dataset CSV file and encode true labels
test_csv_path = os.path.join(base_path, 'test', '_annotations.csv')
df_test = pd.read_csv(test_csv_path)

# Encode class labels using the same LabelEncoder
if data_processor.label_encoder is None:
    data_processor.label_encoder = LabelEncoder()
    data_processor.label_encoder.fit(df_test['class'])

# Convert class labels to integers for true labels
true_labels = data_processor.label_encoder.transform(df_test['class'])

class_predictions, bbox_predictions = y_pred  # Extract class and bbox predictions

# Convert predicted probabilities to class labels
predicted_classes = class_predictions.argmax(axis=1)

# Classification report for precision, recall, and F1-score
print(classification_report(true_labels, predicted_classes, target_names=data_processor.label_encoder.classes_))


In [None]:
def plot_image_with_boxes(image, pred_boxes, true_boxes=None):
    plt.figure(figsize=(5, 5))
    plt.imshow(image, cmap='gray')
    
    # Get image dimensions
    height, width = image.shape[:2]

    if pred_boxes is not None:
        for box in pred_boxes:
            if isinstance(box, np.ndarray):
                x = box[0] * width
                y = box[1] * height
                w = box[2] * width
                h = box[3] * height
                
                plt.gca().add_patch(plt.Rectangle((x, y), w, h, 
                                                fill=False, 
                                                color='red', 
                                                linewidth=2,
                                                label='Predicted'))
    
    # Draw ground truth boxes (handle multiple)
    if true_boxes is not None:
        for box in true_boxes:
            if isinstance(box, np.ndarray):
                x = box[0] * width
                y = box[1] * height
                w = box[2] * width
                h = box[3] * height
                
                plt.gca().add_patch(plt.Rectangle((x, y), w, h, 
                                                fill=False, 
                                                color='green', 
                                                linewidth=2,
                                                label='Ground Truth'))    
                
    plt.legend()
    plt.axis('on')
    plt.tight_layout()
    plt.show()

# Get a batch of data from the test dataset
test_dataset_iter = iter(test_dataset)  # Create an iterator
batch = next(test_dataset_iter)  # Extract one batch

# Extract image and true bounding boxes from the batch
image = batch[0][0].numpy()  # Get the first image in the batch and convert to numpy
true_boxes = batch[1]['bbox_output'][0].numpy()  # Get the true bounding boxes for the first image

# Predict bounding boxes
pred_boxes = model.predict(image[np.newaxis, ...])[1][0]  # Predict bounding boxes and take the first prediction
# Before plotting, let's verify the box coordinates
print("Predicted boxes shape:", pred_boxes.shape)
print("True boxes shape:", true_boxes.shape)
print("Image shape:", image.shape)

# Plot the image with predicted and true bounding boxes
plot_image_with_boxes(image, [pred_boxes], [true_boxes])


In [None]:
num_images = 5  # Set the number of images you want to visualize
for i, batch in enumerate(test_dataset.take(num_images)):
    # Extract image and true bounding boxes
    image = batch[0][0].numpy()
    true_boxes = batch[1]['bbox_output'][0].numpy()
    
    # Predict bounding boxes
    pred_boxes = model.predict(image[np.newaxis, ...])[1][0]
    
    # Plot the image with predicted and true bounding boxes
    plot_image_with_boxes(image, [pred_boxes], [true_boxes])
    
    if i >= num_images - 1:
        break  # Stop after displaying the desired number of images


In [None]:
def plot_image_with_boxes(image, pred_boxes, true_boxes=None, ax=None):
    if ax is None:
        ax = plt.gca()

    ax.imshow(image, cmap='gray')
    
    # Get image dimensions
    height, width = image.shape[:2]

    # Draw predicted boxes
    if pred_boxes is not None:
        for box in pred_boxes:
            x = box[0] * width
            y = box[1] * height
            w = box[2] * width
            h = box[3] * height

            ax.add_patch(plt.Rectangle((x, y), w, h, 
                                         fill=False, 
                                         color='red', 
                                         linewidth=2,
                                         label='Predicted'))

    # Draw ground truth boxes
    if true_boxes is not None:
        for box in true_boxes:
            x = box[0] * width
            y = box[1] * height
            w = box[2] * width
            h = box[3] * height

            ax.add_patch(plt.Rectangle((x, y), w, h, 
                                         fill=False, 
                                         color='green', 
                                         linewidth=2,
                                         label='Ground Truth'))

    ax.legend()

def plot_multiple_images(test_dataset, num_images, rows, cols):
    fig, axes = plt.subplots(rows, cols, figsize=(20, 4*rows))
    axes = axes.ravel()  # Flatten the axes array
    
    for i, batch in enumerate(test_dataset.take(num_images)):
        if i >= num_images:
            break
            
        # Extract image and true bounding boxes
        image = batch[0][0].numpy()
        true_boxes = batch[1]['bbox_output'][0].numpy()
        
        # Predict bounding boxes
        pred_boxes = model.predict(image[np.newaxis, ...])[1][0]
        
        # Plot using the modified function
        plot_image_with_boxes(image, [pred_boxes], [true_boxes], ax=axes[i])
        axes[i].set_title(f'Image {i+1}')
    
    plt.tight_layout()
    plt.show()

# Specify the layout
num_images = 30  # Total number of images
rows = 5       # Number of rows
cols = 6        # Number of columns

# Plot the grid of images
plot_multiple_images(test_dataset, num_images, rows, cols)