In [4]:
# Import libraries
import tensorflow as tf
from tensorflow_examples.models.pix2pix import pix2pix
import numpy as np
import os
import json

# Load Fashionpedia dataset from Hugging Face Datasets
import datasets
# NOTE(review): the recorded output of this cell shows this call raising
# FileNotFoundError: no "fashionpedia" loading script or data files were found
# next to the notebook, and no dataset named "fashionpedia" exists on the Hub.
# `data_dir` is resolved relative to the notebook's working directory.
# TODO: confirm the correct dataset identifier / loading script before relying
# on any of the cells below, which all consume `dataset`.
dataset = datasets.load_dataset("fashionpedia", data_dir = "../../data/fashionpedia/fashionpedia-api")

# Configuration constants shared by the cells below.

# Label space: main apparel categories, apparel parts, fine-grained attributes.
NUM_CATEGORIES = 27
NUM_PARTS = 19
NUM_ATTRIBUTES = 294

# Size that input images (and masks) are resized to.
IMAGE_SIZE = (1024, 1024)

# Batch size used for both training and evaluation.
BATCH_SIZE = 4

# Number of passes over the training split.
EPOCHS = 50

# Learning rate handed to the optimizer.
LEARNING_RATE = 0.001

# Directory where model checkpoints are written.
CHECKPOINT_DIR = "checkpoints"

FileNotFoundError: Couldn't find a dataset script at /mnt/e/clothes/clothes-vision/sandbox/01 introduction/fashionpedia/fashionpedia.py or any data file in the same directory. Couldn't find 'fashionpedia' on the Hugging Face Hub either: FileNotFoundError: Dataset 'fashionpedia' doesn't exist on the Hub. If the repo is private or gated, make sure to log in with `huggingface-cli login`.

In [None]:

# Define a function to preprocess the images and masks
def preprocess_data(example):
  """Turn one raw dataset example into an (image, mask, attributes) triple.

  Args:
    example: Mapping read with the keys "image", "segmentation_mask",
      "segmentation_mask_shape" and "attributes".
      NOTE(review): the dataset schema is not visible here -- confirm that
      the loaded dataset really provides these keys and that "image" holds
      JPEG-encoded bytes.

  Returns:
    A tuple `(image, mask, attributes)`:
      * image: float32 tensor resized to IMAGE_SIZE, 3 channels, values
        scaled to the [0, 1] range.
      * mask: int32 tensor resized to IMAGE_SIZE with nearest-neighbour
        interpolation (so label values are never blended).
      * attributes: `example["attributes"]`, passed through unchanged.
  """
  # Decode image from bytes and resize to IMAGE_SIZE.
  # channels=3 forces RGB output: without it a grayscale JPEG decodes to a
  # single channel, which does not match the model's [None, None, 3] input.
  image = tf.image.decode_jpeg(example["image"], channels=3)
  image = tf.image.resize(image, IMAGE_SIZE)
  image = tf.cast(image, tf.float32) / 255.0 # Normalize to [0, 1] range

  # Decode mask from bytes and resize to IMAGE_SIZE
  mask = tf.io.decode_raw(example["segmentation_mask"], tf.uint8)
  # assumes segmentation_mask_shape is (height, width, channels), since
  # tf.image.resize needs a 3-D or 4-D tensor -- TODO confirm
  mask = tf.reshape(mask, example["segmentation_mask_shape"])
  mask = tf.image.resize(mask, IMAGE_SIZE, method="nearest")
  mask = tf.cast(mask, tf.int32)

  # Get attributes for each mask
  attributes = example["attributes"]

  return image, mask, attributes

