In [1]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50
import numpy as np
import matplotlib.pyplot as plt
import os
import math


In [2]:
def parse_tfrecord(example):
    feature_description = {
        'image/encoded': tf.io.FixedLenFeature([], tf.string),
        'image/height': tf.io.FixedLenFeature([], tf.int64),
        'image/width': tf.io.FixedLenFeature([], tf.int64),
        'image/object/bbox/xmin': tf.io.VarLenFeature(tf.float32),
        'image/object/bbox/xmax': tf.io.VarLenFeature(tf.float32),
        'image/object/bbox/ymin': tf.io.VarLenFeature(tf.float32),
        'image/object/bbox/ymax': tf.io.VarLenFeature(tf.float32),
        'image/object/class/label': tf.io.VarLenFeature(tf.int64),
    }
    parsed_example = tf.io.parse_single_example(example, feature_description)

    # Decode image
    image = tf.image.decode_jpeg(parsed_example['image/encoded'])
    image = tf.image.resize(image, [512, 512]) / 255.0  # Normalize to [0, 1]

    # Decode bounding boxes and labels
    xmin = tf.sparse.to_dense(parsed_example['image/object/bbox/xmin'])
    xmax = tf.sparse.to_dense(parsed_example['image/object/bbox/xmax'])
    ymin = tf.sparse.to_dense(parsed_example['image/object/bbox/ymin'])
    ymax = tf.sparse.to_dense(parsed_example['image/object/bbox/ymax'])
    labels = tf.sparse.to_dense(parsed_example['image/object/class/label'])

    bboxes = tf.stack([ymin, xmin, ymax, xmax], axis=-1)  # Combine into (ymin, xmin, ymax, xmax) format
    return image, bboxes, labels


In [3]:
def load_tfrecord_dataset(tfrecord_path, batch_size=8):
    raw_dataset = tf.data.TFRecordDataset(tfrecord_path)
    parsed_dataset = raw_dataset.map(parse_tfrecord)
    dataset = parsed_dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset


In [4]:
base_model = tf.keras.applications.ResNet50(
    include_top=False, input_shape=(512, 512, 3)
)

feature_extractor = models.Model(
    inputs=base_model.input,
    outputs=base_model.get_layer("conv4_block6_out").output
)


In [5]:
def build_rpn(feature_maps):
    x = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(feature_maps)
    objectness_scores = layers.Conv2D(1, (1, 1), activation='sigmoid')(x)
    bbox_deltas = layers.Conv2D(4, (1, 1))(x)
    return objectness_scores, bbox_deltas


In [6]:
def roi_align(feature_maps, rois, output_size=(7, 7)):
    """
    Perform ROI Align on given feature maps using provided ROIs.

    Args:
        feature_maps: Tensor with shape [batch_size, height, width, channels].
        rois: A tensor of shape [num_rois, 4] representing normalized bounding boxes (in range [0, 1]).
        output_size: Tuple specifying the desired cropped size (default is (7, 7)).

    Returns:
        roi_features: Cropped features after applying ROI Align.
    """
    # Determine number of ROIs
    num_rois = tf.shape(rois)[0]
    
    # Ensure `rois` has the correct rank (remove any unexpected extra dimensions)
    rois = tf.reshape(rois, [num_rois, 4])  # Shape must be [num_rois, 4]

    # Prepare the batch indices corresponding to each ROI
    batch_size = tf.shape(feature_maps)[0]
    batch_indices = tf.random.uniform(
        [num_rois], minval=0, maxval=batch_size, dtype=tf.int32
    )  # Random valid indices for batch

    # Use crop_and_resize for ROI alignment
    roi_features = tf.image.crop_and_resize(
        feature_maps,  # Input feature map
        boxes=rois,  # Normalized bounding boxes
        box_indices=batch_indices,  # Indices indicating the batch dimension
        crop_size=output_size  # Desired size for ROI (e.g., 7x7)
    )
    
    return roi_features


In [7]:
def resize_images(images, target_size=(1024, 1024)):
    """
    Resize images to the expected input size of ResNet50.
    Args:
        images: Input images of shape (batch_size, h, w, 3).
        target_size: Desired target size, default is (1024, 1024).

    Returns:
        Resized images with the shape matching the target_size.
    """
    # Resize each image in the batch
    resized_images = tf.image.resize(images, target_size)
    return resized_images


