In [None]:
# installments
!pip install mamba_ssm

In [None]:
# imports
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, Ridge, LinearRegression
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.preprocessing import StandardScaler
from mamba_ssm import Mamba
from torch.utils.data import TensorDataset, DataLoader

In [None]:
# load the dataset
dataset_path = '/kaggle/input/spotifyaudiofeatures/SpotifyAudioFeaturesNov2018.csv'

feature_names = ['acousticness', 'danceability', 'duration_ms', 'energy', 'instrumentalness',
                 'key', 'liveness', 'loudness', 'mode', 'speechiness', 'tempo',
                 'time_signature', 'valence']

songs = pd.read_csv(dataset_path)

# load features and target
x = songs[feature_names].values
y = songs['popularity'].values

# 70% train / 15% validation / 15% test.
# A fixed seed keeps the split identical across kernel restarts.
split_seed = 42
x_train, x_temp, y_train, y_temp = train_test_split(x, y, test_size=0.3, random_state=split_seed)
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=split_seed)

# Fit the scaler on the training split only, then apply the same transform to
# every split. The validation split must be scaled too: it feeds
# songs_tensor_val_ds, and a model trained on standardized inputs would
# otherwise be evaluated on raw-scale features.
x_scaler = StandardScaler()
x_scaler.fit(x_train)
x_train = x_scaler.transform(x_train)
x_val = x_scaler.transform(x_val)
x_test = x_scaler.transform(x_test)

# create TensorDataset from numpy arrays
# unsqueeze(1) inserts a length-1 axis after the batch axis, so every sample
# has shape (1, num_features).
songs_tensor_train_ds = TensorDataset(torch.tensor(x_train, dtype=torch.float).unsqueeze(1), torch.tensor(y_train, dtype=torch.float))
songs_tensor_val_ds = TensorDataset(torch.tensor(x_val, dtype=torch.float).unsqueeze(1), torch.tensor(y_val, dtype=torch.float))

In [None]:
# model, hyper-parameters
class MambaModel(nn.Module):
    """A single Mamba block followed by a linear read-out layer.

    Args:
        input_dim: Passed to ``Mamba`` as ``d_model`` and used as the input
            width of the linear head.
        output_dim: Output width of the linear head.
        d_state: Forwarded to ``Mamba`` unchanged.
        d_conv: Forwarded to ``Mamba`` unchanged.
        expand: Forwarded to ``Mamba`` unchanged.
    """

    def __init__(self, input_dim, output_dim, d_state = 16, d_conv = 4, expand = 2):
        super().__init__()
        mamba_kwargs = dict(d_model=input_dim, d_state=d_state, d_conv=d_conv, expand=expand)
        # The Mamba block is built before the head, same as before, so the
        # parameter-initialization order is unchanged.
        self.mamba = Mamba(**mamba_kwargs)
        self.output_layer = nn.Linear(in_features=input_dim, out_features=output_dim)

    def forward(self, x):
        """Run ``x`` through the Mamba block, then the linear head.

        Presumably ``x`` is (batch, seq_len, input_dim); this notebook feeds
        seq_len == 1 -- TODO confirm against the mamba_ssm documentation.
        """
        hidden = self.mamba(x)
        return self.output_layer(hidden)

In [None]:
# define hyper-parameters and create our model
_, num_features = x_train.shape  # number of columns in the training matrix
output_dim = 1
batch_size = 128
learning_rate = 1e-5
num_epochs = 100
# device: first CUDA GPU when available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
# loss criterion
criterion = nn.MSELoss()
# model
model = MambaModel(input_dim=num_features, output_dim=output_dim)
model = model.to(device)
print(model)
# optimizer
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
tensor_train_dataloader = DataLoader(songs_tensor_train_ds, batch_size=batch_size, shuffle=True)
losses = []  # mean training loss of every epoch, in order

for epoch in range(num_epochs):
    model.train()
    epoch_losses = []

    for features, target in tensor_train_dataloader:
        # move the mini-batch to the device the model lives on
        features, target = features.to(device), target.to(device)
        # forward pass; predictions are flattened to 1-D to line up with target
        output = model(features)
        loss = criterion(output.view(-1), target)
        # backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())

    mean_epoch_loss = np.mean(epoch_losses)
    if epoch % 50 == 0:
        print(f"epoch: {epoch} loss: {mean_epoch_loss}")
    losses.append(mean_epoch_loss)

# training curve: mean loss per epoch
fig, ax = plt.subplots(figsize=(5, 5))
ax.plot(losses)
ax.grid(True)
ax.set_title("Loss vs. Epochs")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
plt.show()

