<a href="https://colab.research.google.com/github/Abhi10699/google-colab-playground/blob/main/the_siamese_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.applications.vgg16 import VGG16

In [None]:
def siamese_network(image_input_shape, sound_input_shape):
    """Build a two-branch image/sound binary classifier.

    The image branch runs the input through a VGG16 backbone (no classifier
    head, global max pooling). The sound branch embeds the input, flattens it
    and applies two dense layers. Both feature vectors are concatenated and
    reduced to a single sigmoid score.

    Args:
        image_input_shape: Shape of one image sample, e.g. ``[512, 512, 3]``.
            It is also used as the VGG16 input shape, so the backbone always
            matches the model input.
        sound_input_shape: Shape of one sound sample, e.g. ``[2]``.

    Returns:
        An uncompiled ``tf.keras.Model`` taking ``[image, sound]`` inputs and
        producing one sigmoid output per sample.
    """

    image_input = tf.keras.Input(shape=image_input_shape)
    sound_input = tf.keras.Input(shape=sound_input_shape)

    # Build the backbone for the requested image shape instead of a fixed
    # (512, 512, 3), which only worked when the caller passed that exact size.
    vgg = VGG16(
        include_top=False,
        input_shape=tuple(image_input_shape),
        pooling='max',
    )

    # Image branch. With pooling='max' the backbone output is already flat;
    # Flatten is kept so the layer list stays the same.
    image_x = vgg(image_input)
    image_x = layers.Flatten()(image_x)

    # Sound branch.
    # NOTE(review): Embedding(2, 128) has a vocabulary of 2, so the sound
    # input is presumably meant to hold integer indices 0/1 — TODO confirm.
    sound_embeddings = layers.Embedding(2, 128)
    sound_fc1 = layers.Dense(64, activation='relu')
    sound_fc2 = layers.Dense(64, activation='relu')

    sound_x = sound_embeddings(sound_input)
    sound_x = layers.Flatten()(sound_x)
    sound_x = sound_fc1(sound_x)
    sound_x = sound_fc2(sound_x)

    # Joint head over both feature vectors.
    concatenated = layers.Concatenate()([image_x, sound_x])
    x = layers.Dense(128, activation='relu')(concatenated)
    output = layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inputs=[image_input, sound_input], outputs=output)

    return model


In [None]:
# Instantiate the prototype for 512x512 RGB images and a 2-element sound vector,
# then compile it as a binary classifier and show the layer table.
model = siamese_network(
    image_input_shape=[512, 512, 3],
    sound_input_shape=[2],
)
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=['accuracy'],
)
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 2)]          0           []                               
                                                                                                  
 embedding (Embedding)          (None, 2, 128)       256         ['input_4[0][0]']                
                                                                                                  
 input_3 (InputLayer)           [(None, 512, 512, 3  0           []                               
                                )]                                                                
                                                      

### Model Testing

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from random import randint

In [None]:
# Random smoke-test batch: images and sound vectors drawn uniformly from [0, 1).
sample_size = 10
img_test = np.random.random(size=(sample_size, 512, 512, 3))
sound_test = np.random.random(size=(sample_size, 2))

# One random 0/1 label per sample.
labels = np.array([randint(0, 1) for _ in range(sample_size)])

In [None]:
# Single pass over the random batch; only checks that the graph trains end to end.
model.fit(x=[img_test, sound_test], y=labels)



<keras.callbacks.History at 0x7e11f2146a40>

In [None]:
# Re-draw a single random (image, sound) pair and score it with the model.
img_test = np.random.random(size=(1, 512, 512, 3))
sound_test = np.random.random(size=(1, 2))

model.predict(x=[img_test, sound_test])



array([[0.96297336]], dtype=float32)

In [None]:
import os
from getpass import getpass

import psycopg2

