In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow_addons.losses import TripletSemiHardLoss
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from sklearn.metrics import accuracy_score
import numpy as np


In [None]:
class DynamicUnfreezeCallback(Callback):
    """Unfreeze more trailing layers of a model when validation accuracy stalls.

    After every epoch the callback reads ``logs['val_accuracy']``. Once the best
    value seen so far has reached ``threshold`` and ``patience`` epochs have
    passed without a new best, ``step_unfreeze_layers`` additional layers
    (counted from the end of ``model.layers``) are marked trainable and the
    model is recompiled.

    Args:
        model: The Keras model whose layers are unfrozen.
        initial_unfreeze_layers: Number of trailing layers assumed to be
            trainable already; used as the starting value of the counter.
        step_unfreeze_layers: Number of extra trailing layers to unfreeze each
            time the condition triggers.
        threshold: Minimum best validation accuracy required before any
            unfreezing happens.
        patience: Number of consecutive epochs without a new best validation
            accuracy required before unfreezing.
    """

    def __init__(self, model, initial_unfreeze_layers, step_unfreeze_layers, threshold, patience):
        super(DynamicUnfreezeCallback, self).__init__()
        # NOTE(review): fit() also hands the model to the callback via set_model();
        # on Keras versions where `model` is a read-only property this assignment
        # may fail -- confirm against the installed version.
        self.model = model
        self.initial_unfreeze_layers = initial_unfreeze_layers
        self.step_unfreeze_layers = step_unfreeze_layers
        self.threshold = threshold
        self.patience = patience
        self.best_val_accuracy = 0.0
        self.unfreeze_count = initial_unfreeze_layers
        self.wait = 0
        # Ensures the "metric missing" message is printed only once per instance.
        self._warned_missing_metric = False

    def on_epoch_end(self, epoch, logs=None):
        """Update the plateau counters and unfreeze more layers when due."""
        val_accuracy = (logs or {}).get('val_accuracy')

        # Without a validation accuracy entry there is nothing to compare;
        # comparing None against a float would raise TypeError.
        if val_accuracy is None:
            if not self._warned_missing_metric:
                print("DynamicUnfreezeCallback: 'val_accuracy' not found in logs; skipping unfreeze check.")
                self._warned_missing_metric = True
            return

        if val_accuracy > self.best_val_accuracy:
            self.best_val_accuracy = val_accuracy
            self.wait = 0
        else:
            self.wait += 1

        if self.best_val_accuracy >= self.threshold and self.wait >= self.patience:
            self.wait = 0

            total_layers = len(self.model.layers)
            if self.unfreeze_count >= total_layers:
                # The counter already covers every layer; skip the pointless recompile.
                return

            # Clamp so the counter never exceeds the number of layers in the model.
            self.unfreeze_count = min(self.unfreeze_count + self.step_unfreeze_layers, total_layers)
            print(f"Unfreezing additional {self.step_unfreeze_layers} layers.")
            self.unfreeze_layers(self.unfreeze_count)
            # Recompile so the changed `trainable` flags are taken into account.
            # NOTE(review): recompiling while fit() is running -- confirm that the
            # installed Keras version rebuilds its training step afterwards.
            self.model.compile(optimizer=Adam(1e-4), loss=TripletSemiHardLoss())

    def unfreeze_layers(self, num_layers):
        """Mark the last ``num_layers`` layers of the model as trainable."""
        # A slice of [-0:] would select *every* layer, so treat non-positive counts as "none".
        if num_layers <= 0:
            return
        for layer in self.model.layers[-num_layers:]:
            layer.trainable = True


