# Mount Google Drive
For easy access to datasets

In [None]:
# Colab-specific helper used to expose Google Drive inside the runtime.
from google.colab import drive
# Mount Drive at /content/drive; the dataset and checkpoint paths configured
# further down in this notebook are all located under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


# Imports

In [None]:
import torch
import torchvision
from PIL import Image
from torch.utils.data import Dataset, random_split, DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
import h5py
import numpy as np
import random
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import os
from datetime import datetime
import csv
import sys
import math
import scipy
import statistics
csv.field_size_limit(sys.maxsize)

131072

# **Settings:** Paths, version etc.
Specify if Christian or Philip here

In [None]:
#Put name here!
name = "christian"

#Dataset version
version = "resnet"

# Each user keeps the project in a different Drive folder. Below that folder the
# layout is identical, so only the root is looked up per user and every path is
# derived from it instead of being spelled out once per person.
_drive_roots = {
  "christian": r"/content/drive/MyDrive",
  "philip": r"/content/drive/MyDrive/ITU/Software Design - Kandidat/Master Thesis",
}

if name.lower() not in _drive_roots:
  # ValueError is still an Exception, so existing handlers keep working.
  raise ValueError("Invalid name for path - use 'christian' or 'philip'")

_root = _drive_roots[name.lower()]

# Folder in which each training run creates its checkpoint directory.
checkpoint_path = f"{_root}/ML models/Checkpoints/IRM"
# HDF5 files holding the train / dev / test splits.
train_path = f"{_root}/Data/Train.hdf5"
dev_path = f"{_root}/Data/Dev.hdf5"
test_path = f"{_root}/Data/Test.hdf5"
csv_path = f"{_root}/Data/CSV_Data"
# Folder that saved figures are written to.
image_path = f"{_root}/Data/Images"
seq_lengths_path = f"{_root}/Data/seq_lengths.hdf5"

# Set Device

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

print(device)

cuda


# **Setup:** Dataset Class
Note: this differs slightly from the tutorial this implementation is based on, because our dataset is stored in the *HDF5* format.

In [None]:
# Current version used for testing the set up

class FiskeSet_test(Dataset):
  """Triplet dataset restricted to the first ``amount`` samples (setup testing).

  Every item is ``(anchor, positive, negative)``: the anchor row and the
  positive image stored at the requested index, plus a randomly drawn anchor
  row whose values differ from the anchor.

  Args:
    anchors: Indexable collection of anchor feature rows.
    positives: Indexable collection of images, aligned with ``anchors``.
    seed: Seed passed to ``random.seed`` (seeds the module-level RNG).
    amount: Number of samples exposed by the dataset; must be greater than 1.
    transform: Optional callable applied to the positive image.
    version: When not ``"vanilla"``, the positive is converted to a PIL image
      before ``transform`` is applied.

  Raises:
    ValueError: If ``amount`` is 1 or less, as no negative could be drawn.
  """
  def __init__(self, anchors, positives, seed, amount, transform=None, version="vanilla"):
    random.seed(seed)
    if amount <= 1:
      raise ValueError("Amount of samples must be greater than 1 or negatives cannot be fetched.")

    self.anchors = anchors
    self.positives = positives
    self.transform = transform
    self.amount = amount
    # Keep the version on the instance so __getitem__ honours the argument
    # instead of depending on a module-level ``version`` variable.
    self.version = version

  def __len__(self):
    return self.amount

  def __getitem__(self, idx):
    anchor = self.anchors[idx]
    positive = self.positives[idx]

    # Draw anchors from the first ``amount`` rows until one differs from the
    # current anchor; it serves as the negative of the triplet.
    negative = self.anchors[random.randrange(self.amount)]
    while list(anchor) == list(negative):
      negative = self.anchors[random.randrange(self.amount)]

    if self.transform:
      if self.version != "vanilla":
        positive = Image.fromarray(positive)
      positive = self.transform(positive)

    return anchor, positive, negative

In [None]:
# Generic version used for the entire dataset (once ready)

