<a href="https://colab.research.google.com/github/buganart/descriptor-transformer/blob/main/descriptor_model_predict.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@markdown Before starting please save the notebook in your drive by clicking on `File -> Save a copy in drive`

In [None]:
#@markdown Check GPU, should be a Tesla V100
!nvidia-smi -L
import os
print(f"We have {os.cpu_count()} CPU cores.")

GPU 0: Tesla P100-PCIE-16GB (UUID: GPU-5b4df872-b7b4-6bd6-6086-6c3f285303ba)
We have 4 CPU cores.


In [None]:
#@markdown Mount google drive
from google.colab import drive
from google.colab import output
drive.mount('/content/drive')

from pathlib import Path
if not Path("/content/drive/My Drive/IRCMS_GAN_collaborative_database").exists():
    raise RuntimeError(
        "Shortcut to our shared drive folder doesn't exits.\n\n"
        "\t1. Go to the google drive web UI\n"
        "\t2. Right click shared folder IRCMS_GAN_collaborative_database and click \"Add shortcut to Drive\""
    )

def clear_on_success(msg="Ok!"):
    if _exit_code == 0:
        output.clear()
        print(msg)

Mounted at /content/drive


In [None]:
#@markdown Install wandb and log in
%pip install wandb
output.clear()
import wandb
from pathlib import Path
wandb_drive_netrc_path = Path("drive/My Drive/colab/.netrc")
wandb_local_netrc_path = Path("/root/.netrc")
if wandb_drive_netrc_path.exists():
    import shutil

    print("Wandb .netrc file found, will use that to log in.")
    shutil.copy(wandb_drive_netrc_path, wandb_local_netrc_path)
else:
    print(
        f"Wandb config not found at {wandb_drive_netrc_path}.\n"
        f"Using manual login.\n\n"
        f"To use auto login in the future, finish the manual login first and then run:\n\n"
        f"\t!mkdir -p '{wandb_drive_netrc_path.parent}'\n"
        f"\t!cp {wandb_local_netrc_path} '{wandb_drive_netrc_path}'\n\n"
        f"Then that file will be used to login next time.\n"
    )

!wandb login
output.clear()
print("ok!")

ok!


In [None]:
#@title Configuration

#@markdown Directories can be found via file explorer on the left by navigating into `drive` to the desired folders. 
#@markdown Then right-click and `Copy path`.
test_data_path = "/content/drive/My Drive/AUDIO DATABASE/MUSIC TRANSFORMER/Transformer Corpus/\u003CDirEntry \"Copy of 'Seven Words' for Violoncello, Bayan and String - VI. It is Finished!-RUzgq1MQA_g.wav\">.txt" #@param {type:"string"}

#@markdown ### Resumption of previous runs
#@markdown Optional resumption arguments below, leaving both empty will start a new run from scratch. 
#@markdown - The ID can be found on wandb. 
#@markdown - It's 8 characters long and may contain a-z letters and digits (for example `1t212ycn`).

#@markdown Resume a previous run 
resume_run_id = "4gn7g6xq" #@param {type:"string"}

#@markdown the number of predicted descriptors after the test_data
prediction_length = 10 #@param {type:"integer"}

#@markdown the path to save all generated descriptors as json
prediction_output_dir = "/content/drive/My Drive/Descriptor Model/OUTPUTS/" #@param {type:"string"}

import re
from pathlib import Path
from argparse import Namespace

def check_wandb_id(run_id):
    if run_id and not re.match(r"^[\da-z]{8}$", run_id):
        raise RuntimeError(
            "Run ID needs to be 8 characters long and contain only letters a-z and digits.\n"
            f"Got \"{run_id}\""
        )

check_wandb_id(resume_run_id)

prediction_output_dir = Path(prediction_output_dir)
prediction_output_dir.mkdir(parents=True, exist_ok=True)


colab_config = {
    "resume_run_id": resume_run_id,
    "test_data_path": test_data_path,
    "prediction_length": prediction_length,
    "prediction_output_dir": prediction_output_dir,
}

for k, v in colab_config.items():
    print(f"=> {k:20}: {v}")

config = Namespace(**colab_config)
config.seed = 1234

=> resume_run_id       : 4gn7g6xq
=> test_data_path      : /content/drive/My Drive/AUDIO DATABASE/MUSIC TRANSFORMER/Transformer Corpus/<DirEntry "Copy of 'Seven Words' for Violoncello, Bayan and String - VI. It is Finished!-RUzgq1MQA_g.wav">.txt
=> prediction_length   : 10
=> prediction_output_dir: /content/drive/My Drive/IRCMS_GAN_collaborative_database/Experiments/colab-violingan/descriptor-prediction-outputs


In [None]:
%pip install pytorch-lightning
clear_on_success()

Ok!


#model
only simple RNN is implemented.
TODO: add more complicated time-series models (such as transformer)

In [None]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks.base import Callback
from pytorch_lightning.loggers import WandbLogger
import pprint
import torch
import torch.nn as nn
import numpy as np
import json

