<a href="https://colab.research.google.com/github/gunjanak/Siamese-Network/blob/main/coversongs_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Load

In [1]:
folder_location = '/content/drive/MyDrive/Colab Notebooks/covers32k'

In [2]:
!pip install torchaudio



In [3]:
import os
import random
import torchaudio
from torch.utils.data import Dataset
from torch.nn import functional as F

In [4]:
class SiameseDataset(Dataset):
  def __init__(self,root_folder,sample_rate=22050,waveform_length=16000):
    self.root_folder = root_folder
    self.sample_rate = sample_rate
    self.waveform_length = waveform_length
    self.folders = os.listdir(root_folder)
    self.triplets = self.generate_triplets()

  def generate_triplets(self):
    triplets = []
    for folder in self.folders:
      folder_path = os.path.join(self.root_folder,folder)
      mp3_files = [f for f in os.listdir(folder_path) if f.endswith('.mp3')]

      #Create positive pairs
      for i in range(len(mp3_files)):
        anchor = mp3_files[i]
        positive = mp3_files[(i+1) % len(mp3_files)]

        #Create negative pairs
        other_folders = [f for f in self.folders if f != folder]
        random_folder = random.choice(other_folders)
        random_negative = random.choice(os.listdir(os.path.join(self.root_folder,random_folder)))
        triplets.append((os.path.join(folder_path,anchor),
                         os.path.join(folder_path,positive),
                         os.path.join(self.root_folder,random_folder,random_negative)))
    return triplets

  def load_audio_file(self,file_path):
    waveform,sample_rate = torchaudio.load(file_path,normalize=True)
    return waveform, sample_rate

  def resize_waveform(self,waveform,length):
    if waveform.size(1) < length:
      #pad if the waveform is shorter than the specified length
      pad_length = length - waveform.size(1)
      waveform = F.pad(waveform,(0,pad_length))
    elif waveform.size(1) > length:
      #Randomly crop if the waveform is longet than the specified length
      start = random.randint(0,waveform.size(1)-length)
      waveform = waveform[:,start:start+length]
    return waveform

  def __len__(self):
    return len(self.triplets)

  def __getitem__(self, index):
    anchor_path,positive_path,negative_path = self.triplets[index]

    #Load audio data and convert to tensors
    anchor_waveform, _ = self.load_audio_file(anchor_path)
    positive_waveform, _ = self.load_audio_file(positive_path)
    negative_waveform, _ = self.load_audio_file(negative_path)

    anchor_waveform = self.resize_waveform(anchor_waveform,self.waveform_length)
    positive_waveform = self.resize_waveform(positive_waveform,self.waveform_length)
    negative_waveform = self.resize_waveform(negative_waveform,self.waveform_length)

    #again resize from (1,160000) to (1,400,400)
    anchor_waveform = anchor_waveform.resize_(1,200,200)
    positive_waveform = positive_waveform.resize_(1,200,200)
    negative_waveform = negative_waveform.resize_(1,200,200)

    return anchor_waveform, positive_waveform,negative_waveform

In [5]:
simaese_dataset = SiameseDataset(folder_location)

In [6]:
anchor,positive,negative = simaese_dataset[0]

In [7]:
anchor.shape

torch.Size([1, 200, 200])

In [8]:
negative.shape

torch.Size([1, 200, 200])

In [9]:
positive.shape

torch.Size([1, 200, 200])

#SiameseNetwork

In [10]:
import torch
import torch.nn as nn