In [8]:
class DetectionHead(tf.keras.layers.Layer):
    """
    A simple classification head for ROI features.
    """
    def __init__(self, num_classes):
        super(DetectionHead, self).__init__()
        self.num_classes = num_classes
        # Define the layers only once (weights will be initialized once)
        self.dense1 = layers.Dense(256, activation='relu')
        self.classifier = layers.Dense(num_classes, activation='softmax')
        self.bbox_regressor = layers.Dense(4, activation=None)

    def call(self, roi_features):
        """
        Forward pass through the detection head.
        
        Args:
            roi_features: Input features of shape [num_rois, 7, 7, channels].

        Returns:
            class_scores: The predicted class scores.
            bbox_deltas: The predicted bounding box deltas.
        """
        # Flatten the spatial dimensions (7x7) to make it compatible with Dense layers
        x = tf.keras.layers.Flatten()(roi_features)  # Flatten from [7,7,channels] to 1D
        x = self.dense1(x)
        
        # Class scores and bounding box predictions
        class_scores = self.classifier(x)  # Classification predictions
        bbox_deltas = self.bbox_regressor(x)  # Bounding box offsets
        
        return class_scores, bbox_deltas

In [9]:
def rpn_loss(objectness_pred, objectness_true, bbox_pred, bbox_true):
    obj_loss = tf.keras.losses.BinaryCrossentropy()(objectness_true, objectness_pred)
    bbox_loss = tf.keras.losses.Huber()(bbox_true, bbox_pred)
    return obj_loss + bbox_loss


In [10]:
def rpn_loss(objectness_pred, objectness_true, bbox_pred, bbox_true):
    obj_loss = tf.keras.losses.BinaryCrossentropy()(objectness_true, objectness_pred)
    bbox_loss = tf.keras.losses.Huber()(bbox_true, bbox_pred)
    return obj_loss + bbox_loss


In [11]:
# Example feature map tensor
feature_maps = tf.random.normal([8, 32, 32, 1024])  # [batch_size=8, height=32, width=32, channels=1024]

# Example normalized ROIs
rois = tf.random.uniform([10, 4], minval=0.0, maxval=1.0)  # Generate 10 random ROIs normalized in [0,1]

# Call roi_align
roi_features = roi_align(feature_maps, rois)
print("ROI Features Shape:", roi_features.shape)  # Expected: [10, 7, 7, 1024]


ROI Features Shape: (10, 7, 7, 1024)


In [12]:
def create_backbone():
    """
    Create the backbone feature extraction model using ResNet50 as an example.
    Set input shape to match the actual data dimensions.
    """
    # Use the actual image size (512, 512) as input
    base_model = ResNet50(include_top=False, weights="imagenet", input_shape=(1024, 1024, 3))

    # Optionally freeze some layers for transfer learning
    for layer in base_model.layers:
        layer.trainable = False  # Freeze the layers for transfer learning; set to True for fine-tuning
    
    return base_model


In [13]:
# Define the model globally
def create_model():
    # Example: A simple CNN model
    base_model = tf.keras.applications.ResNet50(
        include_top=False, input_shape=(512, 512, 3)
    )
    x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
    x = tf.keras.layers.Dense(128, activation="relu")(x)
    outputs = tf.keras.layers.Dense(2, activation="softmax")(x)
    model = tf.keras.Model(inputs=base_model.input, outputs=outputs)

    # Compile the model to ensure compatibility
    model.compile(optimizer=tf.keras.optimizers.Adam(), loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

In [14]:
model = create_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

@tf.function
def train_step(images, bboxes, labels):
    """
    A single training step.

    Args:
        images: Input images batch.
        bboxes: Bounding boxes.
        labels: Classification labels.

    Returns:
        Loss value for backpropagation.
    """
    with tf.GradientTape() as tape:
        # Resize images if necessary
        images_resized = resize_images(images)  # Resize images as shown before

        # Generate feature maps from the backbone
        feature_maps = backbone_model(images_resized)
        rois = roi_align(feature_maps, bboxes)  # Extract ROIs
        
        # Pass ROIs through detection head
        roi_features = tf.image.resize(rois, [7, 7])  # Resize for compatibility
        class_scores, bbox_deltas = head(roi_features)  # Pass through DetectionHead
        
        # Calculate classification loss
        classification_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, class_scores)
        
        # Calculate bounding box loss
        bbox_loss_fn = tf.keras.losses.MeanAbsoluteError()
        bbox_loss = bbox_loss_fn(bboxes, bbox_deltas)  # Mean absolute error computation

        # Combine losses
        total_loss = tf.reduce_mean(classification_loss + bbox_loss)

    # Compute gradients
    gradients = tape.gradient(total_loss, head.trainable_variables + backbone_model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, head.trainable_variables + backbone_model.trainable_variables))

    return total_loss