class FiskeSet(Dataset):
  """Triplet dataset over all samples.

  Every item is ``(anchor, positive, negative)``: the anchor row and the
  positive image stored at the requested index, plus a randomly drawn anchor
  row whose values differ from the anchor.

  Args:
    anchors: Indexable collection of anchor feature rows.
    positives: Indexable collection of images, aligned with ``anchors``;
      its length defines the dataset length.
    seed: Seed passed to ``random.seed`` (seeds the module-level RNG).
    transform: Optional callable applied to the positive image.
    version: When not ``"vanilla"``, the positive is converted to a PIL image
      before ``transform`` is applied.
  """
  def __init__(self, anchors, positives, seed, transform=None, version = "vanilla"):
    random.seed(seed)

    self.anchors = anchors
    self.positives = positives
    self.transform = transform
    # Keep the version on the instance so __getitem__ honours the argument
    # instead of depending on a module-level ``version`` variable.
    self.version = version

  def __len__(self):
    return len(self.positives)

  def __getitem__(self, idx):
    anchor = self.anchors[idx]
    positive = self.positives[idx]

    # Redraw until the sampled anchor differs from the current one; it serves
    # as the negative of the triplet.
    negative = random.choice(self.anchors)
    while list(anchor) == list(negative):
      negative = random.choice(self.anchors)

    if self.transform:
      if self.version != "vanilla":
        positive = Image.fromarray(positive)
      positive = self.transform(positive)

    return anchor, positive, negative

# **Setup:** Data Loading

In [None]:
#Images need to be transformed in accordance with the specifications of the version

# Unknown versions are rejected up front; each supported version has its own
# preprocessing pipeline.
if version not in ("resnet", "vanilla"):
  raise Exception("Please input a valid cnn-model")

if version == "vanilla":
  # Tensor conversion followed by a 0.5 mean / 0.5 std normalisation per channel.
  transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
  ])
else:
  # resnet: resize to 256, centre-crop to 224, convert to a tensor and
  # normalise each channel with the mean/std constants below.
  transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
  ])

def generate_genres_tensor(genres_vectors):
  """
  Turn comma-separated genre strings into multi-hot rows.

  Each input string is split on ", " and mapped to an 11-element 0/1 vector,
  one position per entry of the master genre list. The rows are stacked into
  a single tensor that can be concatenated to the anchors using .cat.
  """
  genres_master = ["Blues", "Classical", "Electronic", "Folk World & Country", "Funk / Soul", "Hip Hop", "Jazz", "Latin",
              "Pop", "Reggae", "Rock"]
  rows = []
  for raw in genres_vectors:
    # The folk genre contains commas itself; rewrite it to the comma-free
    # master spelling so the split below keeps it in one piece.
    present = raw.replace("Folk, World,", "Folk World").split(", ")
    flags = [1 if genre in present else 0 for genre in genres_master]
    rows.append(torch.tensor(flags))
  return torch.stack(tuple(rows))

#One hot encoding for years. Needs review.
def generate_years_tensor(years_vectors):
  """
  One-hot encode years by their last two characters.

  Each year string contributes a 100-element row with a single 1 at the
  position given by ``int(year[-2:])``; the rows are stacked into one tensor.
  """
  slots = 100

  rows = []
  for year in years_vectors:
    position = int(year[-2:])
    one_hot = [0] * slots
    # Values outside 0..99 match no slot and leave the row all zeros.
    if 0 <= position < slots:
      one_hot[position] = 1
    rows.append(torch.tensor(one_hot))
  return torch.stack(tuple(rows))

def data_normalization(data):
  """Min-max scale ``data`` into the range [0, 1] and return it as a tensor.

  The scaling uses the global minimum and maximum of ``data``. The input is
  left untouched; the result is computed on a floating-point copy, so integer
  input is no longer truncated to whole numbers.

  Args:
    data: Array-like of numeric values.

  Returns:
    A ``torch.Tensor`` with the shape of ``data``. Empty input gives an empty
    tensor, and input whose values are all equal gives all zeros (there is no
    range to scale by).
  """
  values = np.asarray(data)
  if not np.issubdtype(values.dtype, np.floating):
    # Integer (or bool) input must be promoted, otherwise the scaled values
    # cannot be represented.
    values = values.astype(np.float64)

  if values.size == 0:
    return torch.tensor(values)

  minimum = values.min()
  span = values.max() - minimum
  if span == 0:
    # Constant input: avoid the 0/0 division that would produce NaN.
    return torch.tensor(np.zeros_like(values))

  return torch.tensor((values - minimum) / span)