In [None]:
def create_model(initial_unfreeze_layers=10):
    """Build an InceptionResNetV2-based embedding model with a partly frozen backbone.

    The backbone takes 160x160x3 inputs, has no classification top and uses
    average pooling. A 512-unit ReLU dense layer followed by L2 normalisation
    along axis 1 is stacked on top. The ``weights`` argument of the backbone is
    not passed, so the library default applies.

    Args:
        initial_unfreeze_layers (int): Number of trailing layers of the combined
            model to leave trainable. The count is taken from the end of
            ``model.layers``. Values <= 0 keep the whole backbone frozen.
            Defaults to 10, the value previously hard-coded here.

    Returns:
        tf.keras.Model: The uncompiled embedding model.
    """
    base_model = tf.keras.applications.InceptionResNetV2(
        input_shape=(160, 160, 3), include_top=False, pooling='avg'
    )

    embedding = tf.keras.layers.Dense(512, activation='relu')(base_model.output)
    embedding = tf.keras.layers.Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(embedding)

    model = Model(inputs=base_model.input, outputs=embedding)

    # Freeze all backbone layers initially
    for layer in base_model.layers:
        layer.trainable = False

    # Unfreeze the last `initial_unfreeze_layers` layers.
    # Guarded because a slice of [-0:] would select every layer.
    if initial_unfreeze_layers > 0:
        for layer in model.layers[-initial_unfreeze_layers:]:
            layer.trainable = True

    return model

# Build the embedding network and compile it with the semi-hard triplet loss.
model = create_model()
model.compile(
    loss=TripletSemiHardLoss(),
    optimizer=Adam(learning_rate=1e-4),
)


In [None]:
# Dummy data generators for demonstration
def generate_triplet(batch_size=32):
    """Endlessly yield placeholder triplet batches filled with random values.

    Note: a function with the same name but a different signature is defined
    later in this notebook and replaces this one once that cell has run.

    Args:
        batch_size (int): Number of triplets in each yielded batch.

    Yields:
        tuple: ``([anchor, positive, negative], targets)`` where each image
        array has shape ``(batch_size, 160, 160, 3)`` with uniform random values
        and ``targets`` is an array of ones of length ``batch_size``.
    """
    image_batch_shape = (batch_size, 160, 160, 3)
    while True:
        # Three independent random batches, drawn in anchor, positive, negative order.
        triplet = [np.random.rand(*image_batch_shape) for _ in range(3)]
        yield triplet, np.ones(batch_size)

# Two separate endless streams: one feeds training, the other validation.
train_generator, val_generator = generate_triplet(), generate_triplet()


In [None]:
# Settings for the progressive-unfreezing callback.
initial_unfreeze_layers = 10
step_unfreeze_layers = 5
accuracy_threshold = 0.90
patience = 3

callbacks = [
    DynamicUnfreezeCallback(
        model=model,
        initial_unfreeze_layers=initial_unfreeze_layers,
        step_unfreeze_layers=step_unfreeze_layers,
        threshold=accuracy_threshold,
        patience=patience,
    )
]

# Train from the endless generators; step counts bound each epoch explicitly.
model.fit(
    x=train_generator,
    validation_data=val_generator,
    epochs=50,
    steps_per_epoch=100,
    validation_steps=10,
    callbacks=callbacks,
)


In [None]:
import numpy as np
import tensorflow as tf