# Connection settings come from the environment so that no secret is stored in
# the notebook. The password has no default: it is read from
# SUPABASE_DB_PASSWORD, or prompted for interactively when that is unset.
# NOTE(security): a password was previously committed in this cell and should
# be rotated.
conn = psycopg2.connect(
    host=os.environ.get("SUPABASE_DB_HOST", "db.ncezbfbkdrpncldnvrys.supabase.co"),
    port=os.environ.get("SUPABASE_DB_PORT", "5432"),
    user=os.environ.get("SUPABASE_DB_USER", "postgres"),
    password=os.environ.get("SUPABASE_DB_PASSWORD") or getpass("Database password: "),
    database=os.environ.get("SUPABASE_DB_NAME", "postgres"),
    sslmode="verify-ca",
    sslrootcert="./prod-ca-2021.crt"
)


def get_labelled_data():
    """Fetch every labelled (image, song) pair from the database.

    Uses the module-level ``conn`` connection.

    Returns:
        A list of row tuples in the column order
        ``(url_ref, acousticness, danceability, valence, speechiness,
        liked, image_id)``.

    Raises:
        Exception: Any error raised while executing the query. The current
            transaction is rolled back before the error is re-raised, so the
            connection stays usable and the caller never receives ``None``.
    """
    cur = conn.cursor()
    try:
        cur.execute('''
          select
            url_ref,
            sng.acousticness,
            sng.danceability,
            sng.valence,
            sng.speechiness,
            ims.liked,
            img._id as "image_id"
          from
              tbl_image_songs as ims
          join
            tbl_images as img
            on
              img._id = ims.image_id
            join
              tbl_songs as sng
            on
              ims.track_id = sng._id;
          ''')
        return cur.fetchall()
    except Exception:
        # Leave the connection in a clean state, then surface the real error
        # instead of printing it and implicitly returning None.
        conn.rollback()
        raise
    finally:
        cur.close()

In [None]:
# Delete the local ./images directory and everything in it.
!rm -rf ./images

In [None]:
import requests
import numpy as np
import cv2
import os
import ssl
import tensorflow as tf

from google.colab.patches import cv2_imshow
from tqdm import tqdm
from urllib.parse import quote

from urllib3.exceptions import InsecureRequestWarning
from urllib.request import urlopen

from tensorflow.keras import layers
from tensorflow.keras.applications.vgg16 import VGG16

from random import shuffle

# Suppress urllib3's InsecureRequestWarning messages.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Image geometry: (width, height) used when resizing, and the matching
# height x width x channels shape used as the model input.
IMAGE_DIMENSIONS = (128, 128)
IMAGE_SHAPE = [*IMAGE_DIMENSIONS, 3]

# Create the image directory. exist_ok=True tolerates an already existing
# directory while still surfacing real failures (e.g. permission errors).
os.makedirs("./images/", exist_ok=True)

