In [3]:
# import the necessary packages
import os
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import MaxPooling2D
# import the necessary packages
from tensorflow.keras.applications import resnet
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf

In [4]:
# configurations

In [5]:
image_shape = (224,224,1)
batch_size = 64
epochs = 10

In [6]:

def get_embedding_module(imageSize):
    # construct the input layer and pass the inputs through a
    # pre-processing layer
    inputs = keras.Input(imageSize + (3,))
    x = resnet.preprocess_input(inputs)
    
    # fetch the pre-trained resnet 50 model and freeze the weights
    baseCnn = resnet.ResNet50(weights="imagenet", include_top=False)
    baseCnn.trainable=False
    
    # pass the pre-processed inputs through the base cnn and get the
    # extracted features from the inputs
    extractedFeatures = baseCnn(x)
    # pass the extracted features through a number of trainable layers
    x = layers.GlobalAveragePooling2D()(extractedFeatures)
    x = layers.Dense(units=1024, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=512, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=256, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(units=128)(x)
    # build the embedding model and return it
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding

In [7]:
def get_siamese_network(imageSize, embeddingModel):
    # build the anchor, positive and negative input layer
    anchorInput = keras.Input(name="anchor", shape=imageSize + (3,))
    positiveInput = keras.Input(name="positive", shape=imageSize + (3,))
    negativeInput = keras.Input(name="negative", shape=imageSize + (3,))
    # embed the anchor, positive and negative images
    anchorEmbedding = embeddingModel(anchorInput)
    positiveEmbedding = embeddingModel(positiveInput)
    negativeEmbedding = embeddingModel(negativeInput)
    # build the siamese network and return it
    siamese_network = keras.Model(
        inputs=[anchorInput, positiveInput, negativeInput],
        outputs=[anchorEmbedding, positiveEmbedding, negativeEmbedding]
    )
    return siamese_network

In [8]:
class SiameseModel(keras.Model):
    def __init__(self, siameseNetwork, margin, lossTracker):
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker
    def _compute_distance(self, inputs):
        (anchor, positive, negative) = inputs
        # embed the images using the siamese network
        embeddings = self.siameseNetwork((anchor, positive, negative))
        anchorEmbedding = embeddings[0]
        positiveEmbedding = embeddings[1]
        negativeEmbedding = embeddings[2]
        # calculate the anchor to positive and negative distance
        apDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - positiveEmbedding), axis=-1
        )
        anDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - negativeEmbedding), axis=-1
        )
        
        # return the distances
        return (apDistance, anDistance)
    def _compute_loss(self, apDistance, anDistance):
        loss = apDistance - anDistance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss
    def call(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        return (apDistance, anDistance)
    def train_step(self, inputs):
        with tf.GradientTape() as tape:
            # compute the distance between the anchor and positive,
            # negative images
            (apDistance, anDistance) = self._compute_distance(inputs)
            # calculate the loss of the siamese network
            loss = self._compute_loss(apDistance, anDistance)
        # compute the gradients and optimize the model
        gradients = tape.gradient(
            loss,
            self.siameseNetwork.trainable_variables)
        self.optimizer.apply_gradients(
            zip(gradients, self.siameseNetwork.trainable_variables)
        )
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    def test_step(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        # calculate the loss of the siamese network
        loss = self._compute_loss(apDistance, anDistance)
        
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    @property
    def metrics(self):
        return [self.lossTracker]

In [9]:
import pandas as pd

In [10]:
music_df = pd.read_csv('spectrogram_df.csv')
music_df.head()

Unnamed: 0,Song Name,Original Spectrogram Path,Hummed Spectrogram Path
0,1,spectrograms/miditowav/00001.png,spectrograms/hummed/year2006a/person00001/0000...
1,1,spectrograms/miditowav/00001.png,spectrograms/hummed/year2006a/person00002/0000...
2,1,spectrograms/miditowav/00001.png,spectrograms/hummed/year2006a/person00003/0000...
3,1,spectrograms/miditowav/00001.png,spectrograms/hummed/year2006a/person00004/0000...
4,1,spectrograms/miditowav/00001.png,spectrograms/hummed/year2006a/person00005/0000...


SyntaxError: invalid syntax (<ipython-input-20-95664e552e4d>, line 1)

In [21]:
class TripletGenerator():
    def __init__(self,datasetPath):
        self.dataset = pd.read_csv(datasetPath)
        self.index = 0
    def get_next_element(self):
        while True:
            anchor_path = self.dataset.iloc[self.index]['Original Spectrogram Path']
            positive_path = self.dataset.iloc[self.index]['Hummed Spectrogram Path']
             # Negative (Non-Matching Original Audio)
            negative_index = np.random.randint(0, len(self.dataset))
            while negative_index == self.index:
                negative_index = np.random.randint(0, len(self.dataset))
            negative_path = self.dataset.iloc[negative_index]['Original Spectrogram Path']
            self.index = (self.index + 1) % len(self.dataset)
            yield anchor_path, positive_path, negative_path       
    def load_and_preprocess_image(anchor_path, positive_path, negative_path):
        def load_image(img_path):
            
            anchor_path = tf.strings.as_string(anchor_path)
            positive_path = tf.strings.as_string(positive_path)
            negative_path = tf.strings.as_string(negative_path)
            img = tf.io.read_file(img_path)
            img = tf.image.decode_image(img, channels=3)
            img = tf.image.resize(img, (224, 224))  # Resize as per your model's requirements
            img = img / 255.0  # Normalize
            return img

        anchor_img = load_image(anchor_path)
        positive_img = load_image(positive_path)
        negative_img = load_image(negative_path)
        return (anchor_img, positive_img, negative_img), None

In [22]:
trainTfDataset = tf.data.Dataset.from_generator(
    generator=TripletGenerator.get_next_element,
    output_types=(tf.string, tf.string, tf.string),  
    output_shapes=((), (), ())
)

valTfDataset = tf.data.Dataset.from_generator(
    generator=TripletGenerator.get_next_element,
    output_types=(tf.string, tf.string, tf.string),  
    output_shapes=((), (), ())
)

# Map the paths to actual images
trainTfDataset = trainTfDataset.map(lambda anchor, positive, negative: 
                                    TripletGenerator.load_and_preprocess_image(anchor, positive, negative),
                                    num_parallel_calls=-1)

valTfDataset = valTfDataset.map(lambda anchor, positive, negative: 
                                  TripletGenerator.load_and_preprocess_image(anchor, positive, negative),
                                  num_parallel_calls=-1)


# Optionally add batching, prefetching
trainTfDataset = trainTfDataset.batch(32).prefetch(-1)
valTfDataset = valTfDataset.batch(32).prefetch(-1)


UnboundLocalError: in user code:

    <ipython-input-18-daf696f01e55>:14 None  *
        trainTfDataset = trainTfDataset.map(lambda anchor, positive, negative:
    <ipython-input-21-f0ea91882cbd>:19 load_image  *
        anchor_path = tf.strings.as_string(anchor_path)

    UnboundLocalError: local variable 'anchor_path' referenced before assignment


In [None]:
# preprocess the images
mapFunction = MapFunction(imageSize=config.IMAGE_SIZE)
print("[INFO] building the train and validation `tf.data` pipeline...")
trainDs = (trainTfDataset
    .map(mapFunction)
    .shuffle(config.BUFFER_SIZE)
    .batch(config.BATCH_SIZE)
    .prefetch(config.AUTO)
)
valDs = (valTfDataset
    .map(mapFunction)
    .batch(config.BATCH_SIZE)
    .prefetch(config.AUTO)
)
# build the embedding module and the siamese network
print("[INFO] build the siamese model...")
embeddingModule = get_embedding_module(imageSize=config.IMAGE_SIZE)
siameseNetwork =  get_siamese_network(imageSize=config.IMAGE_SIZE,embeddingModel=embeddingModule,)
siameseModel = SiameseModel(siameseNetwork=siameseNetwork,margin=0.5,lossTracker=keras.metrics.Mean(name="loss"),)
# compile the siamese model
siameseModel.compile(optimizer=keras.optimizers.Adam(config.LEARNING_RATE))
# train and validate the siamese model
print("[INFO] training the siamese model...")
siameseModel.fit(
trainDs,
steps_per_epoch=config.STEPS_PER_EPOCH,
validation_data=valDs,
validation_steps=config.VALIDATION_STEPS,
epochs=config.EPOCHS,
)
# check if the output directory exists, if it doesn't, then
# create it
if not os.path.exists(config.OUTPUT_PATH):
os.makedirs(config.OUTPUT_PATH)
# save the siamese network to disk
modelPath = config.MODEL_PATH
print(f"[INFO] saving the siamese network to {modelPath}...")
keras.models.save_model(
model=siameseModel.siameseNetwork,
filepath=modelPath,
include_optimizer=False,
)

In [None]:
# USAGE
# python inference.py
# import the necessary packages
from pyimagesearch.dataset import TripletGenerator
from pyimagesearch.dataset import MapFunction
from pyimagesearch.model import SiameseModel
from matplotlib import pyplot as plt
from pyimagesearch import config
from tensorflow import keras
import tensorflow as tf
import os
# create the data input pipeline for test dataset
print("[INFO] building the test generator...")
testTripletGenerator = TripletGenerator(datasetPath=config.TEST_DATASET)
print("[INFO] building the test `tf.data` dataset...")
testTfDataset = tf.data.Dataset.from_generator(generator=testTripletGenerator.get_next_element,
                                               output_signature=(tf.TensorSpec(shape=(), dtype=tf.string),
                                                                 tf.TensorSpec(shape=(), dtype=tf.string),
                                                                 tf.TensorSpec(shape=(), dtype=tf.string),))
mapFunction = MapFunction(imageSize=config.IMAGE_SIZE)
testDs = (testTfDataset.map(mapFunction).batch(4).prefetch(config.AUTO))

In [None]:
# load the siamese network from disk and build the siamese model
modelPath = config.MODEL_PATH
print(f"[INFO] loading the siamese network from {modelPath}...")
siameseNetwork = keras.models.load_model(filepath=modelPath)
siameseModel = SiameseModel(siameseNetwork=siameseNetwork,margin=0.5,lossTracker=keras.metrics.Mean(name="loss"),)
# load the test data
(anchor, positive, negative) = next(iter(testDs))
(apDistance, anDistance) = siameseModel((anchor, positive, negative))
plt.figure(figsize=(10, 10))
rows = 4
for row in range(rows):
    plt.imshow(anchor[row])
    plt.axis("off")
    plt.title("Anchor image")
    plt.subplot(rows, 3, row * 3 + 2)
    plt.imshow(positive[row])
    plt.axis("off")
    plt.title(f"Positive distance: {apDistance[row]:0.2f}")
    plt.subplot(rows, 3, row * 3 + 3)
    plt.imshow(negative[row])
    plt.axis("off")
    plt.title(f"Negative distance: {anDistance[row]:0.2f}")
# check if the output directory exists, if it doesn't, then
# create it
if not os.path.exists(config.OUTPUT_PATH):
    os.makedirs(config.OUTPUT_PATH)
# save the inference image to disk
outputImagePath = config.OUTPUT_IMAGE_PATH
print(f"[INFO] saving the inference image to {outputImagePath}...")
plt.savefig(fname=outputImagePath)