def generate_triplet(data, labels, batch_size=32):
    """
    Generate batches of triplet samples (anchor, positive, negative).

    For every triplet the anchor is drawn uniformly from all samples, the
    positive from the anchor's class (a different sample than the anchor
    whenever the class has more than one member), and the negative uniformly
    from all samples of any other class.

    Args:
    data: A numpy array (or other integer-indexable sequence) of images.
    labels: Labels corresponding to the images; a numpy array or a plain
        sequence, one label per image.
    batch_size: Number of triplets to generate in each batch.

    Yields:
    A tuple (inputs, targets), where:
    - inputs is a list containing [anchor, positive, negative] images.
    - targets is an array of ones, used for the triplet loss function.

    Raises:
    ValueError: On the first `next()` call if `data` is empty, if the number
        of labels differs from the number of samples, or if fewer than two
        distinct classes are present (no negative could be drawn).
    """
    # Flattening accepts plain lists as well as (N,) and (N, 1) label arrays.
    labels = np.asarray(labels).reshape(-1)
    num_samples = len(data)
    if num_samples == 0:
        raise ValueError("`data` must contain at least one sample.")
    if labels.shape[0] != num_samples:
        raise ValueError(
            f"`labels` has {labels.shape[0]} entries but `data` has {num_samples} samples."
        )

    # Map every sample to a dense class id and group sample indices per class
    # once, instead of scanning all labels twice for every drawn triplet.
    _, class_of = np.unique(labels, return_inverse=True)
    class_of = class_of.reshape(-1)
    class_sizes = np.bincount(class_of)
    if class_sizes.size < 2:
        raise ValueError("At least two distinct classes are required to form triplets.")
    by_class_order = np.argsort(class_of, kind="stable")
    class_members = np.split(by_class_order, np.cumsum(class_sizes)[:-1])

    def _gather(indices):
        # Stack the selected samples into one batch array.
        return np.array([data[i] for i in indices])

    while True:
        anchor_indices = np.random.randint(0, num_samples, size=batch_size)
        positive_indices = []
        negative_indices = []

        for anchor_idx in anchor_indices:
            anchor_class = class_of[anchor_idx]

            # Prefer a positive that is not the anchor itself; fall back to the
            # anchor only when it is the sole member of its class.
            candidates = class_members[anchor_class]
            if candidates.size > 1:
                candidates = candidates[candidates != anchor_idx]
            positive_indices.append(candidates[np.random.randint(candidates.size)])

            # Rejection sampling: uniform over all samples of other classes.
            # Terminates because at least two classes exist.
            neg_idx = np.random.randint(0, num_samples)
            while class_of[neg_idx] == anchor_class:
                neg_idx = np.random.randint(0, num_samples)
            negative_indices.append(neg_idx)

        # Yield a batch of triplet samples and dummy targets
        yield (
            [_gather(anchor_indices), _gather(positive_indices), _gather(negative_indices)],
            np.ones(batch_size),
        )

# Example usage
# Assuming `data` is an array of images and `labels` is an array of corresponding labels
# data = np.array([...]) 
# labels = np.array([...])

# Create a generator
# train_generator = generate_triplet(data, labels, batch_size=32)


## New RESPONSE

In [None]:
import os
import numpy as np
from PIL import Image
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array
import random


In [None]:
def load_image(image_path, target_size):
    """
    Load an image from a file, convert it to RGB and resize it to the target size.

    Args:
    image_path (str): Path to the image file.
    target_size (tuple): Desired image size (width, height).

    Returns:
    np.array: The loaded and resized image as produced by `img_to_array`,
    always with three colour channels.
    """
    # The context manager closes the underlying file handle; without it every
    # loaded image keeps a descriptor open until garbage collection.
    with Image.open(image_path) as source:
        # Force three channels so grayscale, palette and RGBA files produce
        # arrays that stack with RGB ones and match a 3-channel model input.
        resized = source.convert('RGB').resize(target_size)
    return img_to_array(resized)