class DescriptorModel(pl.LightningModule):

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.save_hyperparameters("config")
        descriptor_size = config.descriptor_size=5
        hidden_size = config.hidden_size=100
        num_layers = config.num_layers=3

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(descriptor_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, descriptor_size)
        self.loss_function = nn.MSELoss()

    def forward(self, x):
        batch_size = x.shape[0]
        h = (torch.zeros(self.num_layers, batch_size, self.hidden_size),
            torch.zeros(self.num_layers, batch_size, self.hidden_size))
        x, _ = self.lstm(x, h)
        x = self.linear(x)
        return x

    def training_step(self, batch, batch_idx):
        data, target = batch
        output = self(data)
        pred = output[:,-1,:].unsqueeze(1)

        loss = self.loss_function(pred, target)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.config.learning_rate)

    def predict(self, data, step):
        all_descriptors = data
        batch_size, window_size, des_size = data.shape
        for i in range(step):
            input = all_descriptors[:,i:,:]
            # print("input", input)
            pred = self(input)
            new_descriptor = pred[:,1,:].reshape(batch_size, 1, des_size)
            # print("new_descriptor", new_descriptor)
            all_descriptors = torch.cat((all_descriptors, new_descriptor), 1)
        return all_descriptors.detach().cpu().numpy()[:,-step:,:]

#helper function

In [None]:
def load_checkpoint_from_cloud(checkpoint_path="model_dict.pth"):
    checkpoint_file = wandb.restore(checkpoint_path)
    return checkpoint_file.name

def get_resume_run_config(resume_id):
    # all config will be replaced by the stored one in wandb
    api = wandb.Api()
    previous_run = api.run(f"demiurge/descriptor_model/{resume_id}")
    config = Namespace(**previous_run.config)
    return config

def load_test_data(config, test_data_path):
    test_data_path = Path(test_data_path)
    window_size = config.window_size

    attribute_list = []
    des_array = None

    with open(test_data_path) as json_file:
        data = json.load(json_file)
        data_list = []
        for des in data:
            timestamp = next(iter(des))
            descriptor = des[timestamp]
            if len(attribute_list) == 0:
                attribute_list = descriptor.keys()
                attribute_list = sorted(attribute_list)
            values = []
            for k in attribute_list:
                values.append(float(descriptor[k]))
            data_list.append((int(timestamp), values))
        #sort value by timestamp
        sorted_data = sorted(data_list)
        #convert data into descriptor array
        des_array = [j for (i,j) in sorted_data]
    des_array = np.array(des_array)
    #   cut according to the window size
    des_array = des_array[np.newaxis, -window_size:, :]
    #   also need to record attribute_list and timeframe for saving
    last_timestamp = sorted_data[-1][0]
    interval = sorted_data[-1][0] - sorted_data[-2][0]
    return torch.tensor(des_array, dtype=torch.float32), (int(last_timestamp), int(interval), attribute_list)

def save_descriptor_as_json(save_path, data, audio_info):
    last_timestamp, interval, attribute_list = audio_info
    current_timestamp = last_timestamp + interval
    num_data, prediction_length, _ = data.shape
    for i in range(num_data):
        #for each data, save 1 json file
        stored = []
        for j in range(prediction_length):
            descriptor_info = {}
            descriptor_values = {}
            for k in range(len(attribute_list)):
                descriptor_values[str(attribute_list[k])] = str(data[i,j,k])
            descriptor_info[str(current_timestamp)] = descriptor_values
            stored.append(descriptor_info)
            #increment timestamp for next descriptor
            current_timestamp = current_timestamp + interval

        #save to json
        json_path = save_path / (str(resume_run_id) + "_predicted_" + str(i) + ".txt")
        with open(json_path, "w") as json_file:
            json.dump(stored, json_file)



#######################         train functions


def init_wandb_run(config, run_dir="./", mode="run"):
    resume_run_id = config.resume_run_id
    entity = "demiurge"
    run_dir = Path(run_dir).absolute()

    if resume_run_id:
        run_id = resume_run_id
    else:
        run_id = wandb.util.generate_id()

    run = wandb.init(
        project="descriptor_model",
        id=run_id,
        entity=entity,
        resume=True,
        dir=run_dir,
        mode=mode,
    )

    print("run id: " + str(wandb.run.id))
    print("run name: " + str(wandb.run.name))
    wandb.watch_called = False
    # run.tags = run.tags + (selected_model,)
    return run

def setup_model(config, run):
    checkpoint_path = str(Path(run.dir).absolute() / "checkpoint.ckpt")

    if config.resume_run_id:
        # Download file from the wandb cloud.
        load_checkpoint_from_cloud(checkpoint_path="checkpoint.ckpt")
        extra_trainer_args = {"resume_from_checkpoint": checkpoint_path}
        model = DescriptorModel.load_from_checkpoint(checkpoint_path)
    else:
        extra_trainer_args = {}
        model = DescriptorModel(config)

    return model, extra_trainer_args

# generate

In [None]:
config = get_resume_run_config(resume_run_id)
config.resume_run_id = resume_run_id
run = init_wandb_run(config, run_dir="./", mode="offline")
model,_ = setup_model(config, run)
model.eval()
#construct test_data
test_data, audio_info = load_test_data(config, test_data_path)

prediction = model.predict(test_data, prediction_length)

save_descriptor_as_json(prediction_output_dir, prediction, audio_info)
print("ok!")

[34m[1mwandb[0m: Offline run mode, not syncing to the cloud.
[34m[1mwandb[0m: W&B syncing is set to `offline` in this directory.  Run `wandb online` to enable cloud syncing.


run id: 4gn7g6xq
run name: None
ok!
