- CosineEmbeddingLoss Multimodal을 활용하여 장르, 멜로디의 유사성을 함께 반영할 수 있는 노래의 임베딩을 구함

- Word2Vec Embedding과 Mel Embedding을 동시에 표현할 수 있는 노래의 임베딩을 구하는 것이 본 모델의 목표

- 모델의 구조는 다음과 같음
    - Word2Vec Embedding → layer → output 1
    - Mel Embedding → layer → output 2
    - concat(output 1, output 2) → layer → Output Embedding
    - loss = CosineEmbeddingLoss(Word2Vec Embedding, Output Embedding) + CosineEmbeddingLoss(Mel Embedding, Output Embedding)
    - 위와 같이 CosineEmbeddingLoss를 활용하여 Output Embedding이 Word2Vec Embedding과 Mel Embedding을 동시에 표현할 수 있도록 모델을 학습함


- Output Embedding의 Loss와 시각화 결과를 통해, Output Embedding이 Word2Vec과 Mel의 특성을 함께 반영한다는 것을 확인함

In [1]:
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
import warnings

import torch
import torch.nn as nn
import torch.nn.functional as F

# Silence every warning category for the rest of the session.
warnings.filterwarnings(action='ignore')

# Base folders used below: data_dir is read for the metadata JSON and the
# .npy embeddings (and receives the exported embedding); model_dir receives
# the saved model state_dict.
# NOTE(review): the paths look like a Colab Google Drive mount -- confirm the
# drive is mounted before running.
data_dir = '/content/drive/MyDrive/제 13회 투빅스 컨퍼런스 음악추천/Data/'
model_dir = '/content/drive/MyDrive/제 13회 투빅스 컨퍼런스 음악추천/Model/'

# 데이터 확인

In [2]:
# Load the song metadata, order the rows by song id and renumber the index
# from zero so that the row position can be used as a stable integer key.
song_meta_df = (
    pd.read_json(data_dir + 'song_meta_data_v3.json')
    .sort_values('id')
    .reset_index(drop=True)
)

# Expose the row position as an explicit column.
# NOTE(review): presumably this is the row index into the .npy embedding
# matrices loaded below -- confirm the matrices were built in the same order.
song_meta_df['song_embedding_idx'] = song_meta_df.index

In [3]:
# Pre-computed per-song embedding matrices stored as .npy files in data_dir.
# NOTE(review): rows are presumably aligned with song_embedding_idx, since
# both arrays are later indexed with those values -- confirm.
mel_embeding = np.load(data_dir + 'tanh_mel_embeding_loss63.npy')
word2vec_embedding = np.load(data_dir + 'word2vec_embedding.npy')

In [4]:
song_id = song_meta_df['id'].tolist()
song_embedding_idx = song_meta_df['song_embedding_idx'].tolist()

# Two-way lookup tables between a song id and its embedding row index.
# Built with dict(zip(...)) so no module-level loop variable is left behind;
# the previous loop bound the name `id`, shadowing the builtin for the rest
# of the session. On duplicate keys the last pair wins, as before.
song_id2song_embedding_idx = dict(zip(song_id, song_embedding_idx))
song_embedding_idx2song_id = dict(zip(song_embedding_idx, song_id))

# 학습 설정

In [5]:
batch_size = 512  # number of song indices per DataLoader batch
lr = 0.005  # learning rate handed to the Adam optimizer
epochs = 100  # number of iterations of the training loop

In [6]:
# Environment setup: use the GPU when CUDA is available, otherwise the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(DEVICE)

cpu


In [7]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class MakeDataSet(Dataset):
    """Index-based dataset over ``input_li`` with optional paired outputs.

    Args:
        input_li: indexable, sized collection of inputs.
        output_li: optional collection indexed in parallel with ``input_li``.
            When omitted, items are returned without a paired output.
    """

    def __init__(self, input_li, output_li=None):
        super().__init__()
        self.input_li = input_li
        self.output_li = output_li

    def __len__(self):
        """Return the number of inputs."""
        return len(self.input_li)

    def __getitem__(self, idx):
        """Return ``input_li[idx]``, or ``(input, output)`` when outputs exist."""
        sample = self.input_li[idx]
        if self.output_li is None:
            return sample
        return sample, self.output_li[idx]

