# Génération de musique par Intelligence Artificielle pour la pratique sportive 

Nous nous proposons d'ajuster/affiner (*"fine-tune"*) l'algorithme MusicGen afin de répondre au mieux au besoin d'utilisateurs de l'application [Decathlon Coach](https://www.decathloncoach.com/fr/home/coaching/programs/sport) désireux de générer des musiques pour leurs séances sportives. Il s'agit d'un cas d'usage fictif mais qui nous permettra de manipuler MusicGen. Nous supposons que les utilisateurs en question souhaitent écouter des musiques qui sont à la fois en accord avec leurs goûts musicaux, mais aussi adaptés à chaque séance (exemples: musiques plus calmes pour du Yoga, plus toniques pour du cardio, etc...).

Avant d'apporter des modifications à MusicGen, il est indispensable de préparer les données que l'on souhaite utilisée (collecte, formatage, etc...). C'est l'objet du présent noteb

## Partie 2: préparation des données

Les 2 cellules ci-dessous chargent quelques modules utilitaires que nous ne détaillerons pas ici. pas ici.

In [None]:
%run utilitaires.ipynb

In [None]:
%load_ext autoreload
%autoreload 2

### Collecte et analyse de la donnée brute

Nous prendrons l'exemple d'un utilisateur adepte de musique latine pour lequel on a collecté un échantillon de plusieurs musiques qu'il a déjà écouté durant ses séances.

In [None]:
paths_to_data = [
    "./data/music/raw/latino",
    "./data/music/raw/classic",
    # "./data/music/raw/electro",
]
music_files = [
    os.sep.join([path_to_data, f]) 
    for path_to_data in paths_to_data 
    for f in os.listdir(path_to_data) 
    if f[-4:] == ".mp3"
]

for f in music_files[:3]:
    print(" ".join(f[:-4].split(os.sep)[-1].split("_") + [":"]))
    display(Audio(f, rate=44100))


### Découpage des fichiers musicaux en "chunks"

Nous devons tout d'abord "découper" les morceaux de musiques en "portions" (*"chunks"*) plus court, et pour des questions d'efficacité computationelles, de e longuesur égales.

Chaque portion, aura une durée de 25 secondes, et nous autorisons un chevauchement de 10 secondes entre chaque morceau.

In [None]:
audio_encoder_name = "facebook/encodec_32khz"

audio_encoder_feature_extractor = AutoFeatureExtractor.from_pretrained(
    audio_encoder_name,
    cache_dir="./models"
)

chunk_duration = 25
chunk_overlap = 10
sampling_rate = audio_encoder_feature_extractor.sampling_rate

In [None]:
chunks = chunk_audio_file(music_files[0], chunk_duration, chunk_overlap, sampling_rate=sampling_rate)
np.testing.assert_allclose(
    chunks[0]["array"][-chunks[0]["sampling_rate"] * chunk_overlap:],
    chunks[1]["array"][:chunks[0]["sampling_rate"] * chunk_overlap]
)
for chunk in chunks:
    display(Audio(chunk["array"], rate=chunk["sampling_rate"]))

Nous utilisons la fonction `prompt_engineering` introduite dans le notebook précédent pour "étiqueter" chaque morceau avec un texte qui le décrit.

Nous obtenons un jeux de données (*"dataset"*) contenant 2 colonnes (et une cinquantaine de lignes):
- `audio` (morceaux de musiques de 25 secondes chacun)
- `description` (description textuelle générée automatiquement en spécifiant uniquement le nom de la session sportive et le style de musique souhaité)

In [None]:
dataset = Dataset.from_list([
    {
        "audio": chunk,
        "description": prompt_engineering(
            ' '.join(f[:-4].split(os.sep)[-1].split('_')[:-1]), 
            f[:-4].split(os.sep)[-1].split('_')[-1]
        ),
    } for chunk in chunk_audio_file(f, chunk_duration, chunk_overlap, sampling_rate=sampling_rate) for f in music_files
    if len(chunk["array"]) == sampling_rate * chunk_duration
])
print(dataset)
all_descriptions = {
    k: len(list(g)) 
    for k, g in itertools.groupby(sorted(dataset["description"]))
}
pprint(all_descriptions)

### "Tokenization" des descriptions textuelles

Comme tous les modèles d'IA, MusicGen ne comprend pas le langage "naturel", il ne sait manipuler que des nombres.

Nous devons donc transformer le texte en nombres "compréhensibles" pour MusicGen. Ces nombre sont également appelés "jetons" (*"tokens"*).

L'algorithme opérant cette transformation est appelé *"tokenizer"*.

In [None]:
tokenizer = AutoProcessor.from_pretrained(
    "facebook/musicgen-medium",
    cache_dir="./models"
).tokenizer
tokenizer

In [None]:
pprint({
    k: tokenizer(k)["input_ids"]
     for k in all_descriptions.keys()
})

On observe que le premier token est toujours 3, et le dernier toujours 1.
Essayons de voir à quoi ces tokens correspondent.

In [None]:
print(tokenizer.decode(1))
print(tokenizer.decode(14098))
print(tokenizer.decode(32))
tokenizer.decode(3)

Pour des raisons d'efficacité computationnelle, toutes les entrées doivent avoir la même "taille" (même nombrer de tokens). 

Pour se faire, on se définit une longueur de référence (`max_length = 20` tokens ci-dessous). On "tronque" (*"truncation"*) les textes trop longs, et on "complète" (*"padding"*) les textes trop courts.

In [None]:
def tokenize(tokenizer, batch, text_column, max_length):
    return tokenizer(
        batch[text_column],
        padding="max_length",
        max_length=max_length,
        truncation=True,
        add_special_tokens=False,
    )

On applique la *"tokenization"* sur l'intégralité de la colonne `description` du jeux de données.

In [None]:
dataset = dataset.map(
    lambda batch: tokenize(tokenizer, batch, text_column="description", max_length=20), 
    num_proc=18,
    desc="Tokenize descriptions",
)
print(dataset)
print(dataset[0]["input_ids"])

### Formatage (encodage) de l'audio

De même que pour le texte, pour l'audio MusicGen attend un format bien spécifique.

Dans un premier temps nous extrayons l'audio sous forme d'une liste de valeurs numériques dans une nouvelle colonne du jeux de données que l'on baptise `labels`.

In [None]:
def extract_audio_features(audio_encoder_feature_extractor, batch):
    audio = batch["audio"]
    labels = audio_encoder_feature_extractor(
        audio["array"], sampling_rate=audio["sampling_rate"]
    )
    batch["labels"] = labels["input_values"]
    return batch

dataset = dataset.map(
    lambda batch: extract_audio_features(audio_encoder_feature_extractor, batch), 
    num_proc=18,
    desc="Extract audio features",
)
print(dataset)
print(np.asarray(dataset[0]["labels"]).shape)
print(dataset[25]["labels"][0][0][:20])

Quelques statistiques pour mieux comprendre la nouvelle colonne `labels`:

In [None]:
stats = query(f"""
SELECT 
  max(length(description)) as max_nb_characters,
  min(length(description)) as min_nb_characters,
  max(length(input_ids)) as max_nb_tokens,
  min(length(input_ids)) as min_nb_tokens,
  max(length(labels[1][1])) as max_length_labels,
  min(length(labels[1][1])) as min_length_labels,
FROM dataset
""", load_from_cache_file=False)
pprint(stats[0])

Dans la cellule suivante, nous allons "encoder" la liste de valeurs numériques contenues dans `labels` sous la forme d'une suite de "vecteurs" de dimension 4. C'est ce format qu'attend MusicGen.

In [None]:
model = AutoModelForTextToWaveform.from_pretrained(
    "facebook/musicgen-medium", 
    cache_dir="./models"
)
audio_encoder = model.audio_encoder
num_codebooks = model.decoder.config.num_codebooks
audio_encoder_pad_token_id = model.config.decoder.pad_token_id

pad_labels = torch.ones((1, 1, num_codebooks, 1)) * audio_encoder_pad_token_id

if torch.cuda.device_count() == 1:
    audio_encoder.to("cuda")

def apply_audio_encoderr(batch):

    with torch.no_grad():
        labels = audio_encoder.encode(
            torch.tensor(batch["labels"]).to(audio_encoder.device)
        )["audio_codes"]

    # add pad token column
    labels = torch.cat(
        [pad_labels.to(labels.device).to(labels.dtype), labels], dim=-1
    )

    labels, delay_pattern_mask = model.decoder.build_delay_pattern_mask(
        labels.squeeze(0),
        audio_encoder_pad_token_id,
        labels.shape[-1] + num_codebooks,
    )

    labels = model.decoder.apply_delay_pattern_mask(labels, delay_pattern_mask)

    # the first timestamp is associated to a row full of BOS, let's get rid of it
    batch["labels"] = labels[:, 1:].cpu()
    return batch

# Encodec doesn't truely support batching
# Pass samples one by one to the GPU
dataset = dataset.map(
    apply_audio_encoderr,
    num_proc=1,
    desc="Apply encodec",
)

In [None]:
print(dataset)
print(type(dataset[0]["input_ids"]))
print(type(dataset[0]["labels"]))
print(np.asarray(dataset[0]["input_ids"]).shape)
print(np.asarray(dataset[0]["labels"]).shape)

### Sauvegarde du jeux de données sur le disque

In [None]:
save_path = "./data/music/processed/all"

os.makedirs(save_path, exist_ok=True)
shutil.rmtree(save_path)

dataset.save_to_disk(save_path)

Vérifions que la lecture du jeux de donnée s'effectue correctement:

In [None]:
dataset.load_from_disk(save_path)