In [1]:
# Mount Google Drive (Colab only); every data path used below lives under /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

/bin/bash: nvidia-smi: command not found


In [None]:
!pip uninstall torch
!pip install torch==1.13.1

In [4]:
import os
import csv
import json
import multiprocessing

import pandas as pd
import numpy as np
import librosa
import soundfile as sf
import torch
from torch.utils.data import Dataset, random_split, DataLoader
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable 

import matplotlib.pyplot as plt

### Создаем датасет: music folder - музыкальные треки длиной в 1 секунду

### annotation.csv - содержит id треков, arousal и valence

In [None]:
# (Re)create annotation.csv so that it holds only the column-name row.
with open('/content/gdrive/MyDrive/PMEmo/annotation.csv', 'w') as file:
    writer = csv.writer(file)
    writer.writerow(['song_id', 'arousal', 'valence'])

def save_csv(music, music_name, sr, arousal_sec, valence_sec, counter):
  """Write one clip to disk and append its labels to annotation.csv.

  Args:
    music: Indexable collection of clips; only ``music[counter]`` is saved.
    music_name: Song identifier used as the prefix of the clip id.
    sr: Sample rate passed to ``sf.write``.
    arousal_sec: Arousal value stored for this clip.
    valence_sec: Valence value stored for this clip.
    counter: Index of the clip inside ``music``; also the clip-id suffix.
  """
  clip = music[counter]
  sf.write(f'/content/gdrive/MyDrive/PMEmo/music/{music_name}_{counter}.wav', clip, sr)
  # Append mode: one new row per call, existing rows stay untouched.
  with open('/content/gdrive/MyDrive/PMEmo/annotation.csv', 'a') as annotation_file:
    csv.writer(annotation_file).writerow([f'{music_name}_{counter}', arousal_sec, valence_sec])