def get_dataset():
    """Build the training arrays from the labelled rows in the database.

    Rows come from ``get_labelled_data()`` and are shuffled in place. For each
    row the image is taken from the ``./images/`` cache when present, otherwise
    it is downloaded, resized to ``IMAGE_DIMENSIONS`` and written to the cache.
    Rows whose image cannot be decoded are skipped with a printed message.

    Returns:
        A tuple ``(songs, images, liked)`` of NumPy arrays with one entry per
        kept row: the four song attributes (row columns 1-4), the resized
        image, and the label as 1 (liked) or 0.
    """
    # get data
    dataset = get_labelled_data()
    shuffle(dataset)

    # File names already in the cache directory. A set gives O(1) lookups, and
    # it stores the same "<id>.jpeg" form that the membership test uses, so an
    # image shared by several rows is downloaded only once per run.
    downloaded_images = set(os.listdir("./images/"))

    # WARNING(security): certificate and hostname verification are disabled,
    # so downloads are not protected against tampering. Kept as in the
    # original; re-enable verification if the image hosts have valid
    # certificates.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    images_arr = []
    songs_arr = []
    liked_arr = []

    for data in tqdm(dataset):
        # TODO: get extension from image name itself
        image_name = f"{data[6]}.jpeg"
        file_name = f"./images/{image_name}"

        if image_name not in downloaded_images:
            # Keep only what follows the "https://" scheme and escape spaces.
            url_clean = data[0].split("https://")[1]
            url_clean = f"https://{url_clean}".replace(" ", "%20")

            # Close the HTTP response as soon as the bytes are read.
            with urlopen(url_clean, context=ctx) as resp:
                raw = np.asarray(bytearray(resp.read()), dtype="uint8")

            image = cv2.imdecode(raw, cv2.IMREAD_COLOR)
            if image is None:
                # imdecode signals an undecodable payload with None.
                print(f"Skipping image {data[6]}: could not decode {url_clean}")
                continue

            image = cv2.resize(image, IMAGE_DIMENSIONS)
            cv2.imwrite(file_name, image)

            # mark file as downloaded
            downloaded_images.add(image_name)
        else:
            image = cv2.imread(file_name)
            if image is None:
                # imread signals an unreadable file with None.
                print(f"Skipping image {data[6]}: could not read {file_name}")
                continue
            image = cv2.resize(image, IMAGE_DIMENSIONS)

        song_attribs = data[1:5]
        # Anything other than True (including NULL/None) counts as not liked.
        liked = 1 if data[5] == True else 0

        songs_arr.append(song_attribs)
        images_arr.append(image)
        liked_arr.append(liked)

    return np.array(songs_arr), np.array(images_arr), np.array(liked_arr)

# setup training data

# Materialise the training arrays and report the shape of each one.
songs_arr, images_arr, liked_arr = get_dataset()
print("\n".join(
    f"{label} Shape: {array.shape}"
    for label, array in (
        ("Image", images_arr),
        ("Songs", songs_arr),
        ("Labels", liked_arr),
    )
))

100%|██████████| 4600/4600 [1:19:54<00:00,  1.04s/it]

Image Shape: (4600, 128, 128, 3)
Songs Shape: (4600, 4)
Labels Shape: (4600,)





In [None]:
# Extract checkpoints.zip into the working directory.
!unzip checkpoints.zip

Archive:  checkpoints.zip
   creating: checkpoints/
  inflating: checkpoints/my_checkpoint.data-00000-of-00001  
  inflating: checkpoints/my_checkpoint.index  
  inflating: checkpoints/checkpoint  


In [None]:

from tensorflow.keras import Model

def LandscapeConvNet():
  """Build and compile a 4-class image classifier on top of VGG16.

  The network takes 128x128x3 images, runs them through a VGG16 backbone
  (no classifier head, global max pooling), then through dense layers of
  128, 64, 32 and 16 ReLU units and a final 4-way softmax.

  Returns:
    A ``Model`` compiled with the Adam optimizer, categorical cross-entropy
    loss and an accuracy metric.
  """
  input_shape = (128, 128, 3)

  # image input layer
  image_input = layers.Input(shape=input_shape)

  # convolutional backbone
  backbone = VGG16(
    include_top=False,
    input_shape=input_shape,
    pooling='max',
  )

  # dense head, created up front in the order it is applied
  dense_stack = [
    layers.Dense(units, activation="relu")
    for units in (128, 64, 32, 16)
  ]

  # wire the layers together
  features = backbone(image_input)
  for dense in dense_stack:
    features = dense(features)

  # class probabilities
  output = layers.Dense(4, activation="softmax")(features)

  model = Model(inputs=image_input, outputs=output)
  model.compile(
      optimizer="adam",
      loss="categorical_crossentropy",
      metrics=['accuracy'],
  )
  return model