In [8]:
class CosineEmbeddingLossMultimodal(nn.Module):
    """Fuse a mel embedding and a word2vec embedding into one 128-d vector.

    Each modality first passes through its own same-width
    Linear -> BatchNorm1d -> Tanh branch. The two branch outputs are then
    concatenated (mel first) and reduced by a shared head
    Linear(., 256) -> BatchNorm1d -> Tanh -> Linear(256, 128) -> Tanh.

    Args:
        mel_input_size: feature width of the mel embedding.
        word2vec_input_size: feature width of the word2vec embedding.
    """

    def __init__(self, mel_input_size, word2vec_input_size):
        super().__init__()
        fused_size = word2vec_input_size + mel_input_size

        # Per-modality branches keep the input width unchanged.
        self.mel_layer = nn.Sequential(
            nn.Linear(mel_input_size, mel_input_size),
            nn.BatchNorm1d(mel_input_size),
            nn.Tanh(),
        )
        self.word2vec_layer = nn.Sequential(
            nn.Linear(word2vec_input_size, word2vec_input_size),
            nn.BatchNorm1d(word2vec_input_size),
            nn.Tanh(),
        )

        # Shared head applied to the concatenated branch outputs.
        self.Layer = nn.Sequential(
            nn.Linear(fused_size, 256),
            nn.BatchNorm1d(256),
            nn.Tanh(),
            nn.Linear(256, 128),
            nn.Tanh(),
        )

        self._init_weight_()

    def _init_weight_(self):
        """Re-initialise Linear weights: Kaiming for the head, Xavier for branches."""
        # The order matters for reproducibility under a fixed RNG seed.
        schemes = (
            (self.Layer, nn.init.kaiming_uniform_),
            (self.mel_layer, nn.init.xavier_uniform_),
            (self.word2vec_layer, nn.init.xavier_uniform_),
        )
        for branch, initializer in schemes:
            for module in branch:
                if isinstance(module, nn.Linear):
                    initializer(module.weight)

    def forward(self, mel_embeding, word2vec_embedding):
        """Return the fused embedding for a batch of (mel, word2vec) pairs."""
        mel_hidden = self.mel_layer(mel_embeding)
        word2vec_hidden = self.word2vec_layer(word2vec_embedding)
        fused = torch.cat((mel_hidden, word2vec_hidden), -1)
        return self.Layer(fused)