def make_dataset(song_dir, annotaion_dir):
  """Split every song into 1-second clips and record per-clip labels.

  For each audio file in ``song_dir`` the first 15 seconds are dropped, the
  remainder is cut into non-overlapping 1-second frames, and each frame is
  handed to ``save_csv`` together with the mean of two consecutive annotation
  rows (arousal and valence) of that song.

  Args:
    song_dir: Directory of audio files whose names start with an integer id
      followed by a dot; the id is matched against the ``musicId`` column.
    annotaion_dir: Path to a CSV with ``musicId``, ``Arousal(mean)`` and
      ``Valence(mean)`` columns.

  Raises:
    ValueError: If a file name in ``song_dir`` does not start with an integer
      followed by a dot (raised by the sort key).
  """
  df = pd.read_csv(annotaion_dir)
  frame_duration = 1  # clip length in seconds
  for elem in sorted(os.listdir(song_dir), key=lambda x: int(x[:x.index('.')])):
    music, sr = librosa.load(f'{song_dir}/{elem}', mono=True, sr=None)
    # Skip the first 15 seconds of audio.
    # NOTE(review): presumably the annotations start at 15 s - confirm against the dataset docs.
    start_time = librosa.time_to_samples(15, sr=sr)
    music = music[start_time:]
    if music.shape[0] // sr == 0:
      # Less than one full second left: nothing to cut.
      continue
    frame_length = int(frame_duration * sr)
    # hop == frame length -> non-overlapping frames; the result is indexed by clip number below.
    music = librosa.util.frame(music, frame_length=frame_length, hop_length=frame_length, axis=0)
    music_name = int(elem.split('.')[0])

    # Filter the annotation table once per song instead of once per column.
    song_rows = df[df['musicId'] == music_name]
    arousal = list(song_rows['Arousal(mean)'].values)
    valence = list(song_rows['Valence(mean)'].values)

    # A clip needs both an audio frame and a complete pair of annotation rows.
    # Bounding the loop by the smaller count fixes the previous guard
    # (`counter == len(music)` followed by `continue`), which skipped only a
    # single iteration and then indexed past the last audio frame.
    n_clips = min(len(music), len(arousal) // 2)
    for counter in range(n_clips):
      index = 2 * counter
      arousal_sec = (arousal[index] + arousal[index+1])/2
      valence_sec = (valence[index] + valence[index+1])/2
      save_csv(music, music_name, sr, arousal_sec, valence_sec, counter)


In [3]:
# Number of entries in the directory that holds the 1-second clips.
len(os.listdir('/content/gdrive/MyDrive/PMEmo/music'))

18212

In [5]:
# Number of entries in the MFCC directory; expected to equal the number of clips in PMEmo/music.
len(os.listdir('/content/gdrive/MyDrive/PMEmo/lstm/mfcc'))

18212

### Создаем dataframe для обучения lstm: извлекаем mfcc, song_id, arousal, valence

In [24]:
def make_mfcc_dataset(music_dir, out_dir='/content/gdrive/MyDrive/PMEmo/lstm/mfcc',
                      n_mfcc=30, n_fft=2048, hop_length=512):
    """Compute MFCC features for every audio file and save them as ``.npy``.

    Args:
        music_dir: Directory with the audio clips to process.
        out_dir: Directory that receives one ``<clip id>.npy`` file per clip.
            It is created when missing. Defaults to the notebook's MFCC folder.
        n_mfcc: Number of MFCC coefficients per frame.
        n_fft: FFT window size passed to ``librosa.feature.mfcc``.
        hop_length: Hop length passed to ``librosa.feature.mfcc``.

    NOTE(review): the batch shape recorded further down in this notebook is
    (128, 87, 20), i.e. 20 coefficients, while the default here is 30 -
    confirm which value the cached files were produced with.
    """
    os.makedirs(out_dir, exist_ok=True)
    for song in sorted(os.listdir(music_dir)):
        # sr=None keeps the file's native sample rate.
        music, sr = librosa.load(os.path.join(music_dir, song), mono=True, sr=None)
        features = librosa.feature.mfcc(y=music, sr=sr, n_fft=n_fft, n_mfcc=n_mfcc,
                                        hop_length=hop_length)
        # The clip id is everything before the first dot of the file name.
        music_id = song.split('.')[0]
        np.save(os.path.join(out_dir, f'{music_id}.npy'), features)
        

In [25]:
# Run MFCC extraction over every file in the clip directory.
music_dir = '/content/gdrive/MyDrive/PMEmo/music'
make_mfcc_dataset(music_dir)

In [6]:
class MusicDataset(Dataset):
  """Dataset of cached MFCC clips paired with arousal/valence labels.

  Each item is ``(clip_id, mfcc, arousal, valence)`` where ``mfcc`` is the
  transposed content of the clip's ``.npy`` file as a float tensor and the two
  labels are scalar float tensors taken from the annotation CSV.

  Args:
    mfcc_dir: Directory with one ``<clip id>.npy`` file per clip.
    annotation_path: CSV with ``song_id``, ``arousal`` and ``valence`` columns.
  """

  def __init__(self, mfcc_dir='/content/gdrive/MyDrive/PMEmo/lstm/mfcc',
               annotation_path='/content/gdrive/MyDrive/PMEmo/annotation.csv'):
    self.mfcc_dir = mfcc_dir
    self.music_name = sorted(os.listdir(self.mfcc_dir))
    self.annot_df = pd.read_csv(annotation_path)
    # Build the id -> (arousal, valence) table once. Looking labels up here
    # replaces two full scans of the annotation frame per item. `setdefault`
    # keeps the first row for a repeated id, matching the former `.values[0]`.
    self._labels = {}
    rows = zip(self.annot_df['song_id'].astype(str),
               self.annot_df['arousal'],
               self.annot_df['valence'])
    for song_id, arousal, valence in rows:
      self._labels.setdefault(song_id, (arousal, valence))

  def __len__(self):
    """Number of MFCC files found in ``mfcc_dir``."""
    return len(self.music_name)

  def __getitem__(self, idx):
    """Return ``(clip_id, mfcc, arousal, valence)`` for the ``idx``-th file.

    Raises:
      KeyError: If the clip id has no row in the annotation CSV.
    """
    music = self.music_name[idx]
    # The clip id is the file name up to the first dot.
    song_id = music.split('.')[0]
    # Transpose the stored array so that its second axis becomes the first.
    mfcc = torch.tensor(np.load(os.path.join(self.mfcc_dir, music)).T, dtype=torch.float)
    arousal_value, valence_value = self._labels[song_id]
    arousal = torch.tensor(arousal_value, dtype=torch.float)
    valence = torch.tensor(valence_value, dtype=torch.float)
    return song_id, mfcc, arousal, valence


In [7]:
# Dataset over the cached MFCC files and annotation.csv.
dataset = MusicDataset()

In [8]:
# 80% of the items go to training; validation takes the remainder so the two sizes sum to len(dataset).
train_size = int(len(dataset) * 0.8)
valid_size = len(dataset) - train_size
print(train_size)
print(valid_size)

14569
3643


In [9]:
# Seed the split so the same clips land in train/validation on every run;
# without it, reloading saved weights in a new session would validate on
# clips the model may already have been trained on.
# NOTE(review): clips of one song can still fall on both sides of the split,
# because the split is over individual clips rather than songs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size],
                                            generator=split_generator)