def siamese_network(image_input_shape, sound_input_shape):
    """
    Two-branch network scoring an (image, sound features) pair.

    The image branch applies a VGG16 backbone (no classifier head), dropout
    and flattening. The sound branch applies three chained tanh dense layers
    (128 -> 64 -> 32 units). Both feature vectors are concatenated and passed
    through a 128-unit ReLU layer and a single sigmoid output.

    Args:
        image_input_shape: Shape of the image input samples. It is also used
            as the VGG16 input shape.
        sound_input_shape: Shape of the sound input samples.

    Returns:
        Uncompiled ``tf.keras.Model`` taking ``[image, sound]`` inputs and
        returning one sigmoid score per sample.
    """

    # Define the input layers for the image and sound samples
    image_input = tf.keras.Input(shape=image_input_shape)
    sound_input = tf.keras.Input(shape=sound_input_shape)

    # Build the backbone for the requested image shape rather than a fixed one.
    vgg = VGG16(
        include_top=False,
        input_shape=tuple(image_input_shape),
    )

    # Disabled experiment: use a frozen, pretrained LandscapeConvNet base
    # instead of the VGG16 backbone.
    # landscape_conv_base = LandscapeConvNet()
    # landscape_conv_base.load_weights("./checkpoints/my_checkpoint").expect_partial()
    # landscape_conv_pretrained = tf.keras.Sequential(landscape_conv_base.layers[:2])
    # landscape_conv_base.trainable = False
    # landscape_conv_pretrained.trainable = False

    # Image branch: the dropout output is what gets flattened, so the
    # regularisation actually sits on the path to the output.
    image_x = vgg(image_input)
    image_x = layers.Dropout(0.2)(image_x)
    image_x = layers.Flatten()(image_x)

    # Sound branch layers
    # sound_embeddings = layers.Embedding(4, 512)
    sound_fc1 = layers.Dense(128, activation='tanh')
    sound_fc2 = layers.Dense(64, activation='tanh')
    sound_fc3 = layers.Dense(32, activation='tanh')

    # Process the sound sample: each layer consumes the previous layer's
    # output, so all three dense layers contribute to the result.
    sound_x = sound_fc1(sound_input)
    sound_x = sound_fc2(sound_x)
    sound_x = sound_fc3(sound_x)

    # Concatenate the processed image and sound features
    concatenated = layers.Concatenate()([image_x, sound_x])

    # Fully connected layer
    x = layers.Dense(128, activation='relu')(concatenated)

    # Output layer
    output = layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inputs=[image_input, sound_input], outputs=output)

    return model


# Build the network for the dataset's image shape and the four song attributes.
model = siamese_network(IMAGE_SHAPE, [4])
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=['accuracy']
)

# summary() prints the table itself and returns None, so it is not wrapped in
# print() (which would add a stray "None" line).
model.summary()

# setup checkpointing callback: the model is saved to the same path after
# every epoch.

checkpoint_path = "./models/siamese-ckpts/"
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1
)

# train the model, holding out 10% of the samples for validation

model.fit(
    [images_arr, songs_arr],
    liked_arr,
    callbacks=[ckpt_callback],
    validation_split=0.1,
    epochs=4
)

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_6 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 vgg16 (Functional)             (None, 4, 4, 512)    14714688    ['input_6[0][0]']                
                                                                                                  
 input_7 (InputLayer)           [(None, 4)]          0           []                               
                                                                                                  
 flatten (Flatten)              (None, 8192)         0           ['vgg16[0][0]']            



Epoch 2/4
Epoch 2: saving model to ./models/siamese-ckpts/




Epoch 3/4
Epoch 3: saving model to ./models/siamese-ckpts/




Epoch 4/4
Epoch 4: saving model to ./models/siamese-ckpts/






<keras.callbacks.History at 0x7d15702f7f10>

