In [1]:
import math
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import scipy.signal as signal
import skops.io as sio
import torch
from copy import deepcopy
from sklearn.preprocessing import PowerTransformer, QuantileTransformer
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

In [2]:
RND_SEED: int = 12345
np.random.seed(RND_SEED)
pd.core.common.random_state(RND_SEED)
torch.set_default_device("cuda")
torch.manual_seed(RND_SEED)

# Resolution for graph images
WIDTH: int = 1366
HEIGHT: int = 768

In [3]:
df = pd.read_csv("./../../data/Prepared-2023.csv", encoding="utf-8")
df.loc[:, "danceability_%":"speechiness_%"] += 1
df = df[(df["released_year"] >= 2000) & (df["liveness_%"] <= 65) & (df["artist_count"] <= 4) & \
    (df["instrumentalness_%"] <= 40) & (df["speechiness_%"] <= 30)]

In [4]:
scale_list: list[str] = [
    "artist_count", "released_year", "released_month", "released_day", "bpm", "danceability_%", 
    "valence_%", "energy_%", "acousticness_%", "instrumentalness_%", "liveness_%", "speechiness_%"
]
# for scale_name in scale_list:
#     scaler = QuantileTransformer(output_distribution="normal", random_state=RND_SEED, n_quantiles=700)
#     scaler.fit(df[scale_name].to_numpy(dtype=np.float32)[:, np.newaxis])
#     sio.dump(scaler, f"./../../scalers/2023/{scale_name:s}.skops")
# del scale_name
scalers: dict[str, QuantileTransformer] = {}
for scale_name in scale_list:
    scaler: QuantileTransformer = sio.load(f"./../../scalers/2023/{scale_name:s}.skops")
    scalers[scale_name] = scaler
    del scaler
del scale_name

In [5]:
for scale_name in scale_list:
    df[scale_name] = scalers[scale_name].transform(df[scale_name].to_numpy(dtype=np.float32)[:, np.newaxis])[:, 0]
del scale_name

In [6]:
POP: int = 600

features: list[str] = list(filter(lambda x: x != "streams", df.columns))
X_1 = df.loc[df["streams"] >= POP, features].to_numpy(dtype=np.float32)
X_0 = df.loc[df["streams"] < POP, features].to_numpy(dtype=np.float32)

In [7]:
def np_to_tensor(x: np.ndarray) -> torch.Tensor:
    return torch.tensor(x, requires_grad=False).to("cuda")

def split_data_np(x: np.ndarray, test: int) -> tuple[np.ndarray, ...]:
    n_sample: int = x.shape[0]
    n_test: int = math.floor(n_sample * (test / 100))
    n_train: int = n_sample - n_test
    return x[:n_train], x[n_train:]

def split_x_y(x_0: np.ndarray, x_1: np.ndarray, test: int) -> tuple[torch.Tensor, ...]:
    x_0_tr, x_0_te = split_data_np(x_0, test)
    x_1_tr, x_1_te = split_data_np(x_1, test)
    x_train = np.concat([x_0_tr, x_1_tr], axis=0)
    y_train = np.concat([
        np.zeros((x_0_tr.shape[0],), dtype=np.float32),
        np.ones((x_1_tr.shape[0],), dtype=np.float32)
    ],axis=0)[:, np.newaxis]
    x_test = np.concat([x_0_te, x_1_te], axis=0)
    y_test = np.concat([
        np.zeros((x_0_te.shape[0],), dtype=np.float32),
        np.ones((x_1_te.shape[0],), dtype=np.float32)
    ],axis=0)[:, np.newaxis]
    rnd_train = np.random.permutation(x_train.shape[0])
    x_train, y_train = x_train[rnd_train], y_train[rnd_train]
    rnd_test = np.random.permutation(x_test.shape[0])
    x_test, y_test = x_test[rnd_test], y_test[rnd_test]
    return np_to_tensor(x_train), np_to_tensor(x_test), np_to_tensor(y_train), np_to_tensor(y_test)

In [8]:
x_train, x_test, y_train, y_test = split_x_y(X_0, X_1, 20)

In [9]:
BATCH_SZ: int = 12
EPOCHS: int = 300

train_loader = DataLoader(
    TensorDataset(x_train, y_train),
    batch_size=BATCH_SZ,
    shuffle=True,
    generator=torch.Generator(device="cuda")
)
test_loader = DataLoader(
    TensorDataset(x_test, y_test),
    batch_size=BATCH_SZ * 2,
    shuffle=True,
    generator=torch.Generator(device="cuda")
)

