In [1]:
import numpy as np
import pickle
import time
import collections
import torch
import torch.nn as nn
import torchvision
from torch.cuda.amp.autocast_mode import autocast
from torch.cuda.amp.grad_scaler import GradScaler
from torch.utils.tensorboard.writer import SummaryWriter
from torch.utils.data import DataLoader
from PIL import Image


In [None]:
# Project identifier; it is interpolated into the TensorBoard log directory.
proj_name = 'Music_Classifier'


def _read_pickle(path):
    """Return the object unpickled from the file at ``path``.

    NOTE(review): unpickling can execute arbitrary code, so this must only
    be pointed at files produced by this project.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


sfeats = _read_pickle('data/features2')
audio_data = _read_pickle('data/2rand140')
# Matrix that is subtracted from every frame before inference; stored here as float16.
mmtrx = _read_pickle('data/mcm140').astype(np.float16)

# In-place, unseeded shuffle: the sample order differs on every run.
np.random.shuffle(audio_data)
# Occurrences of each entry[1][0] value across the shuffled samples.
counts = collections.Counter(entry[1][0] for entry in audio_data)
# Re-key the loaded feature records: second field -> first field.
sfeats = {record[1]: record[0] for record in sfeats}
#audio_data = [(((torch.tensor(x[0], dtype=torch.float32) - mmtrx).unsqueeze(0), torch.tensor(sfeats[x[1][0]], dtype=torch.float32).unsqueeze(0)), x[1][1]) for x in audio_data]

# Augmentation pool; aug_imgs applies one randomly chosen entry per image.
blur = torchvision.transforms.GaussianBlur(3, sigma=(0.1, 2.0))
erase = torchvision.transforms.RandomErasing(
    p=0.5,
    scale=(0.02, 0.1),
    ratio=(0.8, 1.2),
    value=0,
    inplace=False,
)
crop = torchvision.transforms.RandomResizedCrop((140, 128), scale=(0.84, 1.0))
affine = torchvision.transforms.RandomAffine(
    0,
    translate=(0.2, 0),
    scale=(0.86, 1.14),
    shear=None,
    fill=0,
)
invert = torchvision.transforms.RandomInvert()
transforms = [blur, erase, crop, affine, invert]

In [2]:

def group_predict(adata: list[list[np.ndarray]], model: nn.Module) -> list[tuple[int, int]]:
    """Predict one class index per song by pooling per-frame softmax scores.

    Every entry of ``adata`` is read as ``song[0]`` (an iterable of frames)
    and ``song[1]`` (a value passed through to the result unchanged). The
    module-level ``mmtrx`` is subtracted from each frame, the frame is run
    through ``model`` as a batch of one, and the softmax outputs of all
    frames of a song are summed per class. The class with the largest
    summed score (rounded to 4 decimals, first index on ties) wins.

    Args:
        adata: Songs to classify, each indexable as described above.
        model: Network returning one row of class logits for a
            ``(1, 1, H, W)`` input. It is moved to CUDA for the duration of
            the call, put in eval mode, and moved back to the CPU on return.

    Returns:
        One ``(song[1], class_index)`` tuple per song, in input order.
    """
    predictions = []
    model.to('cuda')
    model.eval()
    with torch.no_grad():
        for song in adata:
            results = []
            for frame in song[0]:
                batch = (torch.tensor(frame, dtype=torch.float32) - mmtrx).unsqueeze(0).unsqueeze(0)
                # Tensor.to returns a new tensor; the result has to be rebound,
                # otherwise the input stays wherever it was created.
                batch = batch.to('cuda')
                # Batch size is 1, so row 0 holds the logits for this frame.
                logits = model(batch)[0]
                results.append(torch.nn.functional.softmax(logits, dim=0).tolist())
            #results = [w for w in results if np.max(w) > np.max(results) * 0.333]
            # Transpose to (classes, frames) and sum each class over all frames.
            scores = [round(sum(w), 4) for w in np.array(results).T]
            predictions.append((song[1], scores.index(np.max(scores))))
    model.to('cpu')
    return predictions

def aug_imgs(img_data: list[tuple[torch.Tensor, int]]) -> list[tuple[torch.Tensor, int]]:
    """Return a new list in which the leading part of ``img_data`` is augmented.

    The first ``int(len(img_data) * 0.3678)`` samples each get one transform
    drawn at random from the module-level ``transforms`` list. The remaining
    samples are passed through untouched and come first in the result, with
    the augmented samples appended after them. ``img_data`` itself is not
    modified.

    Args:
        img_data: ``(image, label)`` samples; ``sample[0]`` is the image
            tensor and ``sample[1]`` its label.

    Returns:
        A list of the same length whose entries are ``(image, label)``
        pairs. Augmented images are float32 and their labels are the
        original ``sample[1]`` values, so they collate together with the
        untouched samples.
    """
    aug_count = int(len(img_data) * 0.3678)
    final = list(img_data[aug_count:])
    for sample in img_data[:aug_count]:
        label = sample[1]
        # The transforms run on a uint8 copy shifted by +55; the shift is
        # undone after augmentation.
        # NOTE(review): assumes image values lie in [-55, 200] so the cast
        # neither wraps nor truncates - confirm against the data.
        img = (sample[0] + 55).to(torch.uint8)
        # Index into the list rather than letting numpy coerce a list of
        # modules into an object array.
        transform = transforms[np.random.randint(len(transforms))]
        img = transform(img)
        # Compare only the spatial (last two) dimensions; the full size also
        # contains the channel dimension and never equals a 2-tuple.
        if tuple(img.shape[-2:]) != (140, 128):
            img = torchvision.transforms.functional.resize(img, (140, 128))
        final.append((img.to(torch.float32) - 55, label))
    return final

class Mini3(nn.Module):
    """Bottleneck unit: 1x1 reduce, 3x3 conv with batch norm, 1x1 expand.

    The channel count is halved by the first convolution and restored by
    the last, so the output has the same shape as the input. An ELU follows
    each of the three stages.
    """

    def __init__(self, ftrs):
        super().__init__()
        half = ftrs // 2
        # Creation order determines both seeded initialisation and the
        # state_dict key order, so it must stay dcnv, cnv, ucnv, nrm.
        self.dcnv = nn.Conv2d(ftrs, half, kernel_size=1)
        self.cnv = nn.Conv2d(half, half, kernel_size=3, stride=1, padding=1, bias=False)
        self.ucnv = nn.Conv2d(half, ftrs, kernel_size=1)
        self.nrm = nn.BatchNorm2d(half)
        self.act = nn.ELU()

    def forward(self, x):
        reduced = self.act(self.dcnv(x))
        mixed = self.act(self.nrm(self.cnv(reduced)))
        return self.act(self.ucnv(mixed))

class ReStep(nn.Module):
    """Residual down-sampling stage built from three ``Mini3`` units.

    Two ``Mini3`` units and a max-pool form the main path. The shortcut is a
    1x1 convolution whose stride equals the pooling size, so both paths are
    reduced by the same factor before being summed. A third ``Mini3`` with
    an identity shortcut follows, and a final 1x1 convolution either keeps
    the channel count (``final=True``) or doubles it.
    """

    def __init__(self, ftrs, poolks, final=False):
        super().__init__()
        self.cnv1 = Mini3(ftrs)
        self.cnv2 = Mini3(ftrs)
        self.cnv3 = Mini3(ftrs)
        self.dscale = nn.Conv2d(ftrs, ftrs, kernel_size=1, stride=poolks, bias=False)
        out_ftrs = ftrs if final else ftrs * 2
        self.uscale = nn.Conv2d(ftrs, out_ftrs, kernel_size=1, stride=1)
        # One BatchNorm instance serves both residual sums in forward().
        self.inrm = nn.BatchNorm2d(ftrs)
        self.mxpool = nn.MaxPool2d(kernel_size=poolks, stride=poolks)
        self.iact = nn.ELU()

    def forward(self, x):
        # Main path: two bottlenecks, then spatial reduction.
        pooled = self.mxpool(self.cnv2(self.cnv1(x)))
        # First residual sum uses the strided 1x1 projection of the input.
        merged = self.iact(self.inrm(pooled + self.dscale(x)))
        # Second residual sum is an identity shortcut around cnv3.
        refined = self.iact(self.inrm(self.cnv3(merged) + merged))
        return self.iact(self.uscale(refined))

class MusCls(nn.Module):
    """Three-band convolutional classifier that emits 19 class logits.

    After a shared three-layer convolutional stem, the feature map is cut
    along dimension 2 into three overlapping bands (rows ``:16``, ``14:62``
    and ``60:``). Each band runs through its own stack of four ``ReStep``
    stages and a 3x3 conv + batch-norm head, is average-pooled to 3x3 and
    flattened. The three vectors are concatenated and passed through two
    linear layers.

    Args:
        step_1_ftrs: Channel count of the stem and of the first stage.
        step_2_ftrs: Channel count entering the second stage.
        step_3_ftrs: Channel count entering the third stage.
        step_4_ftrs: Channel count entering the fourth stage and the heads.
    """

    def __init__(self, step_1_ftrs: int, step_2_ftrs: int, step_3_ftrs: int, step_4_ftrs: int):
        #torch.manual_seed(1024)
        # Process-wide side effect: newly created float tensors default to CUDA
        # from here on.
        torch.set_default_tensor_type(torch.cuda.FloatTensor)
        super().__init__()
        self.final_ftrs = step_4_ftrs // 2
        conv3x3 = dict(kernel_size=3, stride=1, padding=1)

        # Shared stem.
        self.ecnv1 = nn.Conv2d(1, step_1_ftrs, **conv3x3)
        self.ecnv2 = nn.Conv2d(step_1_ftrs, step_1_ftrs, **conv3x3)
        self.ecnv3 = nn.Conv2d(step_1_ftrs, step_1_ftrs, **conv3x3)

        # Per-band heads (conv, then batch norm).
        self.fcnv1 = nn.Conv2d(step_4_ftrs, self.final_ftrs, bias=False, **conv3x3)
        self.fcnv2 = nn.Conv2d(step_4_ftrs, self.final_ftrs, bias=False, **conv3x3)
        self.fcnv3 = nn.Conv2d(step_4_ftrs, self.final_ftrs, bias=False, **conv3x3)
        self.nrm1 = nn.BatchNorm2d(self.final_ftrs)
        self.nrm2 = nn.BatchNorm2d(self.final_ftrs)
        self.nrm3 = nn.BatchNorm2d(self.final_ftrs)

        # Low band: the first two stages pool with (1, 2).
        self.lstep1 = ReStep(step_1_ftrs, (1, 2))
        self.lstep2 = ReStep(step_2_ftrs, (1, 2))
        self.lstep3 = ReStep(step_3_ftrs, 2, True)
        self.lstep4 = ReStep(step_4_ftrs, 2, True)

        # Mid band: only the first stage pools with (1, 2).
        self.mstep1 = ReStep(step_1_ftrs, (1, 2))
        self.mstep2 = ReStep(step_2_ftrs, 2)
        self.mstep3 = ReStep(step_3_ftrs, 2, True)
        self.mstep4 = ReStep(step_4_ftrs, 2, True)

        # High band: every stage pools with 2.
        self.hstep1 = ReStep(step_1_ftrs, 2)
        self.hstep2 = ReStep(step_2_ftrs, 2)
        self.hstep3 = ReStep(step_3_ftrs, 2, True)
        self.hstep4 = ReStep(step_4_ftrs, 2, True)

        # Classifier: 3 bands x (3 x 3) pooled cells x final_ftrs channels.
        self.flin = nn.Linear(self.final_ftrs * 9 * 3, self.final_ftrs)
        self.flin2 = nn.Linear(self.final_ftrs, 19)
        self.act = nn.ELU()
        self.flat = nn.Flatten()
        # Despite the attribute name this is an average pool.
        self.fmxpool = nn.AdaptiveAvgPool2d(3)
        self.drop = nn.Dropout2d(0.1)

    def forward(self, x):
        for stem_conv in (self.ecnv1, self.ecnv2, self.ecnv3):
            x = self.act(stem_conv(x))
        x = self.drop(x)

        # Overlapping low / mid / high bands along dimension 2.
        bands = [x[:, :, :16, :], x[:, :, 14:62, :], x[:, :, 60:, :]]

        stages = (
            (self.lstep1, self.mstep1, self.hstep1),
            (self.lstep2, self.mstep2, self.hstep2),
            (self.lstep3, self.mstep3, self.hstep3),
            (self.lstep4, self.mstep4, self.hstep4),
        )
        for depth, stage in enumerate(stages):
            bands = [branch(band) for branch, band in zip(stage, bands)]
            # Dropout is applied only after the last two stages.
            if depth >= 2:
                bands = [self.drop(band) for band in bands]

        heads = (
            (self.fcnv1, self.nrm1),
            (self.fcnv2, self.nrm2),
            (self.fcnv3, self.nrm3),
        )
        bands = [self.act(norm(conv(band))) for (conv, norm), band in zip(heads, bands)]
        pooled = [self.flat(self.fmxpool(band)) for band in bands]

        merged = torch.cat(pooled, 1)
        return self.flin2(self.act(self.flin(merged)))

class Trainer:
    """Mixed-precision training loop with validation, logging and checkpointing.

    The model is moved to CUDA on construction. Running train/validation
    losses are written to TensorBoard, and the model's ``state_dict`` is
    saved to ``models/<model_name>`` whenever the validation loss drops
    below the current ``save_threshold``.
    """

    def __init__(self, model: nn.Module, model_name: str,  learning_rate: float, weight_decay: float, save_threshold: float, log_root: str = 'C:/Users/BBA/Coding/tblogs') -> None:
        """Set up optimizer, loss, gradient scaler and TensorBoard writer.

        Args:
            model: Network to train; moved to CUDA.
            model_name: Used for the log sub-directory and checkpoint file name.
            learning_rate: Initial NAdam learning rate.
            weight_decay: NAdam weight decay.
            save_threshold: A checkpoint is written only when the validation
                loss is below this value; it is lowered after each save.
            log_root: Directory under which ``<proj_name>/<model_name>``
                TensorBoard logs are written. Defaults to the previously
                hard-coded location.
        """
        self.model = model.to('cuda')
        self.save_threshold = save_threshold
        self.optimizer = torch.optim.NAdam(self.model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        self.loss_function = nn.CrossEntropyLoss()
        self.scaler = GradScaler(enabled=True)
        self.model_name = model_name
        self.tblog = SummaryWriter(log_dir=f'{log_root}/{proj_name}/{model_name}')
        # Global step counters for the two TensorBoard scalar series.
        self.tbatch_counter = 0
        self.vbatch_counter = 0

    def split_data(self, dataset: list[tuple[torch.Tensor, int]], ratio: float, splits: int) -> tuple[list[list[tuple[torch.Tensor, int]]], list[tuple[torch.Tensor, int]]]:
        """Split ``dataset`` into per-label validation data and training chunks.

        For every ``(label, total)`` in the module-level ``counts``, the first
        ``int(total * ratio)`` samples whose ``sample[1] == label`` go to the
        validation set. If fewer matching samples exist, all of them are
        taken. The remaining samples are cut into ``splits`` equally sized
        consecutive chunks; leftover samples that do not fill a chunk are
        dropped.

        ``dataset`` is not modified.

        Args:
            dataset: ``(data, label)`` samples.
            ratio: Fraction of each label's count reserved for validation.
            splits: Number of training chunks to produce.

        Returns:
            ``(tdata, vdata)``: the list of training chunks and the flat
            validation list.
        """
        vdata = []
        taken = set()
        for label, total in counts.items():
            wanted = int(total * ratio)
            if wanted <= 0:
                continue
            found = 0
            # Scan every sample exactly once per label; nothing is removed
            # while iterating, so no candidate is skipped and the scan cannot
            # run past the end of the list.
            for idx, sample in enumerate(dataset):
                if idx in taken:
                    continue
                if sample[1] == label:
                    taken.add(idx)
                    vdata.append(sample)
                    found += 1
                    if found == wanted:
                        break
        pool = [sample for idx, sample in enumerate(dataset) if idx not in taken]
        chunk = int(len(pool) / splits)
        tdata = [pool[k * chunk : (k + 1) * chunk] for k in range(splits)]
        return tdata, vdata

    def _train(self, dataset: list[tuple[torch.Tensor, int]]):
        """Run one optimisation pass over ``dataset`` and return the mean batch loss.

        ``dataset`` is iterated as batches ``x`` with ``x[0]`` the inputs and
        ``x[1]`` the labels (both tensors). The running mean loss is logged
        to TensorBoard after every batch.
        """
        total_loss = 0
        for i, x in enumerate(dataset):
            with autocast(enabled=True):
                data, labels = x[0].to('cuda'), x[1].to('cuda')
                results = self.model(data)
                loss = self.loss_function(results, labels)
            # Scaled backward/step/update is the standard GradScaler sequence.
            self.scaler.scale(loss).backward()
            self.scaler.step(self.optimizer)
            self.scaler.update()
            total_loss += loss.item()
            self.optimizer.zero_grad()
            self.tblog.add_scalar('Train Loss/Batch', total_loss / (i + 1), self.tbatch_counter)
            self.tbatch_counter += 1
        return total_loss / max(len(dataset), 1)

    def _validate(self, dataset: list[tuple[torch.Tensor, int]]):
        """Evaluate on ``dataset`` and return ``(mean batch loss, accuracy)``.

        Accuracy is the number of correct arg-max predictions divided by the
        number of samples actually seen, so a shorter final batch does not
        skew the result.
        """
        total_loss = 0
        correct = 0
        seen = 0
        with torch.no_grad():
            for i, x in enumerate(dataset):
                data, labels = x[0].to('cuda'), x[1].to('cuda')
                results = self.model(data)
                # Softmax is monotonic, so the arg-max of the logits is the
                # predicted class.
                correct += (results.argmax(dim=1) == labels).sum().item()
                seen += labels.shape[0]
                total_loss += self.loss_function(results, labels).item()
                self.tblog.add_scalar('Validate Loss/Batch', total_loss / (i + 1), self.vbatch_counter)
                self.vbatch_counter += 1
        return total_loss / max(len(dataset), 1), correct / max(seen, 1)

    def train_model(self, dataset: list[tuple[torch.Tensor, int]], target_epochs: int, batch_size: int, ratio: float, val: bool, splits: int, aug_lr: float, aug_epochs: list[int]):
        """Train for ``target_epochs`` epochs over ``splits`` chunks of ``dataset``.

        Args:
            dataset: ``(data, label)`` samples; split once via ``split_data``.
            target_epochs: Number of passes over all training chunks.
            batch_size: DataLoader batch size.
            ratio: Validation fraction handed to ``split_data``.
            val: When true, validate after every chunk and checkpoint on
                improvement; otherwise only report the training loss.
            splits: Number of training chunks per epoch.
            aug_lr: Learning rate applied during epochs listed in ``aug_epochs``.
            aug_epochs: Epoch indices whose chunks are passed through ``aug_imgs``.

        When training finishes, the model is moved back to the CPU, the
        TensorBoard writer is closed and the CUDA cache is emptied.
        """
        self.batch_size = batch_size
        samples = len(dataset)
        train_data, val_data = self.split_data(dataset, ratio, splits)
        # Loop-invariant; only used for the progress message.
        self.batches_per_group = int(int(samples * (1 - ratio)) / splits / self.batch_size)
        for ep in range(target_epochs):
            for i, group in enumerate(train_data):
                if ep in aug_epochs:
                    # NOTE(review): the learning rate is not restored after an
                    # augmentation epoch - confirm that this is intended.
                    for x in self.optimizer.param_groups:
                        x['lr'] = aug_lr
                    batch_source = aug_imgs(group.copy())
                else:
                    batch_source = group
                # The generator is created on CUDA explicitly for shuffling.
                tdata = DataLoader(batch_source, batch_size=self.batch_size, shuffle=True, generator=torch.Generator(device='cuda'))
                self.model.train(True)
                train_loss = self._train(tdata)
                if not val:
                    print(f'Status at batch {self.batches_per_group * (1 + i)},\tTLoss: {round(float(train_loss), 5)} TAccuracy {round(2.7182818**-float(train_loss) * 100, 5)}%')
                else:
                    vdata = DataLoader(val_data, batch_size=self.batch_size, shuffle=True, generator=torch.Generator(device='cuda'))
                    self.model.train(False)
                    val_loss, val_acc = self._validate(vdata)
                    print(f'Status at batch {self.batches_per_group * (1 + i)},\tTLoss: {round(float(train_loss), 5)}\tTPAccuracy {round(2.7182818**-float(train_loss) * 100, 5)}%\tVLoss: {round(float(val_loss), 5)}\tVGAccuracy {round(float(val_acc) * 100, 5)}%\tVPAccuracy {round(2.7182818**-float(val_loss) * 100, 5)}%')
                    if val_loss < self.save_threshold:
                        self.save_threshold = val_loss
                        torch.save(self.model.state_dict(), f'models/{self.model_name}')
        self.model = self.model.to('cpu')
        self.tblog.close()
        torch.cuda.empty_cache()



In [3]:
# Rebuild the architecture with the channel widths used for this checkpoint,
# then restore the saved parameters. The final expression is left bare so the
# cell displays the key-matching report.
av_model = MusCls(32, 64, 128, 128)
saved_weights = torch.load('models/mscls_34')
av_model.load_state_dict(saved_weights)

<All keys matched successfully>

In [5]:
# Evaluate av_model on a labelled pack and report how many songs it gets right.
dpack = 'btest140'
with open(f'data/{dpack}', 'rb') as pack_file:
    tdata = pickle.load(pack_file)

# group_predict reads the module-level mmtrx, so it is (re)bound here as a
# float32 tensor.
with open(f'data/mcm140', 'rb') as matrix_file:
    mmtrx = torch.tensor(pickle.load(matrix_file), dtype=torch.float32)

predictions = group_predict(tdata, av_model)
# Second field of every tdata entry, compared against the predicted index below.
real_l = [entry[1] for entry in tdata]
score = sum(1 for guess, truth in zip(predictions, real_l) if guess[1] == truth)
print(score, '/', len(tdata))
print(100 * (score / len(tdata)), '%')

1378 / 1982
69.52573158425832 %


In [None]:
# Tally how often each value occurs in real_l (the second field of every
# tdata entry); the bare expression makes the notebook display the counts.
collections.Counter(real_l)

In [4]:
# Predict the pack in `dpack` and write the results as a two-column CSV.
dpack = 'test140'

with open(f'data/{dpack}', 'rb') as f:
    tdata = pickle.load(f)

# group_predict reads the module-level mmtrx, so it is (re)bound here as a
# float32 tensor.
with open(f'data/mcm140', 'rb') as f:
    mmtrx = torch.tensor(pickle.load(f), dtype=torch.float32)

predictions = group_predict(tdata, av_model)
filename = '3mod3'

with open(f'predictions/{filename}.csv', 'wt') as f:
    f.write('song_id,genre_id\n')
    for song_name, genre_id in predictions:
        # removesuffix drops exactly the '.ogg' extension. rstrip('.ogg')
        # would instead strip any trailing run of the characters '.', 'o'
        # and 'g', which can eat into the id itself.
        song_id = song_name.lstrip('0').removesuffix('.ogg')
        f.write(f'{song_id},{genre_id}\n')
    # Two rows are appended by hand.
    # NOTE(review): presumably these songs are absent from the data pack -
    # confirm before reusing this cell with another pack.
    f.writelines(['24013,0\n', '22612,1\n'])