# Build the training and validation input pipelines.
# NOTE(review): `dataset` is produced by `datasets.load_dataset`, not by
# tf.data; the tf.data-style calls below (shuffle buffer size, batch,
# prefetch) may not mean the same thing, or exist, on that object --
# TODO confirm, or convert the splits to tf.data.Dataset first.
train_dataset = (
    dataset["train"]
    .map(preprocess_data)
    .shuffle(1000)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Validation data is only batched: no shuffling and no prefetching.
val_dataset = (
    dataset["validation"]
    .map(preprocess_data)
    .batch(BATCH_SIZE)
)


In [None]:


# Define the Attribute-Mask R-CNN model
def get_attribute_mask_rcnn(num_categories, num_parts, num_attributes):
  """Assemble the Attribute-Mask R-CNN style model as a `tf.keras.Model`.

  The graph is: frozen ImageNet ResNet50 (no top) -> 1x1 Conv2D with 256
  filters -> FPN -> RPN -> proposal decoding/filtering -> ROI align ->
  detection, mask and attribute heads.

  NOTE(review): FeaturePyramidNetwork, RPNLayer, PyramidROIAlign,
  DetectionHead, MaskHead, AttributeHead, apply_box_deltas, batch_slice and
  proposal_filter are all looked up on the imported `pix2pix` module; none of
  them is defined in this file. Confirm that
  `tensorflow_examples.models.pix2pix` really provides them.

  NOTE(review): `compute_loss` indexes `model.layers[2]`, so the order in
  which layers are created/connected here matters -- do not reorder.

  Args:
    num_categories: Number of main apparel categories.
    num_parts: Number of apparel parts.
    num_attributes: Number of attributes, forwarded to the attribute head.

  Returns:
    A `tf.keras.Model` taking images of shape [None, None, 3] and returning
    the list `[detections, masks, attributes]`.
  """

  # Create a ResNet-50 backbone with FPN feature extractor
  backbone = tf.keras.applications.ResNet50(include_top=False, weights="imagenet")
  backbone.trainable = False # Freeze backbone weights
  inputs = tf.keras.Input(shape=[None, None, 3])
  # The backbone is called as a single layer, so only its final feature map
  # is available downstream.
  x = backbone(inputs)
  # 1x1 convolution projecting the backbone output to 256 channels.
  x = tf.keras.layers.Conv2D(256, (1,1))(x)
  
  # Create a feature pyramid network (FPN) layer
  fpn = pix2pix.FeaturePyramidNetwork([64,128,256], num_filters=256)

  # Create a region proposal network (RPN) layer
  rpn = pix2pix.RPNLayer(num_anchors=9)

  # Create a ROI align layer
  roi_aligner = pix2pix.PyramidROIAlign([14,14])

  # Create a detection head layer
  # Class count is categories + parts + 1 (the extra class is presumably
  # background -- TODO confirm).
  detection_head = pix2pix.DetectionHead(num_classes=num_categories+num_parts+1)

  # Create a mask head layer
  mask_head = pix2pix.MaskHead(num_classes=num_categories+num_parts+1)

  # Create an attribute head layer
  attribute_head = pix2pix.AttributeHead(num_attributes=num_attributes)

  # Connect the layers to form the model
  features = fpn(x)
  rpn_class, rpn_bbox, anchors = rpn(features)
  proposals = pix2pix.apply_box_deltas(anchors, rpn_bbox)
  # Clamp proposal coordinates to [0, 1] (assumes normalized box
  # coordinates -- TODO confirm).
  proposals = tf.clip_by_value(proposals, 0, 1)
  # The literal 4 is presumably the batch size (it equals BATCH_SIZE) --
  # TODO confirm and replace with the constant if so.
  rois = pix2pix.batch_slice([proposals, rpn_class], lambda x,y: pix2pix.proposal_filter(x,y), 4)
  # `[rois]+features` assumes `features` is a Python list of feature maps --
  # TODO confirm against the FPN layer's return type.
  roi_features = roi_aligner([rois]+features)
  detections = detection_head(roi_features)
  masks = mask_head(roi_features)
  attributes = attribute_head(roi_features)

  # Define the model inputs and outputs
  model = tf.keras.Model(inputs=inputs, outputs=[detections, masks, attributes])

  return model

# Instantiate the model with the label-space sizes from the configuration.
model = get_attribute_mask_rcnn(
    num_categories=NUM_CATEGORIES,
    num_parts=NUM_PARTS,
    num_attributes=NUM_ATTRIBUTES,
)

# Define the loss functions
def rpn_class_loss(y_true, y_pred):
  """Masked binary cross-entropy for RPN foreground/background scores.

  Args:
    y_true: (batch_size, num_anchors) anchor labels: 1 = foreground,
      0 = background, -1 = ignore.
    y_pred: (batch_size, num_anchors) predicted foreground probabilities
      in [0, 1].

  Returns:
    Scalar tensor: the cross-entropy summed over non-ignored anchors and
    divided by their count (plus 1e-3 to avoid division by zero).
  """
  y_true = tf.cast(y_true, tf.float32)
  y_pred = tf.cast(y_pred, tf.float32)

  # 1.0 for anchors that take part in the loss, 0.0 for ignored ones (-1).
  anchor_state = tf.cast(tf.not_equal(y_true, -1), tf.float32)

  # Ignored anchors get a dummy target of 0 so the log terms stay finite;
  # their contribution is zeroed by `anchor_state` below.
  targets = tf.maximum(y_true, 0.0)

  # Element-wise cross-entropy. tf.keras.losses.binary_crossentropy is not
  # used here because it averages over the last axis, which would collapse
  # the anchor dimension before the per-anchor mask can be applied.
  probs = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)
  loss = -(targets * tf.math.log(probs)
           + (1.0 - targets) * tf.math.log(1.0 - probs))

  loss = tf.reduce_sum(loss * anchor_state) / (tf.reduce_sum(anchor_state) + 1e-3)
  return loss