def _build_anchor_tensor(h, input_attributes, rows):
  """Assemble the anchor feature matrix for the selected rows of an open HDF5 file.

  Column order: "Genres", "Year" and "*_norm" attributes are appended in the
  order they appear in ``input_attributes``; all remaining attributes are read
  as-is from "Single Value Data" and appended afterwards, in their given order.

  Args:
    h: Open ``h5py.File`` with "Single Value Data" and "Metadata" datasets,
      each carrying a "column_names" attribute.
    input_attributes: Names of the attributes to include.
    rows: Row selector used to index the datasets (a sorted list of indices
      or ``slice(None)`` for all rows).
  """
  svd_attributes = list(h["Single Value Data"].attrs["column_names"])
  md_attributes = list(h["Metadata"].attrs["column_names"])

  svd_idx_list = []
  tensor_lyf = []
  for att in input_attributes:
    if att == "Genres":
      genres_vectors = h["Metadata"].asstr()[rows, md_attributes.index("Genres")]
      tensor_lyf.append(generate_genres_tensor(genres_vectors))
    elif att == "Year":
      years_vectors = h["Metadata"].asstr()[rows, md_attributes.index("Year")]
      tensor_lyf.append(generate_years_tensor(years_vectors))
    elif "_norm" in att:
      # "<name>_norm" requests the min-max normalised version of column <name>.
      clean_att = att.replace("_norm", "")
      att_vectors = h["Single Value Data"][rows, svd_attributes.index(clean_att)]
      tensor_lyf.append(data_normalization(att_vectors)[:, None])
    else:
      svd_idx_list.append(svd_attributes.index(att))

  for svd in svd_idx_list:
    addition_tensor = h["Single Value Data"][rows, svd]
    tensor_lyf.append(torch.tensor(addition_tensor[:, None]))

  return torch.cat(tuple(tensor_lyf), dim=1)


def create_dataset(path, input_attributes, seed, amount = None, debug = False, version = "vanilla"):
  """Build a triplet dataset from an HDF5 file.

  Uses the module-level ``transform`` as the image transform of the dataset.

  Args:
    path: Path to the HDF5 file.
    input_attributes: Names of the attributes that make up the anchors.
    seed: Seed for the random row sample (debug mode) and for the dataset.
    amount: Number of rows to sample; required when ``debug`` is True.
    debug: When True, a random sample of ``amount`` rows is loaded into memory
      and wrapped in ``FiskeSet_test``; otherwise all rows are used and wrapped
      in ``FiskeSet``.
    version: Forwarded to the dataset class.

  Returns:
    A ``FiskeSet_test`` (debug) or ``FiskeSet`` instance.

  Raises:
    ValueError: If ``debug`` is True but no ``amount`` was given.
  """
  if debug and amount is None:
    raise ValueError("An amount must be given when debug is True.")

  # Open read-only: nothing is ever written to the dataset file.
  h = h5py.File(path, "r")

  if debug:
    try:
      random.seed(seed)
      rands = random.sample(range(h["Single Value Data"].shape[0]), amount)
      # Sorted so the indices are in increasing order for the HDF5 row selection.
      rands.sort()
      images = h["Images"][rands]
      anchors = _build_anchor_tensor(h, input_attributes, rands)
    finally:
      # Everything needed has been read into memory, so the file can be closed.
      h.close()
    print("Anchors SHAPE:", anchors.shape)
    dataset = FiskeSet_test(anchors, images, seed, amount, transform, version)

  else:
    # The images stay on disk and are read per item by the dataset, so the
    # file must remain open for the lifetime of the dataset.
    images = h["Images"]
    anchors = _build_anchor_tensor(h, input_attributes, slice(None))
    print("Anchors SHAPE:", anchors.shape)

    dataset = FiskeSet(anchors, images, seed, transform, version)

  return dataset

# **Setup:** Image conversion


In [None]:
def show_img(rgb_matrix):
  """Display an image tensor, moving its first axis to the last position for imshow."""
  plt.imshow(np.array(rgb_matrix.permute(1, 2, 0)))

def show_x_images(imgs):
  """Show all images of ``imgs`` in one figure on a square grid.

  The first image is titled "ANCHOR"; every other image is titled with its
  1-based position.

  Args:
    imgs: Sequence of image tensors accepted by ``show_img``.
  """
  fig = plt.figure(figsize=(50, 30))

  x = len(imgs)
  # Grid side is half the image count, rounded up. Integer division is
  # required because add_subplot does not accept float grid sizes.
  side = (x + 1) // 2
  if side * side < x:
    # Only happens for two images, where a 1x1 grid would be too small.
    side += 1

  for i, img in enumerate(imgs, start=1):
    fig.add_subplot(side, side, i)

    show_img(img)
    plt.axis('off')
    plt.title("ANCHOR" if i == 1 else i)
    
def show_images_advanced(img_dict, save = False):
  """Plot titled images on a square grid, giving the "Anchor" entry a 2x2 cell.

  Args:
    img_dict: Mapping of title -> image tensor. The entry with key "Anchor"
      is drawn across two rows and two columns at the top of the grid.
    save: When True, the figure is written as a PDF into ``image_path``.
  """
  plt.figure(figsize=(20, 20))
  plt.tight_layout()

  count = len(img_dict)
  timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  postfix = str(count) + "_images_" + timestamp

  # Half the image count, rounded up, is used for both rows and columns.
  side = (count + 1) // 2
  grid = (side, side)

  slot = 0
  for label, image in img_dict.items():
    if label == "Anchor":
      plt.subplot2grid(grid, (0, side//2-1), rowspan=2, colspan=2)
      # Skip the two grid rows taken up by the anchor.
      step = side*2
    else:
      plt.subplot2grid(grid, (slot//side, slot%side))
      step = 1
    show_img(image)
    plt.axis('off')
    slot += step
    plt.title(label)

  if save:
    plt.savefig(f"{image_path}/{postfix}.pdf", dpi=300, bbox_inches="tight")
    

# **Settings:** Training initialization
Options are specified here

In [None]:
# Attributes that make up an anchor. "Year" and "Genres" are one-hot / multi-hot
# encoded, "*_norm" entries are min-max normalised, and the rest are used as-is.
input_attributes = ["energy", "danceability", "speechiness", "acousticness", "instrumentalness",
                    "liveness", "valence", "Year", "Genres", "tempo_norm", "loudness_norm", "key_norm",
                    "mode"]
# "Genres" expands to 11 columns and "Year" to 100 columns. Both names are
# already counted once in len(input_attributes), so only the 10 and 99
# additional columns are added here.
genre_dim = 10
year_dim = 99

in_dim = len(input_attributes) + genre_dim + year_dim
# NOTE(review): presumably the size of the image embedding the anchors must match — confirm.
out_dim = 1000
print("in_dim:", in_dim)
seed = 42
batch_size = 16
##Change if in Debug mode
amount = 10_000


# Debug mode (fifth argument True): a random sample of `amount` rows is used.
train_set = create_dataset(train_path, input_attributes, seed, amount, True, version)
# Show the positive image of the first triplet.
show_img(train_set[0][1])
#dev_set = create_dataset(dev_path, input_attributes)
#test_set = create_dataset(test_path, input_attributes)
# From here on `amount` is the actual size of the training set.
amount = len(train_set)

trainloader = DataLoader(train_set, shuffle = True, num_workers = 2, batch_size=batch_size)
#devloader = DataLoader(dev_set, shuffle = True, num_workers = 2, batch_size=batch_size)
#testloader = DataLoader(test_set, shuffle = True, num_workers = 2, batch_size=batch_size)

# CNN Class
The output is mapped to the range -1 to 1 with tanh.

In [None]:
class CNN(nn.Module):
  """Small convolutional image encoder producing a 128-dimensional embedding.

  Two conv + max-pool stages are followed by three fully connected layers;
  the last layer is squashed with tanh, so outputs lie between -1 and 1.
  The first linear layer expects 2704 flattened features (16 channels of
  13x13, e.g. for 64x64 inputs).
  """
  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
    self.fc1 = nn.Linear(in_features=2704, out_features=120)
    self.fc2 = nn.Linear(in_features=120, out_features=84)
    self.fc3 = nn.Linear(in_features=84, out_features=128)

  def forward(self, x):
    # Convolutional stages: conv -> leaky ReLU -> 2x2 max-pool.
    for conv in (self.conv1, self.conv2):
      x = self.pool(F.leaky_relu(conv(x)))
    # Flatten everything except the batch dimension.
    x = x.flatten(1)
    for hidden in (self.fc1, self.fc2):
      x = F.leaky_relu(hidden(x))
    return torch.tanh(self.fc3(x))


# Deep CNN Class

In [None]:
class Deep_CNN(nn.Module):
  """Deeper variant of the small CNN encoder with five fully connected layers.

  Two conv + max-pool stages are followed by five linear layers; the last one
  is squashed with tanh, giving a 128-dimensional output between -1 and 1.
  The first linear layer expects 2704 flattened features.
  """
  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
    self.fc1 = nn.Linear(in_features=2704, out_features=256)
    self.fc2 = nn.Linear(in_features=256, out_features=256)
    self.fc3 = nn.Linear(in_features=256, out_features=128)
    self.fc4 = nn.Linear(in_features=128, out_features=84)
    self.fc5 = nn.Linear(in_features=84, out_features=128)

  def forward(self, x):
    # Convolutional stages: conv -> leaky ReLU -> 2x2 max-pool.
    for conv in (self.conv1, self.conv2):
      x = self.pool(F.leaky_relu(conv(x)))
    # Flatten everything except the batch dimension.
    x = x.flatten(1)
    for hidden in (self.fc1, self.fc2, self.fc3, self.fc4):
      x = F.leaky_relu(hidden(x))
    return torch.tanh(self.fc5(x))

# ResNet

In [None]:
# Load a pretrained resnet18 from the pytorch/vision v0.10.0 hub repository and
# move it to the selected device. It is used as the image encoder `cnn`.
cnn = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True).to(device)

Using cache found in /root/.cache/torch/hub/pytorch_vision_v0.10.0


# MLP Class

This is a setup of the multilayer perceptron used for encoding.
The output is mapped to the range -1 to 1 with tanh.

In [None]:
class MLP(nn.Module):
  """Three-layer perceptron that encodes an attribute vector.

  Args:
    in_dim: Size of the input vector.
    out_dim: Size of the produced embedding.

  The hidden layers (32 and 64 units) use leaky ReLU; the output layer is
  squashed with tanh, so outputs lie between -1 and 1.
  """
  def __init__(self, in_dim, out_dim):
    super().__init__()
    self.fc1 = nn.Linear(in_features=in_dim, out_features=32)
    self.fc2 = nn.Linear(in_features=32, out_features=64)
    self.fc3 = nn.Linear(in_features=64, out_features=out_dim)

  def forward(self, x):
    for hidden in (self.fc1, self.fc2):
      x = F.leaky_relu(hidden(x))
    return torch.tanh(self.fc3(x))

# Deep MLP Class

In [None]:
class Deep_MLP(nn.Module):
  """Five-layer perceptron that encodes an attribute vector.

  Args:
    in_dim: Size of the input vector.
    out_dim: Size of the produced embedding.

  The hidden layers (32, 32, 64 and 64 units) use leaky ReLU; the output
  layer is squashed with tanh, so outputs lie between -1 and 1.
  """
  def __init__(self, in_dim, out_dim):
    super().__init__()
    self.fc1 = nn.Linear(in_features=in_dim, out_features=32)
    self.fc2 = nn.Linear(in_features=32, out_features=32)
    self.fc3 = nn.Linear(in_features=32, out_features=64)
    self.fc4 = nn.Linear(in_features=64, out_features=64)
    self.fc5 = nn.Linear(in_features=64, out_features=out_dim)

  def forward(self, x):
    for hidden in (self.fc1, self.fc2, self.fc3, self.fc4):
      x = F.leaky_relu(hidden(x))
    return torch.tanh(self.fc5(x))

# Loss and Optimizer

In [None]:
# Triplet margin loss (default settings) over (anchor, positive, negative) embeddings.
criterion = nn.TripletMarginLoss()
# Both encoders are trained jointly: one Adam optimizer updates the parameters
# of the image encoder and of the attribute encoder.
params = list(cnn.parameters())
# NOTE(review): `mlp` is not created anywhere in the visible cells — it must be
# instantiated (e.g. from MLP or Deep_MLP) before this cell runs; confirm.
params.extend(mlp.parameters())
optimizer = optim.Adam(params, lr=0.001)

# **Settings:** Training

Specify epochs and which models to train

In [None]:
# Put both models into training mode.
mlp.train()
cnn.train()

# One loss value is appended per epoch by the training routine.
losses = []
# Number of passes over the training loader.
epochs = 40

#Change to save checkpoints
enable_checkpoints = True

# Training Routine

In [None]:
if enable_checkpoints:
  # Every run gets its own folder, named after the start time, containing the
  # settings, the per-epoch losses and one sub-folder per model.
  time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  folder_path = checkpoint_path + "/" + str(time)
  # makedirs also creates the run folder (and checkpoint_path, if missing).
  os.makedirs(folder_path + "/mlp")
  os.makedirs(folder_path + "/cnn")

  with open(folder_path + r"/model_settings.txt", "w") as current_settings:
    current_settings.write(f"ATTRIBUTES: {input_attributes} \nAMOUNT: {amount} \nSEED: {seed} \nBATCH SIZE: {batch_size}")

  with open(folder_path + r"/epoch_losses.csv", "w", encoding="UTF8", newline="") as epoch_losses:
    writer = csv.writer(epoch_losses, delimiter=";")
    writer.writerow(["EPOCH", "LOSS PER SAMPLE"])

# Number of batches the loader actually yields per epoch. Dividing by this
# instead of amount/batch_size gives the exact mean batch loss even when the
# dataset size is not a multiple of the batch size.
batches_per_epoch = len(trainloader)

for epoch in tqdm(range(epochs)):

  running_loss = 0.0
  for anchors, positives, negatives in trainloader:

    anchors = anchors.to(device).float()
    positives = positives.to(device).float()
    negatives = negatives.to(device).float()

    optimizer.zero_grad()

    # Anchors and negatives are attribute vectors and go through the MLP;
    # positives are images and go through the CNN.
    anchors = mlp(anchors)
    negatives = mlp(negatives)
    positives = cnn(positives)

    loss = criterion(anchors, positives, negatives)
    loss.backward()
    optimizer.step()

    running_loss += loss.item()

  # Mean of the batch losses of this epoch (logged under the existing
  # "LOSS PER SAMPLE" column).
  epoch_loss = running_loss / batches_per_epoch
  print(f"\nEpoch Loss: {epoch_loss}")
  losses.append(epoch_loss)

  if enable_checkpoints:
    # Save the weights of both models after every epoch.
    torch.save(mlp.state_dict(), f"{folder_path}/mlp/MLP_epoch_{epoch}")
    torch.save(cnn.state_dict(), f"{folder_path}/cnn/CNN_epoch_{epoch}")

    with open(folder_path + r"/epoch_losses.csv", "a", encoding="UTF8", newline="") as epoch_losses:
      writer = csv.writer(epoch_losses, delimiter=";")
      writer.writerow([epoch, epoch_loss])

print("Finished training!")
print(losses)