In [None]:
# testing the model
model.eval()
with torch.no_grad():
    # same layout as the training tensors: a length-1 axis after the batch axis
    x_test_tensor = torch.tensor(x_test, dtype=torch.float, device=device).unsqueeze(1)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float, device=device)
    test_outputs = model(x_test_tensor)
    test_error = criterion(test_outputs.view(-1), y_test_tensor)
    # `loss` only holds the last mini-batch of the last epoch, which is a noisy
    # stand-in for the training error; report the mean over the final epoch.
    print(f"Train error: {losses[-1]}")
    print(f'Test error: {test_error.item()}')

In [None]:
# Linear baselines fitted on the same scaled training features.
# Each result is labelled so the two printed numbers can be told apart.
linReg = LinearRegression()
y_pred = linReg.fit(x_train, y_train).predict(x_test)
mse = mean_squared_error(y_test, y_pred)
print('LinearRegression test error: ', mse)
regs_model = Ridge(alpha=1.0)
y_pred2 = regs_model.fit(x_train, y_train).predict(x_test)
mse = mean_squared_error(y_test, y_pred2)
print('Ridge test error: ', mse)

In [None]:
import optuna

# Define the model
def define_model(trial):
    """Build a ``MambaModel`` whose Mamba settings are sampled from ``trial``.

    Args:
        trial: Object providing ``suggest_int(name, low, high)``.

    Returns:
        A new ``MambaModel`` with ``num_features`` inputs and one output.
    """
    # Dict entries evaluate top to bottom, so the suggestions are requested in
    # the order d_state, d_conv, expand.
    sampled = {
        "d_state": trial.suggest_int("d_state", 12, 20),
        "d_conv": trial.suggest_int("d_conv", 2, 6),
        "expand": trial.suggest_int("expand", 1, 4),
    }
    return MambaModel(input_dim=num_features, output_dim=1, **sampled)

# Define the objective of the trial
def objective(trial):
    """Train one sampled configuration and return its validation MSE.

    The model is validated after every epoch and the result is reported to
    Optuna, so the pruner can stop an unpromising trial early.

    Args:
        trial: The Optuna trial used to sample hyper-parameters and to report
            intermediate values.

    Returns:
        Validation MSE after the last completed epoch (``inf`` if
        ``num_epochs`` is 0).

    Raises:
        optuna.exceptions.TrialPruned: When ``trial.should_prune()`` is true
            after an epoch's report.
    """
    # Generate the model.
    model = define_model(trial).to(device)
    # Generate the optimizer.
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    optimizer = getattr(torch.optim, optimizer_name)(model.parameters(), lr=lr)

    criterion = nn.MSELoss()

    # Get the training and the validation data
    train_loader = DataLoader(songs_tensor_train_ds, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(songs_tensor_val_ds, batch_size=batch_size, shuffle=False)

    avg_loss = float("inf")
    for epoch in range(num_epochs):
        # Training of the model.
        model.train()
        for features, target in train_loader:
            features = features.to(device)
            target = target.to(device)
            output = model(features)
            loss = criterion(output.view(-1), target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation of the model, once per epoch so pruning has something to act on.
        avg_loss = _validation_mse(model, valid_loader, criterion)

        # Report the validation loss reached at this epoch.
        trial.report(avg_loss, epoch)

        # Let Optuna decide, from the intermediate value, whether to stop the trial.
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return avg_loss


def _validation_mse(model, valid_loader, criterion):
    """Return the sample-weighted mean of ``criterion`` over ``valid_loader``."""
    model.eval()
    total_loss = 0.0
    num_samples = 0
    with torch.no_grad():
        for features, target in valid_loader:
            features = features.to(device)
            target = target.to(device)
            output = model(features)
            batch_loss = criterion(output.view(-1), target)
            # Weight by batch size so a smaller final batch does not count
            # as much as a full one.
            total_loss += batch_loss.item() * target.size(0)
            num_samples += target.size(0)
    return total_loss / num_samples

# Run the experiment: minimize the objective with a TPE sampler,
# stopping after 100 trials or 600 seconds, whichever comes first.
study = optuna.create_study(
    study_name="mambaSpotify-fc",
    direction="minimize",
    sampler=optuna.samplers.TPESampler(),
)
study.optimize(objective, n_trials=100, timeout=600)

# Split the recorded trials by their final state.
all_trials = study.trials
trial_state = optuna.trial.TrialState
pruned_trials = [t for t in all_trials if t.state == trial_state.PRUNED]
complete_trials = [t for t in all_trials if t.state == trial_state.COMPLETE]

print("Study statistics: ")
print(" Number of finished trials: ", len(all_trials))
print(" Number of pruned trials: ", len(pruned_trials))
print(" Number of complete trials: ", len(complete_trials))

# Summary of the best trial found by the study.
print("Best trial:")
trial = study.best_trial

print(" Value: ", trial.value)

print(" Params: ")
for key, value in trial.params.items():
    print(f" {key}: {value}")