def rpn_bbox_loss(target_bbox, rpn_class, pred_bbox):
  """Smooth-L1 box regression loss over positive RPN anchors.

  Args:
    target_bbox: (batch_size, num_anchors, 4) target deltas
      (dy, dx, log(dh), log(dw)).
    rpn_class: (batch_size, num_anchors) anchor labels; only anchors equal
      to 1 (positive) contribute.
    pred_bbox: (batch_size, num_anchors, 4) predicted deltas.

  Returns:
    Scalar tensor: smooth-L1 loss summed over the coordinates of positive
    anchors, divided by the number of positive anchors (plus 1e-3).
  """
  # Smooth L1: quadratic below 1, linear above.
  diff = tf.abs(target_bbox - pred_bbox)
  less_than_one = tf.cast(tf.less(diff, 1.0), tf.float32)
  loss = (less_than_one * 0.5 * diff**2) + (1 - less_than_one) * (diff - 0.5)

  # Only consider positive anchors (rpn_class=1)
  anchor_state = tf.cast(tf.equal(rpn_class, 1), tf.float32)
  # Give the mask a trailing axis of size 1 so it broadcasts over the four
  # box coordinates; a (batch, anchors) mask cannot be multiplied with a
  # (batch, anchors, 4) loss directly.
  mask_shape = tf.concat([tf.shape(diff)[:-1], [1]], axis=0)
  anchor_state = tf.reshape(anchor_state, mask_shape)

  loss = tf.reduce_sum(loss * anchor_state) / (tf.reduce_sum(anchor_state) + 1e-3)
  return loss

def detection_loss(target_class_ids, pred_class_logits, target_deltas, pred_deltas):
    """Classification and box-regression losses for the detection head.

    Args:
        target_class_ids: (batch_size, num_rois) integer class ids;
            0 is treated as "no object" for the box loss.
        pred_class_logits: unnormalized class scores with a trailing class
            axis, i.e. (batch_size, num_rois, num_classes).
            NOTE(review): treated as logits because of the parameter name;
            if the head already applies softmax, drop `from_logits=True`.
        target_deltas: (batch_size, num_rois, 4) target deltas
            (dy, dx, log(dh), log(dw)).
        pred_deltas: (batch_size, num_rois, 4) predicted deltas.

    Returns:
        Tuple `(class_loss, bbox_loss)` of scalar tensors. `class_loss` is the
        mean sparse categorical cross-entropy over all ROIs; `bbox_loss` is
        the smooth-L1 loss summed over positive ROIs and divided by their
        count (at least 1).
    """
    # The scores are logits, so the loss must apply the softmax itself.
    class_loss = tf.keras.losses.sparse_categorical_crossentropy(
        target_class_ids, pred_class_logits, from_logits=True)
    class_loss = tf.reduce_mean(class_loss)

    # 1.0 for ROIs matched to a real object, 0.0 otherwise.
    positive_rois = tf.cast(target_class_ids > 0, tf.float32)

    # Smooth L1: quadratic below 1, linear above.
    diff = tf.abs(target_deltas - pred_deltas)
    less_than_one = tf.cast(tf.less(diff, 1.0), tf.float32)
    bbox_loss = (less_than_one * 0.5 * diff**2) + (1 - less_than_one) * (diff - 0.5)

    # Add a trailing axis so the (batch, rois) mask broadcasts over the four
    # box coordinates instead of clashing with them.
    roi_weights = tf.reshape(
        positive_rois, tf.concat([tf.shape(diff)[:-1], [1]], axis=0))

    bbox_loss = tf.reduce_sum(bbox_loss * roi_weights) / tf.maximum(
        tf.reduce_sum(positive_rois), 1.0)

    return class_loss, bbox_loss