In [10]:
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

class Basic_ANN(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.mish_ann = nn.Sequential(
            nn.Linear(273, 128, device="cuda"),
            nn.Mish(),
            nn.Dropout(0.3),
            nn.Linear(128, 64, device="cuda"),
            nn.Mish(),
            nn.Dropout(0.2),
            nn.Linear(64, 16, device="cuda"),
            nn.Mish(),
            nn.Dropout(0.1),
            nn.Linear(16, 1, device="cuda"),
            nn.Sigmoid(),
        )
        self.mish_ann.apply(init_weights)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.mish_ann(x)

In [11]:
def train_loop(loader: DataLoader, model, loss_fn, optimizer) -> None:
    full_data: int = len(loader.dataset)
    model.train()
    loss_sum: float = 0.0
    for batch, (x, y) in enumerate(loader):
        optimizer.zero_grad()
        pred = model(x)
        cross_loss = loss_fn(pred, y)
        l2_regularization = 0.015 * torch.norm(torch.cat([x.view(-1) for x in model.parameters()]), 2)
        loss = cross_loss + l2_regularization
        loss.backward()
        optimizer.step()
    
        loss_sum += loss.item()
        if (batch + 1) % 10 == 0:
            current = batch * BATCH_SZ + len(x)
            print(f"loss: {loss_sum / 10:>7f}  [{current:>5d}/{full_data:>5d}]")
            loss_sum = 0.0

def test_loop(loader: DataLoader, model, loss_fn) -> None:
    model.eval()
    num_batches = len(loader)
    test_loss: float = 0.0
    test_correct: int = 0
    n_samples: int = 0

    with torch.no_grad():
        for x, y in loader:
            pred = model(x)
            l2_regularization = 0.015 * torch.norm(torch.cat([x.view(-1) for x in model.parameters()]), 2)
            test_loss += (loss_fn(pred, y) + l2_regularization).item()
            pred_cls = (pred >= 0.5)
            test_correct += (pred_cls == y).sum()
            n_samples += y.shape[0]

    test_loss /= num_batches
    print(f"Test Error: \n Avg loss: {test_loss:>8f} Accuracy: {test_correct / n_samples:>8f}\n")

In [12]:
model = Basic_ANN().to("cuda")
loss_fn = nn.BCELoss().to("cuda")
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.025)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.985)

for epoch in range(EPOCHS):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train_loop(train_loader, model, loss_fn, optimizer)
    scheduler.step()
    if (epoch + 1) % 5 == 0:
        test_loop(test_loader, model, loss_fn)
del epoch
print("Training Finished\nTest Result:")
test_loop(test_loader, model, loss_fn)

Epoch 1
-------------------------------
loss: 0.962414  [  120/  571]
loss: 0.886769  [  240/  571]
loss: 0.891614  [  360/  571]
loss: 0.884862  [  480/  571]
Epoch 2
-------------------------------
loss: 0.879697  [  120/  571]
loss: 0.919138  [  240/  571]
loss: 0.829851  [  360/  571]
loss: 0.841512  [  480/  571]
Epoch 3
-------------------------------
loss: 0.851992  [  120/  571]
loss: 0.861932  [  240/  571]
loss: 0.867177  [  360/  571]
loss: 0.834961  [  480/  571]
Epoch 4
-------------------------------
loss: 0.841480  [  120/  571]
loss: 0.828753  [  240/  571]
loss: 0.854864  [  360/  571]
loss: 0.828871  [  480/  571]
Epoch 5
-------------------------------
loss: 0.783658  [  120/  571]
loss: 0.877021  [  240/  571]
loss: 0.744530  [  360/  571]
loss: 0.836671  [  480/  571]
Test Error: 
 Avg loss: 0.807122 Accuracy: 0.746479

Epoch 6
-------------------------------
loss: 0.789720  [  120/  571]
loss: 0.835195  [  240/  571]
loss: 0.824831  [  360/  571]
loss: 0.787084  [

KeyboardInterrupt: 

In [13]:
with torch.no_grad():
    pred = (model(torch.tensor(np.concat([X_0, X_1], axis=0), device="cuda")) >= 0.5).to("cpu")
    print(pred.sum())

tensor(209)