In [9]:
def train(model, train_loader, mel_embeding, word2vec_embedding):
    """Run one training epoch and return the mean per-batch loss.

    Every batch yielded by ``train_loader`` is used to index both embedding
    tables. The model output is compared against each looked-up embedding
    with the module-level ``criterion`` using a target of 1 for every row,
    and the two losses are summed before back-propagation.

    Relies on the module-level ``optimizer``, ``criterion`` and ``DEVICE``.

    Args:
        model: module called as
            ``model(mel_embeding=..., word2vec_embedding=...)``.
        train_loader: sized iterable yielding batches of indices.
        mel_embeding: table indexed with a batch; the result is moved to
            ``DEVICE``.
        word2vec_embedding: table indexed the same way as ``mel_embeding``.

    Returns:
        The sum of the per-batch losses divided by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0

    for step, song_idx in enumerate(train_loader, start=1):
        # Only the looked-up embeddings are needed on the device; the index
        # batch itself is used solely for the lookup and its length.
        mel = mel_embeding[song_idx].to(DEVICE)
        word2vec = word2vec_embedding[song_idx].to(DEVICE)

        # Target 1 for every row, created directly on the device instead of
        # going through a Python list each batch.
        target = torch.ones(len(song_idx), dtype=torch.long, device=DEVICE)

        optimizer.zero_grad()

        pred = model(mel_embeding=mel, word2vec_embedding=word2vec)

        # One loss term per modality, both computed against the same output.
        loss = criterion(pred, mel, target) + criterion(pred, word2vec, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Periodic progress line showing the running mean loss.
        if step % 100 == 0:
            print(f'idx: {step} / {len(train_loader)}, loss: {running_loss / step}')

    return running_loss / len(train_loader)

# 학습

In [10]:
# Index tensor with one entry per song row; this is what the DataLoader
# batches over.
input_li = torch.LongTensor(song_meta_df['song_embedding_idx'].tolist())

# Turn the loaded embedding matrices into float tensors so they can be
# indexed with index batches and moved to DEVICE.
mel_embeding = torch.FloatTensor(mel_embeding)
word2vec_embedding = torch.FloatTensor(word2vec_embedding)

# Sizes derived from the data: number of songs and per-modality widths.
num_class = song_meta_df.shape[0]
mel_input_size = mel_embeding.size(1)
word2vec_input_size = word2vec_embedding.size(1)

In [11]:
# Shuffled loader over the song indices; the final partial batch is kept.
train_dataset = MakeDataSet(input_li)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=False,
)

In [12]:
# Build the fusion model on the selected device, together with the optimizer
# and loss object that train() picks up from module scope.
model = CosineEmbeddingLossMultimodal(
    mel_input_size=mel_input_size,
    word2vec_input_size=word2vec_input_size,
)
model = model.to(DEVICE)
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)
criterion = nn.CosineEmbeddingLoss()

print(model)

CosineEmbeddingLossMultimodal(
  (mel_layer): Sequential(
    (0): Linear(in_features=128, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
  )
  (word2vec_layer): Sequential(
    (0): Linear(in_features=128, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
  )
  (Layer): Sequential(
    (0): Linear(in_features=256, out_features=256, bias=True)
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
    (3): Linear(in_features=256, out_features=128, bias=True)
    (4): Tanh()
  )
)


In [None]:
import time

# Lowest epoch loss seen so far. Starting at +inf (rather than an arbitrary
# large number) guarantees the first finite loss counts as an improvement.
min_loss = float('inf')

for epoch in range(1, epochs + 1):
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments.
    start = time.perf_counter()
    train_loss = train(
        model=model,
        train_loader=train_loader,
        mel_embeding=mel_embeding,
        word2vec_embedding=word2vec_embedding,
    )
    elapsed = time.perf_counter() - start
    print(f"[EPOCH: {epoch}], Train Loss: {train_loss}, 학습 시간: {elapsed}")

    # Checkpoint only when the epoch loss improves on the best so far.
    if train_loss < min_loss:
        min_loss = train_loss
        torch.save(model.state_dict(), model_dir + 'SongToPlaylistModel_get_cos.pt')
        print('모델 저장')

# 임베딩 저장

In [None]:
# Rebuild the loader without shuffling so the output rows follow the order
# of input_li.
train_dataset = MakeDataSet(input_li)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
)

# NOTE(review): this runs whatever weights `model` currently holds; no reload
# of the checkpoint saved at the lowest training loss is visible here --
# confirm that is intended.
embedding_li = []
model.eval()
with torch.no_grad():
    for song_idx in train_loader:
        fused = model(
            mel_embeding=mel_embeding[song_idx].to(DEVICE),
            word2vec_embedding=word2vec_embedding[song_idx].to(DEVICE),
        )
        # Collect on the CPU as NumPy so the batches can be concatenated.
        embedding_li.append(fused.cpu().numpy())

embedding = np.concatenate(embedding_li)

In [None]:
# Display the shape of the concatenated embedding array.
embedding.shape

In [None]:
# Cosine embedding loss between the fused embeddings and the mel embeddings.
# The loss needs tensor inputs plus an explicit target; 1 marks every pair as
# "should be similar", matching the target used during training.
criterion(
    torch.from_numpy(embedding),
    mel_embeding,
    torch.ones(len(embedding), dtype=torch.long),
)

In [None]:
# Cosine embedding loss between the fused embeddings and the word2vec
# embeddings. The loss needs tensor inputs plus an explicit target; 1 marks
# every pair as "should be similar", matching the target used during training.
criterion(
    torch.from_numpy(embedding),
    word2vec_embedding,
    torch.ones(len(embedding), dtype=torch.long),
)

In [None]:
# Write the fused embedding array to a .npy file under data_dir.
np.save(data_dir + 'get_cos_embedding_63.npy', embedding)