def mask_loss(target_masks, pred_masks, target_class_ids):
    """Binary cross-entropy for predicted masks, restricted to positive ROIs.

    Args:
        target_masks: (batch_size, num_rois, height, width) target masks.
        pred_masks: (batch_size, num_rois, height, width) predicted mask
            probabilities.
        target_class_ids: (batch_size, num_rois) class ids; ROIs with an id
            greater than 0 are the positive ones.

    Returns:
        Scalar tensor: the cross-entropy (already averaged over the width
        axis by Keras) summed over positive ROIs and rows, divided by the
        number of positive ROIs (at least 1).
    """
    # Keras averages over the last axis, so `loss` is (batch, rois, height).
    loss = tf.keras.losses.binary_crossentropy(target_masks, pred_masks)

    # Turn class ids into a 0/1 indicator. Using the raw ids would weight
    # each ROI by its class number and mix integer and float dtypes.
    positive_rois = tf.cast(target_class_ids > 0, loss.dtype)
    positive_rois = tf.reshape(
        positive_rois, [tf.shape(loss)[0], tf.shape(loss)[1], 1])

    loss = tf.reduce_sum(loss * positive_rois) / tf.maximum(
        tf.reduce_sum(positive_rois), 1.0)

    return loss

def attribute_loss(target_attributes, pred_attributes, target_mask_ids):
    """Multi-label binary cross-entropy for attributes of positive masks.

    Args:
        target_attributes: (batch_size, num_rois, num_attributes) 0/1 targets.
        pred_attributes: (batch_size, num_rois, num_attributes) predicted
            probabilities.
        target_mask_ids: (batch_size, num_rois) mask ids; entries greater
            than 0 mark the masks that contribute to the loss.

    Returns:
        Scalar tensor: per-ROI cross-entropy (averaged over attributes by
        Keras) summed over positive masks and divided by their count
        (at least 1).
    """
    # Keras averages over the attribute axis, so `loss` is (batch, rois).
    loss = tf.keras.losses.binary_crossentropy(target_attributes, pred_attributes)

    # 0/1 indicator of positive masks, shaped exactly like `loss`. A
    # (batch, rois, 1) mask would not broadcast against a (batch, rois) loss,
    # and raw ids would weight ROIs by their id value.
    positive_masks = tf.cast(target_mask_ids > 0, loss.dtype)
    positive_masks = tf.reshape(positive_masks, tf.shape(loss))

    loss = tf.reduce_sum(loss * positive_masks) / tf.maximum(
        tf.reduce_sum(positive_masks), 1.0)

    return loss

# Define the optimizer: Adam with a constant learning rate (LEARNING_RATE).
# No learning-rate schedule is configured here.
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# Define a function to compute the total loss
def compute_loss(model, inputs):
  """Run the model on one batch and return the summed training loss.

  Args:
    model: The model built by `get_attribute_mask_rcnn`; called on the
      images and also inspected through `model.layers[2]`.
    inputs: A `(images, masks, attributes)` tuple for one batch.

  Returns:
    Scalar tensor: the unweighted sum of the RPN class, RPN box, detection
    class, detection box, mask and attribute losses.
  """
  # Unpack the inputs
  images, masks, attributes = inputs

  # Get the model outputs
  detections, pred_masks, pred_attributes = model(images)

  # Get the RPN outputs
  # NOTE(review): this assumes `model.layers[2]` is the RPN layer and that
  # its `.output` reflects the current `images`; a layer's `.output` is
  # normally the symbolic tensor from model construction -- TODO confirm, or
  # expose the RPN outputs as model outputs instead.
  rpn_class_logits, rpn_class, rpn_bbox = model.layers[2].output
  anchors = model.layers[2].anchors

  # Get the detection outputs
  pred_class_logits, pred_class_ids, pred_deltas = detections

  # Generate targets for RPN
  # The targets get their own name: reusing `rpn_bbox` here would overwrite
  # the predicted deltas unpacked above.
  rpn_match, target_rpn_bbox = pix2pix.build_rpn_targets(anchors, masks)

  # Generate targets for detection head
  target_class_ids, target_deltas, target_masks = pix2pix.build_detection_targets(detections, masks)

  # Generate targets for attribute head
  target_attributes, target_mask_ids = pix2pix.build_attribute_targets(attributes, masks)

  # Compute losses for RPN
  rpn_class_loss_value = rpn_class_loss(rpn_match, rpn_class)
  # Compare target deltas against the predicted deltas (not against the
  # class logits).
  rpn_bbox_loss_value = rpn_bbox_loss(target_rpn_bbox, rpn_match, rpn_bbox)

  # Compute losses for detection head
  detection_class_loss_value, detection_bbox_loss_value = detection_loss(
      target_class_ids, pred_class_logits, target_deltas, pred_deltas)

  # Compute loss for mask head
  mask_loss_value = mask_loss(target_masks, pred_masks, target_class_ids)

  # Compute loss for attribute head
  attribute_loss_value = attribute_loss(target_attributes, pred_attributes, target_mask_ids)

  # Compute total loss
  total_loss = (
      rpn_class_loss_value
      + rpn_bbox_loss_value
      + detection_class_loss_value
      + detection_bbox_loss_value
      + mask_loss_value
      + attribute_loss_value
  )

  return total_loss