train_data = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
# Fixed order keeps the validation batches, and hence the averaged per-batch loss, comparable across epochs.
valid_data = torch.utils.data.DataLoader(valid_dataset, batch_size=128, shuffle=False)

In [10]:
# Pull one training batch (ids, MFCC tensor, arousal, valence) and display the MFCC tensor's shape.
music_name, music, arousal, valence = next(iter(train_data))
music.shape

torch.Size([128, 87, 20])

In [11]:
# Shape of the arousal labels in the sampled batch.
arousal.shape

torch.Size([128])

### Модель

In [12]:
class LSTMModel(nn.Module):
    """LSTM regressor producing one value per input sequence.

    The last layer's final hidden state of a batch-first LSTM goes through
    Linear(hidden_size, 32) -> BatchNorm1d -> ReLU -> Linear(32, 1).

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        seq_length: Stored on the instance; not used by the computation.
        device: Stored on the instance for callers; ``forward`` follows the
            device the weights are actually on.
    """

    def __init__(self, input_size, hidden_size, num_layers, seq_length, device):
        super().__init__()
        self.device = device
        self.seq_length = seq_length
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, num_layers=self.num_layers, batch_first=True)
        # Kept as an attribute as well as inside fc1 so the state_dict keys stay
        # the same as in checkpoints saved by earlier versions of this class.
        self.batch_norm1 = nn.BatchNorm1d(32)
        self.fc1 = nn.Sequential(nn.Linear(self.hidden_size, 32),
                                 self.batch_norm1,
                                 nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(32, 1))

    def forward(self, x):
        """Map ``x`` of shape (batch, seq, input_size) to a (batch, 1) tensor."""
        # Move the input to wherever the weights are, so a stale `self.device`
        # label cannot cause a device mismatch.
        x = x.to(self.fc2[0].weight.device)
        # nn.LSTM starts from zero hidden/cell states when none are given,
        # which is what the explicit (deprecated) Variable zeros used to supply.
        _, (hidden_layer, _) = self.lstm(x)
        # hidden_layer[-1] is the final hidden state of the top LSTM layer.
        out_fc1 = self.fc1(hidden_layer[-1])
        return self.fc2(out_fc1)