In [11]:
class SiameseNetwork(nn.Module):
  def __init__(self,dropout_prob=0.5):
    super(SiameseNetwork,self).__init__()

    #Shared convolutional layers
    self.convolutional_layers = nn.Sequential(
        nn.Conv2d(1,64,kernel_size=3,padding=1),
        nn.ReLU(inplace=True),
        nn.Dropout2d(p=dropout_prob),
        nn.MaxPool2d(kernel_size=2,stride=2),
        nn.Conv2d(64,128,kernel_size=3,padding=1),
        nn.ReLU(inplace=True),
        nn.Dropout2d(p=dropout_prob),
        nn.MaxPool2d(kernel_size=2,stride=2),
        nn.Conv2d(128,256,kernel_size=3,padding=1),
        nn.ReLU(inplace=True),
        nn.Dropout2d(p=dropout_prob),
        nn.MaxPool2d(kernel_size=2,stride=2)

    )

    #Fully connected layers for each branch
    self.fc_layers = nn.Sequential(
        nn.Linear(160000,1024),
        nn.ReLU(inplace=True),
        nn.Linear(1024,512),
        nn.ReLU(inplace=True),
        nn.Linear(512,256),
        nn.ReLU(inplace=True),
        nn.Linear(256,128)
    )

  def forward_one(self,x):
    #forward pass for one branch
    x = self.convolutional_layers(x)
    # print(x.shape)
    x = x.view(x.size()[0],-1)
    # print(x.shape)
    x = self.fc_layers(x)
    return x

  def forward(self,anchor,positive,negative):
    #forward pass for each branch
    anchor_out = self.forward_one(anchor)
    positive_out = self.forward_one(positive)
    negative_out = self.forward_one(negative)

    return anchor_out, positive_out, negative_out



#Loss function

In [12]:
# Define triplet loss
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        distance_positive = (anchor - positive).pow(2).sum(1)
        distance_negative = (anchor - negative).pow(2).sum(1)
        loss = torch.relu(distance_positive - distance_negative + self.margin)
        return loss.mean()

In [13]:
triplet_loss = TripletLoss()

#Train

In [22]:
import torch.optim as optim
from torch.utils.data import DataLoader

In [23]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [24]:
#create an instance of the SiameseNetwork
dropout_prob = 0.5  # Adjust as needed
siamese_net = SiameseNetwork(dropout_prob).to(device)
# siamese_net = SiameseNetwork().to(device)

In [25]:
# #define the triplet loss
# triplet_loss = nn.TripletMarginLoss(margin=1.0)

In [26]:
#create optimizer
optimizer = optim.Adam(siamese_net.parameters(),lr=0.001)

In [27]:
dataloader = DataLoader(simaese_dataset,batch_size=64,shuffle=True,num_workers=2)

In [28]:
num_epochs = 50

#Main training loop

In [29]:
for epoch in range(num_epochs):
  total_loss = 0.0

  for batch in dataloader:
    anchor,positive,negative = batch
    anchor,positive,negative = anchor.to(device),positive.to(device),negative.to(device)

    #Zero the gradients
    optimizer.zero_grad()

    #Forward pass
    anchor_out,positive_out,negative_out = siamese_net(anchor,positive,negative)

    #Compute triplet loss
    loss = triplet_loss(anchor_out,positive_out,negative_out)

    #Backward pass and optimization
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  #print average loss for the epoch
  avg_loss = total_loss / len(dataloader)
  print(f"Epoch {epoch +1}/{num_epochs}, Average Loss: {avg_loss:.4f}")


Epoch 1/50, Average Loss: 1.3641
Epoch 2/50, Average Loss: 1.1888
Epoch 3/50, Average Loss: 1.1002
Epoch 4/50, Average Loss: 1.0865
Epoch 5/50, Average Loss: 1.0161
Epoch 6/50, Average Loss: 0.9764
Epoch 7/50, Average Loss: 1.0817
Epoch 8/50, Average Loss: 1.0156
Epoch 9/50, Average Loss: nan
Epoch 10/50, Average Loss: 1.0000
Epoch 11/50, Average Loss: 1.0000
Epoch 12/50, Average Loss: 1.0000
Epoch 13/50, Average Loss: 1.0000
Epoch 14/50, Average Loss: 1.0000
Epoch 15/50, Average Loss: 1.0000
Epoch 16/50, Average Loss: 1.0000
Epoch 17/50, Average Loss: 1.0000
Epoch 18/50, Average Loss: 1.0000
Epoch 19/50, Average Loss: 1.0000
Epoch 20/50, Average Loss: 1.0000
Epoch 21/50, Average Loss: 1.0000
Epoch 22/50, Average Loss: 1.0000
Epoch 23/50, Average Loss: 1.0000
Epoch 24/50, Average Loss: 1.0000
Epoch 25/50, Average Loss: 1.0000
Epoch 26/50, Average Loss: 1.0000
Epoch 27/50, Average Loss: 1.0000
Epoch 28/50, Average Loss: 1.0000
Epoch 29/50, Average Loss: 1.0000
Epoch 30/50, Average Loss: