In [None]:
# imports
import torch
import torch.nn.functional as F
import os
import json
import random
import kagglehub
import matplotlib.pyplot as plt 
%matplotlib inline

In [None]:
# Run on Apple's Metal backend (MPS) when PyTorch reports it as available;
# otherwise fall back to the CPU.
if torch.backends.mps.is_available():
  device = torch.device("mps")
else:
  device = torch.device("cpu")
print("Using device:", device)

In [None]:
# Look for an already-downloaded copy of the dataset first, so the notebook
# can run without network access. The location is derived from the current
# user's home directory instead of being hard-coded to /root (for the root
# user this resolves to exactly the same path as before).
# NOTE(review): assumes kagglehub's cache lives under ~/.cache/kagglehub and
# that version 1 is the one wanted - confirm if the dataset is updated.
path = os.path.join(
  os.path.expanduser("~"), ".cache", "kagglehub", "datasets",
  "himanshuwagh", "spotify-million", "versions", "1",
)
if not os.path.exists(path):
  # not cached: download it; kagglehub returns the local dataset directory
  path = kagglehub.dataset_download("himanshuwagh/spotify-million")

In [None]:
num_slices = 1000           # maximum number of slice files to load
num_playlists = 1000        # maximum number of playlists to take from each slice

# load the data
playlists = list()          # contains all the playlists of the chosen subset

data_dir = os.path.join(path, "data")

# List the directory once (it does not change while we read it) and sort the
# names: os.listdir returns entries in arbitrary order, so without sorting the
# selected subset could differ from one run/machine to the next.
slice_names = sorted(name for name in os.listdir(data_dir) if name.endswith(".json"))

# Slicing (instead of indexing) takes "up to" the requested amount, so a
# smaller copy of the dataset loads what is available instead of raising IndexError.
for slice_name in slice_names[:num_slices]:
  slice_path = os.path.join(data_dir, slice_name)         # path of one slice file
  with open(slice_path, "r", encoding="utf-8") as f:      # explicit encoding: JSON text is UTF-8
    slice_content = json.load(f)
  playlists.extend(slice_content['playlists'][:num_playlists])   # adding the playlists to the list

In [None]:
# Keep a single feature per track: its 'track_name'. Each playlist becomes a
# plain list of track names, in the original track order.
playlists_chosen_features = [
  [track['track_name'] for track in playlist['tracks']]
  for playlist in playlists
]

# From here on `playlists` refers to the lists of track names, not the raw dicts.
playlists = playlists_chosen_features

In [None]:
# building the vocabulary for the language model

# Unique track names over all playlists. Each distinct name is one token of
# the vocabulary (the "characters" of this language model).
unique_names = {track_name for playlist in playlists for track_name in playlist}
unique_names.discard('.')  # '.' is reserved for the stop/start token added below

# Sort before assigning ids: the iteration order of a set of strings changes
# between interpreter runs (string hash randomisation), which would give every
# track a different id after each kernel restart and make runs irreproducible.
chars = sorted(unique_names)

stoi = {s: i + 1 for i, s in enumerate(chars)}  # mapping from character (track_names) to integer
stoi['.'] = 0                                   # stop/start token
itos = {i: s for s, i in stoi.items()}          # mapping from integer to character (track_names)

vocab_size = len(itos)

In [None]:
# building the dataset for the language model

block_size = 5 # context length: how many songs to consider to predict the next one

def build_dataset(playlists):
    """
    Build next-song prediction examples from playlists of track names.

    Every track name is treated as one token (the "character" of this language
    model). For each position in a playlist the input is the ids of the
    previous `block_size` tokens and the target is the id of the next one.
        i.e:
        playlist = [let it be, hey jude] we have:
        X = [[.,.,.,.,.],                            Y = [let it be,
            [.,.,.,.,let it be],                         hey jude,
            [.,.,.,let it be, hey jude]                  .]

        (remember that '.' is the start/stop token and context length is 5)

    Args:
        playlists: list of playlists, each a list of track names present in `stoi`.

    Returns:
        (X, Y): int64 tensors of shape (N, block_size) and (N,), where N is the
        total number of tracks plus one stop token per playlist.

    Raises:
        KeyError: if a track name is not a key of `stoi`.
    """
    contexts, targets = [], []
    for playlist in playlists:
        context = [0] * block_size          # start with a context full of start tokens
        for track_name in playlist + ['.']: # the trailing '.' teaches the model where playlists end
            ix = stoi[track_name]
            contexts.append(context)
            targets.append(ix)
            context = context[1:] + [ix] # crop and append (builds a new list, so stored contexts are not aliased)

    # Fix dtype and shape explicitly: with no examples torch.tensor([]) would
    # otherwise yield a float tensor of shape (0,), which cannot index an embedding.
    X = torch.tensor(contexts, dtype=torch.long).view(len(contexts), block_size)
    Y = torch.tensor(targets, dtype=torch.long)
    return X, Y

# Shuffle a copy rather than `playlists` itself: an in-place shuffle would make
# this cell non-idempotent (re-running it would shuffle the already shuffled
# list and silently produce a different train/dev/test split).
random.seed(42)
shuffled_playlists = list(playlists)
random.shuffle(shuffled_playlists)

n1 = int(0.8*len(shuffled_playlists))
n2 = int(0.9*len(shuffled_playlists))

# Split by playlist, so all examples of one playlist land in the same subset.
Xtr,  Ytr  = build_dataset(shuffled_playlists[:n1])     # 80%
Xdev, Ydev = build_dataset(shuffled_playlists[n1:n2])   # 10%
Xte,  Yte  = build_dataset(shuffled_playlists[n2:])     # 10%


# Move every split (inputs and targets) onto the selected compute device.
Xtr, Ytr, Xdev, Ydev, Xte, Yte = (
    split.to(device) for split in (Xtr, Ytr, Xdev, Ydev, Xte, Yte)
)

In [None]:
# MLP model & batch normalization

class Linear:
  """Fully connected layer computing `x @ weight` (plus `bias` when enabled).

  The weight is drawn from a standard normal using the notebook-level
  generator `g` and scaled by 1/sqrt(fan_in); the bias, if any, starts at
  zero. Both tensors live on the notebook-level `device`.
  """
  
  def __init__(self, fan_in, fan_out, bias=True):
    # Sample on the generator's own device and move the result afterwards:
    # torch.randn raises when the generator's device differs from the
    # requested output device (e.g. a CPU generator with device="mps").
    init = torch.randn((fan_in, fan_out), generator=g, device=g.device)
    self.weight = init.to(device) / fan_in**0.5
    self.bias = torch.zeros(fan_out, device=device) if bias else None
  
  def __call__(self, x):
    """Apply the layer to `x`; the result is also stored in `self.out`."""
    self.out = x @ self.weight
    if self.bias is not None:
      self.out += self.bias
    return self.out
  
  def parameters(self):
    """Return the trainable tensors: the weight and, when present, the bias."""
    return [self.weight] + ([] if self.bias is None else [self.bias])


class BatchNorm1d:
  """Batch normalisation over dimension 0 with a learnable scale and shift.

  While `training` is True the statistics of the current batch are used and
  folded into the running estimates; otherwise the running estimates are used.
  """
  
  def __init__(self, dim, eps=1e-5, momentum=0.1):
    self.eps = eps
    self.momentum = momentum
    self.training = True
    # learnable scale / shift (updated by backprop)
    self.gamma = torch.ones(dim, device=device)
    self.beta = torch.zeros(dim, device=device)
    # running statistics (updated by an exponential moving average, not by backprop)
    self.running_mean = torch.zeros(dim, device=device)
    self.running_var = torch.ones(dim, device=device)
  
  def __call__(self, x):
    """Normalise `x`; the result is also stored in `self.out`."""
    # pick the statistics to normalise with
    if not self.training:
      mean, var = self.running_mean, self.running_var
    else:
      mean = x.mean(0, keepdim=True)  # statistics of the current batch
      var = x.var(0, keepdim=True)
    normalized = (x - mean) / torch.sqrt(var + self.eps)
    self.out = self.gamma * normalized + self.beta
    # fold the batch statistics into the running estimates
    if self.training:
      with torch.no_grad():
        keep = 1 - self.momentum
        self.running_mean = keep * self.running_mean + self.momentum * mean
        self.running_var = keep * self.running_var + self.momentum * var
    return self.out
  
  def parameters(self):
    """Return the trainable tensors: gamma (scale) and beta (shift)."""
    return [self.gamma, self.beta]


class Tanh:
  """Parameter-free tanh activation layer."""

  def __call__(self, x):
    activated = torch.tanh(x)
    self.out = activated  # stored on the layer, like the other layers' outputs
    return activated

  def parameters(self):
    # nothing to train in this layer
    return list()

In [None]:
# defining the model

# One seeded generator drives parameter initialisation, minibatch sampling and
# prediction sampling, so a fresh run is reproducible.
g = torch.Generator().manual_seed(2147483647)

n_embd = 30         # the dimensionality of the character (song) embedding vectors
n_hidden = 100      # the number of neurons in the hidden layer of the MLP

# Embedding table: one n_embd-dimensional row per vocabulary entry.
# Sampled on the generator's own device and then moved, because torch.randn
# raises when the generator's device differs from the requested output device.
C = torch.randn((vocab_size, n_embd), generator=g, device=g.device).to(device)

# 5 hidden blocks of Linear -> BatchNorm -> Tanh, then a Linear -> BatchNorm
# output block producing one logit per vocabulary entry. The linear layers have
# no bias because the batch norm that follows already adds a shift (beta).
layers = [
  Linear(n_embd * block_size, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
  Linear(           n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
  Linear(           n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
  Linear(           n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
  Linear(           n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
  Linear(           n_hidden, vocab_size, bias=False), BatchNorm1d(vocab_size),
]


with torch.no_grad():
  # last layer: make less confident (the last layer is a BatchNorm1d, so scale its gamma)
  layers[-1].gamma *= 0.1
  # all other layers: apply gain (currently 1.0, i.e. a no-op; 5/3 is the usual tanh gain)
  for layer in layers[:-1]:
    if isinstance(layer, Linear):
      layer.weight *= 1.0 # 5/3

parameters = [C] + [p for layer in layers for p in layer.parameters()]
print(sum(p.nelement() for p in parameters))  # number of parameters in total
for p in parameters:
  p.requires_grad = True

In [None]:
# optimization step

max_steps = 200000
batch_size = 48
lossi = []   # log10 of the minibatch loss at every step
ud = []      # per step: log10 of (update std / parameter std) for every parameter

# Batch norm must use batch statistics while training; set it explicitly so the
# cell still trains correctly if it is re-run after the layers were put in
# inference mode.
for layer in layers:
  if isinstance(layer, BatchNorm1d):
    layer.training = True

for i in range(max_steps):
  
  # minibatch construct
  # torch.randint's upper bound is exclusive, so pass the full length:
  # `Xtr.shape[0]-1` would never sample the last training example.
  ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
  Xb, Yb = Xtr[ix], Ytr[ix] # batch X,Y
  
  # forward pass
  emb = C[Xb] # embed the characters into vectors
  x = emb.view(emb.shape[0], -1) # concatenate the vectors
  for layer in layers:
    x = layer(x)
  loss = F.cross_entropy(x, Yb) # loss function
  
  # backward pass
  for p in parameters:
    p.grad = None
  loss.backward()
  
  # update (plain SGD)
  lr = 0.1 if i < 150000 else 0.01 # step learning rate decay
  for p in parameters:
    p.data += -lr * p.grad

  # track stats
  if i % 10000 == 0: # print every once in a while
    print(f'{i:7d}/{max_steps:7d}: {loss.item():.4f}')
  lossi.append(loss.log10().item())
  with torch.no_grad():
    ud.append([((lr*p.grad).std() / p.data.std()).log10().item() for p in parameters])

In [None]:
# `lossi` stores log10(loss) for every optimisation step, so label the axes
# accordingly: the figure should be readable on its own.
fig, ax = plt.subplots()
ax.plot(lossi)
ax.set(title="Training loss", xlabel="optimisation step", ylabel="log10(minibatch cross-entropy)")
plt.show()

In [None]:
# TODO: the metrics still need to be thought through;
# this is a deliberately naive first version.

# testing

# Put batch norm in inference mode. In training mode it normalises with the
# statistics of the current batch (undefined for a single example: the
# unbiased variance of one row is NaN) and it would also fold test data into
# the running statistics.
for layer in layers:
    if isinstance(layer, BatchNorm1d):
        layer.training = False

eval_batch_size = 64   # examples per forward pass; the logits are (eval_batch_size, vocab_size)
n_oks = 0

with torch.no_grad():  # no gradients are needed for evaluation
    for start in range(0, Xte.shape[0], eval_batch_size):
        Xb = Xte[start:start + eval_batch_size]
        Yb = Yte[start:start + eval_batch_size]

        # forward pass the neural net
        emb = C[Xb] # (batch,block_size,n_embd)
        x = emb.view(emb.shape[0], -1) # concatenate the vectors
        for layer in layers:
            x = layer(x)
        logits = x
        probs = F.softmax(logits, dim=1)

        # sample one prediction per example from the distribution; sampling is
        # done on the generator's device because torch.multinomial needs the
        # generator and the probabilities on the same device
        sampled = torch.multinomial(probs.to(g.device), num_samples=1, generator=g).squeeze(1)

        # metrics: count the sampled predictions that match the target
        n_oks += (sampled == Yb.to(g.device)).sum().item()

n_oks / Xte.shape[0]   # accuracy