In [None]:
class TripletGenerator(tf.keras.utils.Sequence):
    """Keras Sequence producing random (anchor, positive, negative) image batches.

    Every immediate subdirectory of ``main_dir`` is treated as one class and
    every ``.png`` / ``.jpg`` / ``.jpeg`` file inside it (matched
    case-insensitively) as one image of that class. Batches are sampled at
    random on each access, so the batch index does not select specific images.
    """

    def __init__(self, main_dir, target_size=(160, 160), batch_size=32):
        """
        Initialize the TripletGenerator.

        Args:
        main_dir (str): Path to the main directory containing class subdirectories.
        target_size (tuple): Desired image size (width, height).
        batch_size (int): Number of triplets per batch.

        Raises:
        ValueError: If fewer than two class subdirectories contain images,
            since no negative sample could be drawn.
        """
        super().__init__()
        self.main_dir = main_dir
        self.target_size = target_size
        self.batch_size = batch_size

        # Sorted so the class/image order does not depend on the filesystem.
        candidate_dirs = (os.path.join(main_dir, entry) for entry in os.listdir(main_dir))
        self.class_dirs = sorted(path for path in candidate_dirs if os.path.isdir(path))
        self.class_images = {
            class_dir: sorted(
                os.path.join(class_dir, file_name)
                for file_name in os.listdir(class_dir)
                # Lower-cased so files such as "IMG_01.JPG" are not silently skipped.
                if file_name.lower().endswith(('.png', '.jpg', '.jpeg'))
            )
            for class_dir in self.class_dirs
        }

        # Only classes with at least one image can be sampled from; drawing from
        # an empty class would raise IndexError in random.choice.
        self._usable_classes = [
            class_dir for class_dir, paths in self.class_images.items() if paths
        ]
        if len(self._usable_classes) < 2:
            # With a single class the search for a different negative class
            # could never succeed.
            raise ValueError(
                f"Need at least two class subdirectories containing images in "
                f"{main_dir!r}, found {len(self._usable_classes)}."
            )
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch."""
        # NOTE(review): this depends on the number of class directories and the
        # batch size, not on the number of images -- confirm this is the
        # intended epoch length. Kept unchanged to preserve existing behaviour.
        return int(np.floor(len(self.class_dirs) * self.batch_size / 3))

    def on_epoch_end(self):
        """Hook invoked at the end of each epoch; nothing to do because batches are random."""
        pass

    def __getitem__(self, index):
        """Build one randomly sampled batch of triplets.

        Args:
        index (int): Batch index; unused because sampling is random.

        Returns:
        tuple: ``([anchors, positives, negatives], targets)`` where each of the
        three arrays stacks ``batch_size`` loaded images and ``targets`` is an
        array of ones of length ``batch_size``.
        """
        anchors = []
        positives = []
        negatives = []

        for _ in range(self.batch_size):
            # Two distinct classes in random order: anchor class and negative class.
            anchor_class, negative_class = random.sample(self._usable_classes, 2)

            anchor_paths = self.class_images[anchor_class]
            if len(anchor_paths) > 1:
                # Distinct files so the positive is never the anchor image itself.
                anchor_path, positive_path = random.sample(anchor_paths, 2)
            else:
                # Single-image class: the only possible positive is the anchor.
                anchor_path = positive_path = anchor_paths[0]
            negative_path = random.choice(self.class_images[negative_class])

            anchors.append(load_image(anchor_path, self.target_size))
            positives.append(load_image(positive_path, self.target_size))
            negatives.append(load_image(negative_path, self.target_size))

        return [np.array(anchors), np.array(positives), np.array(negatives)], np.ones(self.batch_size)


In [None]:
# Parameters
main_dir = "path_to_main_directory"  # Replace with the actual path to your main directory
target_size = (160, 160)
batch_size = 32

# Create the triplet generator
train_generator = TripletGenerator(main_dir=main_dir, target_size=target_size, batch_size=batch_size)

# Example model
model = create_model()  # Assuming create_model() is defined as in the previous example
model.compile(optimizer=Adam(1e-4), loss=TripletSemiHardLoss())

# Build a fresh callback for this model. The `callbacks` list from the earlier
# cell was created around the previous `model` object and still carries its
# counters (wait, best_val_accuracy, unfreeze_count) from that run.
callbacks = [
    DynamicUnfreezeCallback(
        model,
        initial_unfreeze_layers,
        step_unfreeze_layers,
        accuracy_threshold,
        patience,
    )
]

# Train the model
# NOTE(review): no validation data is passed here, while the callback reads
# 'val_accuracy' from the epoch logs -- confirm whether a validation generator
# should be supplied.
model.fit(train_generator, epochs=50, steps_per_epoch=len(train_generator), callbacks=callbacks)
