In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from keras import applications
from keras import layers
from keras import losses
from keras import ops
from keras import optimizers
from keras import metrics
from keras import Model
from keras.applications import resnet


target_shape = (200, 200)

In [None]:
# Import necessary libraries
from pathlib import Path

# Define cache directory
cache_dir = Path(Path.home()) / ".keras"

# Check if the cache directory exists; if not, create it
if not cache_dir.exists():
    cache_dir.mkdir(parents=True, exist_ok=True)  # Create the directory if it doesn't exist

# Define paths for anchor, positive, and negative images
anchor_images_path = cache_dir / "middle"
positive_images_path = cache_dir / "right"
negative_images_path = cache_dir / "left"

# Check if directories exist; if not, create them
for path in [anchor_images_path, positive_images_path, negative_images_path]:
    if not path.exists():
        path.mkdir(parents=True, exist_ok=True)  # Create the directory if it doesn't exist

# Print the paths to confirm they are set up correctly
print("Anchor Images Path:", anchor_images_path)
print("Positive Images Path:", positive_images_path)
print("Negative Images Path:", negative_images_path)

In [None]:
cache_dir = Path(Path.home()) / "autodl-tmp"
anchor_images_path = cache_dir / "anchor_18k"
positive_images_path = cache_dir / "positive_18K"
negative_images_path = cache_dir / "negative_18k"


In [None]:
!unzip -oq anchor_18k.zip -d $cache_dir
!unzip -oq positive_18K.zip -d $cache_dir
!unzip -oq negative_18k.zip -d $cache_dir

In [None]:
import tensorflow as tf

def load_image(path):
    # Load and preprocess image
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)  # Adjust as needed
    image = tf.image.resize(image, [224, 224])  # Resize to the desired dimensions
    return image

In [None]:
def preprocess_image(filename):
    """
    Load the specified file as a JPEG image, preprocess it and
    resize it to the target shape.
    """

    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_shape)
    return image


def preprocess_triplets(anchor, positive, negative):
    """
    Given the filenames corresponding to the three images, load and
    preprocess them.
    """

    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative),
    )



In [None]:
# We need to make sure both the anchor and positive images are loaded in
# sorted order so we can match them together.
anchor_images = sorted(
    [str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)

positive_images = sorted(
    [str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)


negative_images = sorted(
    [str(negative_images_path / f) for f in os.listdir(negative_images_path)]
)




image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)

# To generate the list of negative images, let's randomize the list of
# available images and concatenate them together.

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)


# Remaining dataset for training
train_dataset = dataset.enumerate()  # Add indices to the dataset
train_dataset = train_dataset.map(lambda i, data: data)  # Remove the indices, keep only data
train_dataset = train_dataset.shuffle(buffer_size=1000, seed=42).batch(128, drop_remainder=False)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

In [None]:
base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)
#print(base_cnn.layers)
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
#adjust the number of trainable layers
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

In [None]:
class DistanceLayer(layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = ops.sum(tf.square(anchor - positive), -1)
        an_distance = ops.sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [None]:
class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]

    # Add get_config and from_config for serialization
    def get_config(self):
        config = super().get_config()
        config.update({
            "siamese_network": self.siamese_network,
            "margin": self.margin,
        })
        return config

    @classmethod
    def from_config(cls, config):
        siamese_network = config.pop("siamese_network")  # Extract custom objects
        return cls(siamese_network=siamese_network, **config)

In [None]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001))

# Train the model without validation dataset
#adjust the number of epoch
siamese_model.fit(train_dataset, epochs=20)


In [None]:
siamese_model.build(input_shape=(None, 200, 200, 3))  # 替换为你输入数据的形状
print(siamese_model)
siamese_model.summary()

# Save the model weights with the correct filename
siamese_model.save_weights('siamese_18k_epoch20_trainable1_weights.weights.h5')