In [15]:
train_file = r"C:\Users\ACER\Desktop\IS\Obj_recognition_Version_1\tfrecords\train.record"
valid_file = r"C:\Users\ACER\Desktop\IS\Obj_recognition_Version_1\tfrecords\valid.record"

if not os.path.exists(train_file):
    raise FileNotFoundError(f"File not found at {train_file}")

if not os.path.exists(valid_file):
    raise FileNotFoundError(f"File not found at {valid_file}")

train_dataset = load_tfrecord_dataset(train_file, batch_size=48)
print("Train dataset loaded successfully.")
valid_dataset = load_tfrecord_dataset(valid_file, batch_size=48)
print("Train dataset loaded successfully.")

Train dataset loaded successfully.
Train dataset loaded successfully.


In [16]:
for images, bboxes, labels in train_dataset.take(1):
    print("Images shape:", images.shape)
    print("Bounding boxes:", bboxes)
    print("Labels:", labels)


Images shape: (48, 512, 512, 3)
Bounding boxes: tf.Tensor(
[[[0.733      0.70266664 0.956      0.8333333 ]]

 [[0.364      0.35466668 0.542      0.41933334]]

 [[0.304      0.72866666 0.481      0.824     ]]

 [[0.325      0.78066665 0.864      0.91      ]]

 [[0.58       0.47733334 0.829      0.5653333 ]]

 [[0.30927834 0.44015443 0.47938144 0.58301157]]

 [[0.825      0.16066666 1.         0.53      ]]

 [[0.54       0.62133336 1.         0.8326667 ]]

 [[0.882      0.012      0.95       0.254     ]]

 [[0.354      0.34266666 0.515      0.424     ]]

 [[0.576      0.29733333 0.843      0.42466667]]

 [[0.195      0.706      0.309      0.78533334]]

 [[0.512      0.524      0.903      0.748     ]]

 [[0.40729168 0.4484375  0.48958334 0.478125  ]]

 [[0.48       0.49466667 0.52       0.546     ]]

 [[0.563      0.5553333  0.646      0.62866664]]

 [[0.536      0.17866667 0.767      0.25266665]]

 [[0.299      0.022      0.577      0.16333333]]

 [[0.595      0.32       0.822      0.437

In [17]:
backbone_model = create_backbone()

# Create detection head
num_classes = 19
head = DetectionHead(num_classes)

# Optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [20]:
# Set your weights save directory
weights_dir = "Saved_weights"
os.makedirs(weights_dir, exist_ok=True)

# Training Parameters
epochs = 2
batch_size= 48
num_samples = sum(1 for _ in train_dataset.unbatch())
steps_per_epoch = math.ceil(num_samples / batch_size)

# Custom training loop
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    epoch_loss = 0  # Track average loss for the epoch

    # Iterate over the training data
    for step, (images, bboxes, labels) in enumerate(train_dataset.take(steps_per_epoch)):
        # Run the custom training step
        loss = train_step(images, bboxes, labels)
        epoch_loss += loss

        if step % 10 == 0:  # Log progress
            print(f"Step {step}/{steps_per_epoch}, Loss: {loss:.4f}")
            

    # Average the epoch loss
    epoch_loss /= steps_per_epoch
    print(f"Epoch {epoch + 1} Average Loss: {epoch_loss:.4f}")

    # Save weights at the end of each epoch
    weights_path = os.path.join(weights_dir, f"epoch_{epoch + 1}.h5")
    model.save_weights(weights_path)
    print(f"Saved weights to: {weights_path}")

Epoch 1/2
Step 0/209, Loss: 169.4749
Step 10/209, Loss: 115.4429
Step 20/209, Loss: 43.8944
Step 30/209, Loss: 15.1025
Step 40/209, Loss: 15.8117
Step 50/209, Loss: 5.6910
Step 60/209, Loss: 3.5114
Step 70/209, Loss: 3.4824
Step 80/209, Loss: 3.5026
Step 90/209, Loss: 3.4589
Step 100/209, Loss: 3.4352
Step 110/209, Loss: 3.4245
Step 120/209, Loss: 3.3749
Step 130/209, Loss: 3.4145
Step 140/209, Loss: 3.3783
Step 150/209, Loss: 3.3931
Step 160/209, Loss: 3.3462
Step 170/209, Loss: 3.3554
Step 180/209, Loss: 3.3351
Step 190/209, Loss: 3.3501
Step 200/209, Loss: 3.2730
Epoch 1 Average Loss: 17.0975


ValueError: The filename must end in `.weights.h5`. Received: filepath=Saved_weights\epoch_1.h5

In [25]:
model_dir = "Saved_model"
os.makedirs(model_dir, exist_ok=True)

model_path = os.path.join(model_dir, "trained_model_updated.h5")
model.save(model_path)
print(f"Saved model to: {model_path}")



Saved model to: Saved_model\trained_model_updated.h5


In [None]:
model_path = "Saved_model/trained_model.h5"
model = tf.keras.models.load_model(model_path, compile=False)  # Avoid issues with compiled metrics
print(f"Loaded model from: {model_path}")

# Recompile the model to ensure optimizer and metrics are set
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',  # Update this to your specific loss function
    metrics=['accuracy']  # Use the metrics relevant to your task
)

# Resume training
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    epoch_loss = 0  # Track average loss for the epoch

    # Iterate over the training data
    for step, (images, bboxes, labels) in enumerate(train_dataset.take(steps_per_epoch)):
        # Run the custom training step
        loss = train_step(images, bboxes, labels)
        epoch_loss += loss

        if step % 10 == 0:  # Log progress
            print(f"Step {step}/{steps_per_epoch}, Loss: {loss:.4f}")
            

    # Average the epoch loss
    epoch_loss /= steps_per_epoch
    print(f"Epoch {epoch + 1} Average Loss: {epoch_loss:.4f}")

    updated_model_path = "Saved_model/updated_trained_model.h5"
    model.save(updated_model_path)
    print(f"Updated model saved to: {updated_model_path}")

Loaded model from: Saved_model/trained_model.h5
Epoch 1/1
Step 0/209, Loss: 3.3016
Step 10/209, Loss: 3.2890
Step 20/209, Loss: 3.2741
Step 30/209, Loss: 3.2544
Step 40/209, Loss: 3.2132
Step 50/209, Loss: 3.2307
Step 60/209, Loss: 3.2408
Step 70/209, Loss: 3.2175
Step 80/209, Loss: 3.1929
Step 90/209, Loss: 3.1904
Step 100/209, Loss: 3.1571
Step 110/209, Loss: 3.1197
Step 120/209, Loss: 3.1247
Step 130/209, Loss: 3.1620


In [None]:
def evaluate_model(dataset):
    for images, bboxes, labels in dataset:
        class_scores, bbox_deltas = predict(images)  # Replace with the prediction function
        # Add evaluation logic such as IoU calculation or mAP


In [None]:
def predict(image):
    feature_maps = feature_extractor(image)
    rpn_scores, rpn_deltas = build_rpn(feature_maps)
    rois = tf.random.uniform((1, 10, 4), 0, 1)  # Replace with actual proposals
    roi_features = roi_align(feature_maps, rois, (7, 7))
    class_scores, bbox_deltas = build_head(roi_features, num_classes=2)
    return class_scores, bbox_deltas

test_image = next(iter(valid_dataset))[0]  # Example test image
class_scores, bbox_deltas = predict(test_image)