In [None]:
def test_image(image_path, song_attrib):
  """Score one image against one set of song attributes.

  Uses the module-level ``model``.

  Args:
    image_path: Path of the image file to load.
    song_attrib: Sequence of song attribute values; it is reshaped to a
      single row and cast to float32.

  Returns:
    The model prediction for the pair, i.e. the first row of the
    ``model.predict`` output.

  Raises:
    ValueError: If the image cannot be read from ``image_path``.
  """
  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread returns None instead of raising on a missing/unreadable file.
    raise ValueError(f"Could not read image: {image_path}")

  # Resize with the same dimensions used when building the training set.
  image = cv2.resize(image, IMAGE_DIMENSIONS)
  image = np.expand_dims(image, axis=0).astype('float32')

  song = np.array(song_attrib).reshape(1,-1).astype('float32')
  preds = model.predict([image, song],verbose=0)
  return preds[0]

In [None]:
import pandas as pd

# Load the track catalogue and keep only the identifying columns plus the four
# audio attributes, in this order.
df = pd.read_csv("spotify_200K.csv")[
    ['name', 'track_uri', 'acousticness', 'danceability', 'valence', 'speechiness']
]

In [None]:
# Score 100 randomly sampled tracks against one image; keys are track URIs
# (column 1), values are the model predictions.
similarity_idx = {}
# Iterating the array directly (no enumerate) lets tqdm see its length and
# show a total; the index was never used.
for vals in tqdm(df.sample(100).values):
  song = np.array(vals[2:])  # the four audio attributes follow name and URI
  pred = test_image("./9945.jpeg",song)
  similarity_idx[vals[1]] = pred

100it [00:07, 12.77it/s]


In [None]:
from operator import itemgetter
# Rank tracks from highest to lowest score and print an open.spotify.com link
# for each; the track id is the third ':'-separated field of the URI.
songs = sorted(similarity_idx.items(), key=lambda entry: entry[1], reverse=True)
for track_uri, score in songs:
  track_id = track_uri.split(':')[2]
  print(f"https://open.spotify.com/track/{track_id} - {score}")

https://open.spotify.com/track/6kz2mblgBKN2GyKnjfiB5Z - [0.14452843]
https://open.spotify.com/track/6D9JyzMU2sPPt1cAdtTUlo - [0.1422873]
https://open.spotify.com/track/3XnMv2HUUzl5rCwWJlYELW - [0.14178547]
https://open.spotify.com/track/5yMH1OfBOrKE3W3wGkRRCR - [0.14112675]
https://open.spotify.com/track/1ZC1vU3OWjOZqREj9th9eh - [0.14095347]
https://open.spotify.com/track/4T9asf08sPU9aAMiPe3wWh - [0.13900433]
https://open.spotify.com/track/2drjvvxnEhyJmlokeZSvhr - [0.13888806]
https://open.spotify.com/track/6vYpHgyoU5m3DdJIwHzWTJ - [0.13845861]
https://open.spotify.com/track/54Cd9MzI1eC9jxXrL7SshL - [0.13454334]
https://open.spotify.com/track/1qe1mHUozUo7PoMBBLnr3i - [0.13393535]
https://open.spotify.com/track/5hGsKbBKTyu0GylYdHcuaf - [0.13353013]
https://open.spotify.com/track/1waSaXG4TvuscnmGvyneff - [0.13243368]
https://open.spotify.com/track/3ncySJfjsBGeYyiEEkglqq - [0.12929824]
https://open.spotify.com/track/6sMYYobL2yYL5mhg8hZAmB - [0.12060738]
https://open.spotify.com/track/2gS1

In [None]:
# Archive the saved model directory into model.zip.
!zip -r model.zip models/siamese-ckpts

  adding: models/siamese-ckpts/ (stored 0%)
  adding: models/siamese-ckpts/keras_metadata.pb (deflated 96%)
  adding: models/siamese-ckpts/fingerprint.pb (stored 0%)
  adding: models/siamese-ckpts/variables/ (stored 0%)
  adding: models/siamese-ckpts/variables/variables.data-00000-of-00001 (deflated 7%)
  adding: models/siamese-ckpts/variables/variables.index (deflated 71%)
  adding: models/siamese-ckpts/assets/ (stored 0%)
  adding: models/siamese-ckpts/saved_model.pb (deflated 90%)