# Define a function to perform a training step
@tf.function
def train_step(model, inputs):
  """Run one optimization step on a batch and return its loss.

  Args:
    model: The model whose trainable variables are updated.
    inputs: One batch, in the form expected by `compute_loss`.

  Returns:
    The scalar loss computed for this batch (before the weight update).
  """
  # Only the forward pass needs to be recorded by the tape.
  with tf.GradientTape() as tape:
    loss_value = compute_loss(model, inputs)

  # Differentiate and update outside the tape context so that the gradient
  # and optimizer ops are not themselves recorded.
  trainable_vars = model.trainable_variables
  gradients = tape.gradient(loss_value, trainable_vars)
  optimizer.apply_gradients(zip(gradients, trainable_vars))

  return loss_value

# Define a function to perform an evaluation step
@tf.function
def val_step(model, inputs):
  """Return the loss of `model` on one batch without updating any weights."""
  return compute_loss(model, inputs)

# Checkpointing: track the model and keep at most the three most recent
# checkpoints under CHECKPOINT_DIR.
checkpoint = tf.train.Checkpoint(model=model)
manager = tf.train.CheckpointManager(
    checkpoint,
    CHECKPOINT_DIR,
    max_to_keep=3,
)

# Early-stopping bookkeeping.
best_val_loss = np.inf   # lowest validation loss seen so far
patience = 0             # consecutive epochs without improvement
stop_training = False    # set to True when early stopping triggers

# Running means of the per-batch losses within an epoch.
train_loss = tf.keras.metrics.Mean(name="train_loss")
val_loss = tf.keras.metrics.Mean(name="val_loss")
# Start the training loop.
# Each epoch: train on every training batch, evaluate on every validation
# batch, checkpoint when the mean validation loss improves, and stop early
# once it has failed to improve for more than 5 consecutive epochs.
for epoch in range(EPOCHS):

  print(f"Epoch {epoch+1}/{EPOCHS}")

  # Reset the metrics at the start of each epoch
  train_loss.reset_states()
  val_loss.reset_states()

  # Loop over the batches of the training dataset.
  for inputs in train_dataset:

    # Perform a training step and update the train loss metric.
    train_loss.update_state(train_step(model, inputs))

    # Display the current train loss
    print(f"\rTrain Loss: {train_loss.result():.4f}", end="")

  print()

  # Loop over the batches of the validation dataset.
  for inputs in val_dataset:

    # Perform an evaluation step and update the val loss metric.
    val_loss.update_state(val_step(model, inputs))

    # Display the current val loss
    print(f"\rVal Loss: {val_loss.result():.4f}", end="")

  print()

  # Read the epoch's mean validation loss once, as a plain Python float, so
  # that `best_val_loss` keeps a single type (float) across epochs instead of
  # switching from np.inf to a tensor after the first improvement.
  epoch_val_loss = float(val_loss.result())

  # Compare the current val loss with the best val loss and update accordingly
  if epoch_val_loss < best_val_loss:

    print(f"Validation loss improved from {best_val_loss:.4f} to {epoch_val_loss:.4f}. Saving checkpoint.")
    best_val_loss = epoch_val_loss
    patience = 0
    manager.save()

  else:

    print(f"Validation loss did not improve from {best_val_loss:.4f}.")
    patience += 1
    if patience > 5:
      print("Early stopping.")
      # The flag is kept for any later cell that inspects it; the `break`
      # alone ends the loop, so no second check is needed afterwards.
      stop_training = True
      break