In [13]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [14]:
# Size the network from an actual dataset item instead of hard-coding 20/87:
# the feature-extraction code in this notebook asks for 30 MFCCs, so a fixed
# input size breaks as soon as the cached features are regenerated.
seq_length, n_features = dataset[0][1].shape
model = LSTMModel(n_features, 64, 1, seq_length, device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [15]:
# Move the model to the selected device (the former `modle` binding was a typo),
# then run the sampled batch through it to check the output shape.
model = model.to(device)
model(music.to(device)).shape

torch.Size([128, 1])

### Обучение

In [16]:
# (Re)create history.csv so that it holds only the column-name row.
with open('/content/gdrive/MyDrive/PMEmo/lstm/history.csv', 'w') as file:
    writer = csv.writer(file)
    writer.writerow(['epoch', 'train_loss', 'valid_loss'])

In [17]:
def _batch_rmse(model, criterion, music, labels):
    """Return sqrt(criterion(model(music), labels)) for one batch."""
    out = model(music)
    # Drop only the trailing size-1 axis so a batch of one keeps shape (1,).
    return torch.sqrt(criterion(out.float().squeeze(-1), labels.float()))


def train_model(model, n_epochs, optimizer, criterion, target):
    """Train ``model`` and log the per-epoch train/validation loss.

    Reads the module-level ``train_data``, ``valid_data`` and ``device``.
    After every epoch a row is appended to history.csv and the weights are
    written to each_epochs.pt; extra checkpoints are written once 100 and 200
    epochs have been completed.

    Args:
        model: Network mapping an MFCC batch to a (batch, 1) prediction.
        n_epochs: Number of passes over ``train_data``.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss whose square root is optimized (the notebook passes MSELoss).
        target: ``'arousal'`` or ``'valence'`` - which label to regress.

    Returns:
        dict with ``'train_losses'`` and ``'valid_losses'``: one mean
        per-batch loss per epoch.

    Raises:
        ValueError: If ``target`` is neither ``'arousal'`` nor ``'valence'``.
    """
    # Fail early: an unknown target used to leave `loss` undefined inside the loop.
    if target not in ('arousal', 'valence'):
        raise ValueError(f"target must be 'arousal' or 'valence', got {target!r}")

    model.to(device)

    history = {
        'train_losses': [],
        'valid_losses': []
    }

    for epoch in range(n_epochs):

        train_losses_iter = []
        model.train()
        for j, (_, music, arousal, valence) in enumerate(train_data):
            if j % 10 == 0:
                print(f'{j} итерация в train')
            labels = arousal if target == 'arousal' else valence
            loss = _batch_rmse(model, criterion, music.to(device), labels.to(device))
            train_losses_iter.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        history['train_losses'].append(np.mean(train_losses_iter))

        valid_losses_iter = []
        model.eval()
        # No gradients are needed for validation; skipping them avoids
        # building an autograd graph for every validation batch.
        with torch.no_grad():
            for i, (_, music, arousal, valence) in enumerate(valid_data, start=1):
                if i % 10 == 0:
                    print(f'{i} итерация в valid')
                labels = arousal if target == 'arousal' else valence
                loss = _batch_rmse(model, criterion, music.to(device), labels.to(device))
                valid_losses_iter.append(loss.item())

        history['valid_losses'].append(np.mean(valid_losses_iter))

        with open('/content/gdrive/MyDrive/PMEmo/lstm/history.csv', 'a') as file:
            writer = csv.writer(file)
            writer.writerow([epoch, round(history["train_losses"][-1], 4), round(history["valid_losses"][-1], 4)])

        # Rolling checkpoint, overwritten every epoch.
        torch.save(model.state_dict(), '/content/gdrive/MyDrive/PMEmo/lstm/weights/each_epochs.pt')
        # `epoch` is zero-based: the N-epoch checkpoint belongs after epoch index N - 1
        # (the former `epoch == 100` test saved "100_epochs" after 101 epochs).
        completed_epochs = epoch + 1
        if completed_epochs == 100:
            torch.save(model.state_dict(), '/content/gdrive/MyDrive/PMEmo/lstm/weights/100_epochs.pt')
        if completed_epochs == 200:
            torch.save(model.state_dict(), '/content/gdrive/MyDrive/PMEmo/lstm/weights/200_epochs.pt')
        # The logged numbers are RMSE losses, not accuracies.
        print(f'train: RMSE {history["train_losses"][-1]:.4f}\n'
              f'valid:  RMSE {history["valid_losses"][-1]:.4f}')
        print(f'{"-"*35}')
        print()
    return history

In [None]:
# Train for 200 epochs on the arousal target.
# NOTE(review): the result is bound to `hystory` (sic); the name is left as is in case later cells read it.
hystory = train_model(model, 200, optimizer, criterion, 'arousal')