In [None]:
# Connect Google Drive: mount it at /content/drive inside the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import librosa.feature
import matplotlib.pyplot as plt
import numpy as np


def load_melspectrogram(audio_path, plot=False, max_frames=20000):
    """Load an audio file and return a fixed-width mel power spectrogram.

    The audio is loaded as mono at 22050 Hz, converted to a mel spectrogram
    with librosa's default settings, then truncated or zero-padded along the
    time axis so that it always has exactly ``max_frames`` frames.

    Args:
        audio_path: Path of the audio file, passed to ``librosa.load``.
        plot: When True, show the (padded) spectrogram in dB with matplotlib.
        max_frames: Number of time frames in the result. Longer recordings
            are truncated, shorter ones are padded with zeros.

    Returns:
        A float ``numpy.ndarray`` of shape ``(max_frames, n_mels)``, i.e. the
        spectrogram transposed so that time is the first axis.
    """
    y, sr = librosa.load(audio_path, sr=22050, mono=True)
    melspectrogram_full = librosa.feature.melspectrogram(y=y, sr=sr)

    # Copy at most `max_frames` columns into a zero-initialised buffer; the
    # untouched tail is the padding.
    n_mels = melspectrogram_full.shape[0]
    n_frames = min(melspectrogram_full.shape[1], max_frames)
    melspectrogram = np.zeros((n_mels, max_frames), dtype=float)
    melspectrogram[:, :n_frames] = melspectrogram_full[:, :n_frames]
    # times[i] = frames[i] * hop_length / sr -> 7.739984882842026 min for 20000 frames

    if plot:
        # Only `librosa.feature` is imported at module level; older librosa
        # releases do not expose `librosa.display` unless it is imported
        # explicitly. `from ... import` avoids rebinding `librosa` locally.
        from librosa.display import specshow

        fig, ax = plt.subplots()
        S_dB = librosa.power_to_db(melspectrogram, ref=np.max)
        img = specshow(S_dB, x_axis='time', y_axis='mel', sr=sr, ax=ax)
        fig.colorbar(img, ax=ax, format='%+2.0f dB')
        ax.set(title='Mel-frequency spectrogram')
        plt.show()
    return melspectrogram.transpose()


# Manual smoke test: load one track, plot its mel spectrogram and print the
# shape of the returned array.
# NOTE(review): the path is an absolute Windows path on the author's machine;
# presumably it has to be adapted before running elsewhere — confirm.
if __name__ == "__main__":
    path = "C:/Users/Lysandre/Documents/GitHub/OsuMapCreator/MapCreator/datasets/maps/33688 DJ Okawari - Flower " \
           "Dance/Flower Dance.mp3"
    mel_spectro = load_melspectrogram(path, plot=True)
    print(mel_spectro.shape)



In [None]:
import tensorflow as tf
import numpy as np
from MapCreator.Utils.trainingMapParser import load_beatmap_attributes
from pydub import AudioSegment


def flatten(l):
    """Flatten ``l`` by exactly one level.

    Elements whose exact type is ``list`` are expanded in place; every other
    element (tuples, scalars, list subclasses, ...) is kept unchanged. Nested
    lists deeper than one level are not expanded.
    """
    flattened = []
    for entry in l:
        # Wrap non-list entries so both cases go through the same extend().
        flattened.extend(entry if type(entry) == list else [entry])
    return flattened


def prepare_dataset(data, segment_length=5529, max_hitObject=120):
    """Split each beatmap's audio and hit objects into aligned fixed-size segments.

    Args:
        data: Iterable of dicts with an "audio" entry (path of an audio file)
            and a "text" entry (sequence of hit points; each one is indexed
            with [2] and [4] and must provide ``tolist()``).
        segment_length: Number of STFT frames per audio segment.
        max_hitObject: Target segments are zero-padded to
            ``max_hitObject * 13`` values.

    Returns:
        ``(music_segments, diff_segments)``: a list of spectrogram slices of
        ``segment_length`` frames each, and a list of flat target sequences.

    Side effects:
        - ".mp3" inputs are converted to a ".wav" file next to the original and
          ``beatmap["audio"]`` is rewritten to point to it.
        - ``hitpoint[2]`` is modified in place on the input data.

    NOTE(review): ``hitpoint[2]`` is presumably the hit time in milliseconds and
    ``hitpoint[4]`` a duration/end value used to detect spinners — confirm
    against ``load_beatmap_attributes``.
    """
    # Sentinel values marking the end and the start of a target sequence.
    end_of_sequence = 10002
    start_of_sequence = 10001
    music_segments = []  # List to store the split segments
    diff_segments = []  # List to store the split segments
    for beatmap in data:
        print(beatmap["audio"])
        # spectrogram using stft
        if beatmap["audio"].endswith(".mp3"):
            # convert mp3 to wav
            sound = AudioSegment.from_mp3(beatmap["audio"])
            beatmap["audio"] = beatmap["audio"][:-4] + ".wav"
            sound.export(beatmap["audio"], format="wav")
        audio = tf.io.read_file(beatmap["audio"])
        # Decode a single channel, then drop the channel axis.
        audio, _ = tf.audio.decode_wav(audio, 1)
        audio = tf.squeeze(audio, axis=-1)
        stfts = tf.signal.stft(audio, frame_length=200, frame_step=80, fft_length=256)
        # Square root of the STFT magnitude.
        x = tf.math.pow(tf.abs(stfts), 0.5)
        # normalisation (per frame: statistics are reduced over axis 1)
        # NOTE(review): a frame with zero standard deviation would divide by
        # zero here — confirm whether silent frames can occur.
        means = tf.math.reduce_mean(x, 1, keepdims=True)
        stddevs = tf.math.reduce_std(x, 1, keepdims=True)
        x = (x - means) / stddevs
        audio_len = tf.shape(x)[0]
        # slicing to 10 seconds
        num_segments = int(np.ceil(audio_len / segment_length))

        for i in range(num_segments):
            start_sample = i * segment_length
            end_sample = start_sample + segment_length

            # Split the spectrogram and add the segment to the list
            segment = x[start_sample:end_sample, :]

            # Pad the last segment if necessary
            # (append `segment_length` zero rows, then cut back to `segment_length`)
            if tf.shape(segment)[0] < segment_length:
                paddings = tf.constant([[0, segment_length], [0, 0]])
                segment = tf.pad(segment, paddings, "CONSTANT")[:segment_length, :]

            music_segments.append(segment)

        # Slicing of the diff
        hitpoints = beatmap["text"]
        segment = [start_of_sequence]
        # 1-based index of the 10000-unit window currently being filled.
        segment_id = 1
        for hitpoint in hitpoints:
            # Entries whose field 4 is >= 10000 are reported as spinners and skipped.
            if hitpoint[4] >= 10000:
                print("Spinner ignoré :" + str(hitpoint[4]))
                continue
            # TODO : change 10000 in function of segment_length
            if 10000 * segment_id > hitpoint[2]:
                # Still inside the current window: make the time window-relative.
                hitpoint[2] -= 10000 * (segment_id - 1)
                segment.append(hitpoint.tolist())
            else:
                # The hit point lies in a later window: close the current window
                # and every empty window in between.
                for i in range(int(hitpoint[2] / 10000)-(segment_id-1)):
                    segment.append(end_of_sequence)
                    segment = flatten(segment)

                    # Padding the segment
                    segment += [0] * (max_hitObject*13 - len(segment))
                    # A negative repeat count pads nothing, so an over-long
                    # segment is only reported here and is still appended.
                    if len(segment) > max_hitObject*13:
                        print("Warning : the length of the sequence exceeds the fixed limit")
                    diff_segments.append(segment)
                    segment = [start_of_sequence]
                    segment_id += 1
                hitpoint[2] -= 10000 * (segment_id - 1)
                segment.append(hitpoint.tolist())
        # NOTE(review): the segment still being built when the loop above ends is
        # never appended to diff_segments (the flush below is disabled); confirm
        # whether dropping those trailing hit objects is intended.
        """if len(diff_segments[-1]) != max_hitObject*13:
            print("Ajustement")
            diff_segments[-1].append(end_of_sequence)
            diff_segments[-1] += [0] * (max_hitObject*13 - len(diff_segments[-1]) - 1)
            print(len(diff_segments[-1]))"""

        # Add empty (start, end, padding) targets so that this beatmap contributes
        # as many target segments as audio segments.
        for i in range(num_segments - segment_id+1):
            diff_segments.append([start_of_sequence, end_of_sequence] + [0] * (max_hitObject*13 - 2))

    return music_segments, diff_segments


def create_text_and_audio_ds(data, bs=4):
    """Build a batched ``tf.data.Dataset`` of ``{"source", "target"}`` examples.

    ``data`` is segmented with ``prepare_dataset``; audio segments become the
    "source" entries and the matching target sequences the "target" entries.
    The dataset is batched by ``bs`` and prefetched.
    """
    music_segments, diff_segments = prepare_dataset(data)
    print("Number of diff segments : " + str(len(diff_segments)))
    print("Number of music segments : " + str(len(music_segments)))

    def to_example(source, target):
        # Name the two zipped components the way the model's steps expect.
        return {"source": source, "target": target}

    sources = tf.data.Dataset.from_tensor_slices(music_segments)
    targets = tf.data.Dataset.from_tensor_slices(diff_segments)
    paired = tf.data.Dataset.zip((sources, targets))
    return paired.map(to_example).batch(bs).prefetch(tf.data.AUTOTUNE)


def load_beatmaps_and_musics(paths, max_nb_musics=1000):
    """Load the attributes of every beatmap listed in ``paths``.

    Each element of ``paths`` is indexed as ``(beatmap_files, audio_path)``.
    At most ``max_nb_musics`` elements are processed; a falsy value disables
    the limit. The difficulty returned by ``load_beatmap_attributes`` is not
    part of the result.

    Returns:
        A list of ``{"audio": audio_path, "text": transposed_attributes}``
        dicts, one per beatmap file.
    """
    data = []
    for index, entry in enumerate(paths):
        limit_reached = max_nb_musics and index >= max_nb_musics
        if limit_reached:
            break
        for beatmap in entry[0]:
            attributes, _difficulty = load_beatmap_attributes(beatmap, max_hit_object=None)
            data.append({"audio": entry[1], "text": attributes.transpose()})
    return data


In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers


class TokenEmbedding(layers.Layer):
    """Adds a learned positional embedding to a learned token embedding.

    Args:
        num_vocab: Size of the token vocabulary.
        maxlen: Number of positions the positional embedding can represent.
        num_hid: Width of both embeddings.
    """

    def __init__(self, num_vocab=10002, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = tf.keras.layers.Embedding(num_vocab, num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        # One position index per element of the last axis of `x`.
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        token_vectors = self.emb(x)
        return token_vectors + self.pos_emb(position_ids)


class SpeechFeatureEmbedding(layers.Layer):
    """Three stacked strided 1-D convolutions applied to the audio features.

    Each convolution has ``num_hid`` filters, kernel size 11, stride 2, "same"
    padding and a ReLU activation. ``pos_emb`` is created but is not used by
    ``call``.
    """

    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        conv_options = dict(strides=2, padding="same", activation="relu")
        self.conv1 = tf.keras.layers.Conv1D(num_hid, 11, **conv_options)
        self.conv2 = tf.keras.layers.Conv1D(num_hid, 11, **conv_options)
        self.conv3 = tf.keras.layers.Conv1D(num_hid, 11, **conv_options)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        # conv1 -> conv2 -> conv3
        return self.conv3(self.conv2(self.conv1(x)))


class TransformerEncoder(layers.Layer):
    """Self-attention block followed by a two-layer feed-forward block.

    Both sub-blocks use dropout, a residual connection and layer
    normalisation applied after the residual sum.

    Args:
        embed_dim: Width of the inputs/outputs; also used as attention key size.
        num_heads: Number of attention heads.
        feed_forward_dim: Width of the hidden feed-forward layer.
        rate: Dropout rate of both dropout layers.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        feed_forward_layers = [
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.ffn = keras.Sequential(feed_forward_layers)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        # Self-attention sub-block.
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        residual = self.layernorm1(inputs + attended)
        # Feed-forward sub-block.
        projected = self.dropout2(self.ffn(residual), training=training)
        return self.layernorm2(residual + projected)


class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, encoder attention, feed-forward.

    Args:
        embed_dim: Width of the inputs/outputs; also used as attention key size.
        num_heads: Number of attention heads.
        feed_forward_dim: Width of the hidden feed-forward layer.
        dropout_rate: Accepted but not used; the three dropout layers use the
            fixed rates 0.5, 0.1 and 0.1.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.self_dropout = layers.Dropout(0.5)
        self.enc_dropout = layers.Dropout(0.1)
        self.ffn_dropout = layers.Dropout(0.1)
        feed_forward_layers = [
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.ffn = keras.Sequential(feed_forward_layers)

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Build a ``(batch_size, n_dest, n_src)`` mask hiding future positions.

        Entry ``[b, i, j]`` is set when ``i >= j - n_src + n_dest``, which puts
        the 1's in the lower triangle counted from the lower right corner, so
        no information flows from later tokens to the current one.
        """
        rows = tf.range(n_dest)[:, None]
        cols = tf.range(n_src)
        allowed = rows >= cols - n_src + n_dest
        mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
        # Repeat the single mask once per batch element.
        tile_shape = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, tile_shape)

    def call(self, enc_out, target):
        target_shape = tf.shape(target)
        batch_size, seq_len = target_shape[0], target_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)

        # Masked self-attention over the target sequence.
        self_attended = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(self_attended))

        # Attention from the target (queries) to the encoder output.
        cross_attended = self.enc_att(target_norm, enc_out)
        cross_norm = self.layernorm2(self.enc_dropout(cross_attended) + target_norm)

        # Position-wise feed-forward network.
        projected = self.ffn(cross_norm)
        return self.layernorm3(cross_norm + self.ffn_dropout(projected))


class Transformer(keras.Model):
    """Encoder-decoder transformer from audio feature sequences to token ids.

    The encoder is a ``SpeechFeatureEmbedding`` followed by ``num_layers_enc``
    ``TransformerEncoder`` blocks; the decoder is a ``TokenEmbedding`` followed
    by ``num_layers_dec`` ``TransformerDecoder`` blocks and a dense classifier
    with ``num_classes`` outputs.

    Batches passed to ``train_step`` / ``test_step`` are dicts with the keys
    "source" and "target".
    """

    def __init__(
            self,
            num_hid=64,
            num_head=2,
            num_feed_forward=128,
            source_maxlen=100,
            target_maxlen=100,
            num_layers_enc=4,
            num_layers_dec=1,
            num_classes=10,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )

        self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )

        # Decoder blocks are stored as attributes dec_layer_0, dec_layer_1, ...
        for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = layers.Dense(num_classes)

    def decode(self, enc_out, target):
        """Embed ``target`` and run it through every decoder block."""
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y)
        return y

    def call(self, inputs):
        """Return classifier outputs for ``inputs = [source, target]``."""
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)

    @property
    def metrics(self):
        return [self.loss_metric]

    def _teacher_forced_loss(self, source, target):
        """Compute the compiled loss for one batch.

        The decoder input is ``target`` without its last element and the
        expected output is ``target`` without its first element. Positions
        whose expected value is 0 are excluded through ``sample_weight``.
        """
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        return self.compiled_loss(one_hot, preds, sample_weight=mask)

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        source = batch["source"]
        target = batch["target"]
        with tf.GradientTape() as tape:
            loss = self._teacher_forced_loss(source, target)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def test_step(self, batch):
        """Processes one batch inside model.evaluate() / validation."""
        loss = self._teacher_forced_loss(batch["source"], batch["target"])
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding.

        Returns an int32 tensor with ``target_maxlen`` token ids per batch
        element, starting with ``target_start_token_idx``.
        """
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        for _ in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            token_ids = tf.argmax(self.classifier(dec_out), axis=-1, output_type=tf.int32)
            # Keep only the prediction for the newest position and append it.
            next_token = tf.expand_dims(token_ids[:, -1], axis=-1)
            dec_input = tf.concat([dec_input, next_token], axis=-1)
        return dec_input


In [None]:
import os
from MapCreator.Utils.trainingMapParser import get_paths
from MapCreator.IA.Transformers.Model import *
from MapCreator.IA.Transformers.Dataset import *


class DisplayOutputs(keras.callbacks.Callback):
    def __init__(
            self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28
    ):
        """Displays a batch of outputs every fifth epoch (epochs 0, 5, 10, ...)

        Args:
            batch: A test batch containing the keys "source" and "target"
            idx_to_token: A List containing the vocabulary tokens corresponding to their indices
            target_start_token_idx: A start token index in the target vocabulary
            target_end_token_idx: An end token index in the target vocabulary
        """
        # Let the base Callback initialise its own state before adding ours.
        super().__init__()
        self.batch = batch
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def on_epoch_end(self, epoch, logs=None):
        """Print target and greedy prediction for every element of the batch."""
        if epoch % 5 != 0:
            return
        source = self.batch["source"]
        target = self.batch["target"].numpy()
        bs = tf.shape(source)[0]
        preds = self.model.generate(source, self.target_start_token_idx)
        preds = preds.numpy()
        for i in range(bs):
            target_text = "".join([self.idx_to_char[idx] for idx in target[i, :]])
            prediction = ""
            for idx in preds[i, :]:
                prediction += self.idx_to_char[idx]
                # Stop after the end token has been emitted.
                if idx == self.target_end_token_idx:
                    break
            print(f"target:     {target_text.replace('-', '')}")
            print(f"prediction: {prediction}\n")


class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with a linear warm-up followed by a linear decay.

    The rate depends on the epoch, derived from the optimizer step as
    ``step // steps_per_epoch``.

    Args:
        init_lr: Rate at epoch 0.
        lr_after_warmup: Rate reached at the end of the warm-up.
        final_lr: Lower bound of the decay.
        warmup_epochs: Length of the warm-up, in epochs.
        decay_epochs: Length of the decay, in epochs.
        steps_per_epoch: Number of optimizer steps per epoch.
    """

    def __init__(
            self,
            init_lr=0.00001,
            lr_after_warmup=0.001,
            final_lr=0.00001,
            warmup_epochs=15,
            decay_epochs=85,
            steps_per_epoch=203,
    ):
        super().__init__()
        self.init_lr = init_lr
        self.lr_after_warmup = lr_after_warmup
        self.final_lr = final_lr
        self.warmup_epochs = warmup_epochs
        self.decay_epochs = decay_epochs
        self.steps_per_epoch = steps_per_epoch

    def calculate_lr(self, epoch):
        """ linear warm up - linear decay """
        warmup_lr = (
                self.init_lr
                + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1)) * epoch
        )
        decay_lr = tf.math.maximum(
            self.final_lr,
            self.lr_after_warmup
            - (epoch - self.warmup_epochs)
            * (self.lr_after_warmup - self.final_lr)
            / self.decay_epochs,
        )
        # The smaller of the two lines is the active phase.
        return tf.math.minimum(warmup_lr, decay_lr)

    def __call__(self, step):
        epoch = step // self.steps_per_epoch
        # The optimizer passes an integer step; cast so the epoch can be mixed
        # with the float learning rates in calculate_lr.
        epoch = tf.cast(epoch, "float32")
        return self.calculate_lr(epoch)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {
            "init_lr": self.init_lr,
            "lr_after_warmup": self.lr_after_warmup,
            "final_lr": self.final_lr,
            "warmup_epochs": self.warmup_epochs,
            "decay_epochs": self.decay_epochs,
            "steps_per_epoch": self.steps_per_epoch,
        }


# Length of one target sequence: 13 values for each of 120 hit objects.
max_target_len = 13*120
# NOTE(review): absolute Windows path on the author's machine; presumably it
# must be changed to run elsewhere — confirm.
base_path = "C:/Users/Lysandre/Documents/GitHub/OsuMapCreator/MapCreator/datasets"
paths = get_paths(os.path.join(base_path, "maps"))
data = load_beatmaps_and_musics(paths, max_nb_musics=500)
# First 99% of the loaded beatmaps for training, the remainder for validation.
split = int(len(data) * 0.99)
train_data = data[:split]
test_data = data[split:]
ds = create_text_and_audio_ds(train_data, bs=64)
val_ds = create_text_and_audio_ds(test_data, bs=4)
# One validation batch, kept for the (currently disabled) display callback.
batch = next(iter(val_ds))

# The vocabulary to convert predicted indices into characters
"""display_cb = DisplayOutputs(
    batch, idx_to_char, target_start_token_idx=2, target_end_token_idx=3
)  # set the arguments as per vocabulary index for '<' and '>'"""

model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=10003,
)
loss_fn = tf.keras.losses.MeanSquaredError(
)

# Fixed learning rate; CustomSchedule is defined above but not used here.
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer, loss=loss_fn)

history = model.fit(ds, validation_data=val_ds, epochs=1)
import os
from MapCreator.Utils.trainingMapParser import get_paths
from MapCreator.IA.Transformers.Model import *
from MapCreator.IA.Transformers.Dataset import *


# NOTE(review): DisplayOutputs is defined twice in this cell; this second
# definition replaces the one above when the cell runs.
class DisplayOutputs(keras.callbacks.Callback):
    def __init__(
            self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28
    ):
        """Displays a batch of outputs every fifth epoch (epochs 0, 5, 10, ...)

        Args:
            batch: A test batch containing the keys "source" and "target"
            idx_to_token: A List containing the vocabulary tokens corresponding to their indices
            target_start_token_idx: A start token index in the target vocabulary
            target_end_token_idx: An end token index in the target vocabulary
        """
        # Let the base Callback initialise its own state before adding ours.
        super().__init__()
        self.batch = batch
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def on_epoch_end(self, epoch, logs=None):
        """Print target and greedy prediction for every element of the batch."""
        if epoch % 5 != 0:
            return
        source = self.batch["source"]
        target = self.batch["target"].numpy()
        bs = tf.shape(source)[0]
        preds = self.model.generate(source, self.target_start_token_idx)
        preds = preds.numpy()
        for i in range(bs):
            target_text = "".join([self.idx_to_char[idx] for idx in target[i, :]])
            prediction = ""
            for idx in preds[i, :]:
                prediction += self.idx_to_char[idx]
                # Stop after the end token has been emitted.
                if idx == self.target_end_token_idx:
                    break
            print(f"target:     {target_text.replace('-', '')}")
            print(f"prediction: {prediction}\n")


# NOTE(review): CustomSchedule is defined twice in this cell; this second
# definition replaces the one above when the cell runs.
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with a linear warm-up followed by a linear decay.

    The rate depends on the epoch, derived from the optimizer step as
    ``step // steps_per_epoch``.

    Args:
        init_lr: Rate at epoch 0.
        lr_after_warmup: Rate reached at the end of the warm-up.
        final_lr: Lower bound of the decay.
        warmup_epochs: Length of the warm-up, in epochs.
        decay_epochs: Length of the decay, in epochs.
        steps_per_epoch: Number of optimizer steps per epoch.
    """

    def __init__(
            self,
            init_lr=0.00001,
            lr_after_warmup=0.001,
            final_lr=0.00001,
            warmup_epochs=15,
            decay_epochs=85,
            steps_per_epoch=203,
    ):
        super().__init__()
        self.init_lr = init_lr
        self.lr_after_warmup = lr_after_warmup
        self.final_lr = final_lr
        self.warmup_epochs = warmup_epochs
        self.decay_epochs = decay_epochs
        self.steps_per_epoch = steps_per_epoch

    def calculate_lr(self, epoch):
        """ linear warm up - linear decay """
        warmup_lr = (
                self.init_lr
                + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1)) * epoch
        )
        decay_lr = tf.math.maximum(
            self.final_lr,
            self.lr_after_warmup
            - (epoch - self.warmup_epochs)
            * (self.lr_after_warmup - self.final_lr)
            / self.decay_epochs,
        )
        # The smaller of the two lines is the active phase.
        return tf.math.minimum(warmup_lr, decay_lr)

    def __call__(self, step):
        epoch = step // self.steps_per_epoch
        # The optimizer passes an integer step; cast so the epoch can be mixed
        # with the float learning rates in calculate_lr.
        epoch = tf.cast(epoch, "float32")
        return self.calculate_lr(epoch)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {
            "init_lr": self.init_lr,
            "lr_after_warmup": self.lr_after_warmup,
            "final_lr": self.final_lr,
            "warmup_epochs": self.warmup_epochs,
            "decay_epochs": self.decay_epochs,
            "steps_per_epoch": self.steps_per_epoch,
        }


# NOTE(review): everything from here to the end of the cell repeats the
# training script that already appears earlier in this same cell, so the data
# is loaded and the model is trained a second time — confirm whether this
# duplicate is intended.
# Length of one target sequence: 13 values for each of 120 hit objects.
max_target_len = 13*120
# NOTE(review): absolute Windows path on the author's machine; presumably it
# must be changed to run elsewhere — confirm.
base_path = "C:/Users/Lysandre/Documents/GitHub/OsuMapCreator/MapCreator/datasets"
paths = get_paths(os.path.join(base_path, "maps"))
data = load_beatmaps_and_musics(paths, max_nb_musics=500)
# First 99% of the loaded beatmaps for training, the remainder for validation.
split = int(len(data) * 0.99)
train_data = data[:split]
test_data = data[split:]
ds = create_text_and_audio_ds(train_data, bs=64)
val_ds = create_text_and_audio_ds(test_data, bs=4)
# One validation batch, kept for the (currently disabled) display callback.
batch = next(iter(val_ds))

# The vocabulary to convert predicted indices into characters
"""display_cb = DisplayOutputs(
    batch, idx_to_char, target_start_token_idx=2, target_end_token_idx=3
)  # set the arguments as per vocabulary index for '<' and '>'"""

model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=10003,
)
loss_fn = tf.keras.losses.MeanSquaredError(
)

# Fixed learning rate; CustomSchedule is defined above but not used here.
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer, loss=loss_fn)

history = model.fit(ds, validation_data=val_ds, epochs=1)


In [None]:
from enum import Enum
from typing import List, Optional
import abc


class SectionName(Enum):
    """Section headers of a beatmap file; each value is the bracketed header text."""
    General = "[General]"
    Editor = "[Editor]"
    Metadata = "[Metadata]"
    Difficulty = "[Difficulty]"
    Events = "[Events]"
    TimingPoints = "[TimingPoints]"
    Colours = "[Colours]"
    HitObjects = "[HitObjects]"


class Section:
    """Base class of the beatmap sections; provides text-to-value conversion."""

    def value(self, value):
        """Convert a text field to ``int``, else ``float``, else leave it as is.

        Surrounding whitespace is ignored for the numeric conversions. Signed
        integers such as "-5" are returned as ``int``. Text that is not a valid
        number (including digit-like characters such as "²" that ``int``
        rejects) is returned unchanged, whitespace included.
        """
        text = value.strip()
        try:
            return int(text)
        except ValueError:
            pass
        try:
            return float(text)
        except ValueError:
            return value

    def parse_line(self, line: str):
        """Parse one line of the section; implemented by subclasses."""
        ...


class HitSample:
    """Sample settings of a hit object, rendered as colon-terminated fields."""
    normalSet: int = 0  # SampleSet
    additionSet: int = 0  # SampleSet
    index: int = 0
    volume: int = 0
    filename: Optional[str] = None

    def set(self, normalSet: int, additionSet: int, index: int, volume: int, filename: Optional[str] = ""):
        """Overwrite all five fields; ``filename`` defaults to an empty string."""
        self.normalSet, self.additionSet = normalSet, additionSet
        self.index, self.volume = index, volume
        self.filename = filename

    def __str__(self):
        # Without a filename only the four numeric fields are written.
        if self.filename is None:
            return f"{self.normalSet}:{self.additionSet}:{self.index}:{self.volume}:"
        return f"{self.normalSet}:{self.additionSet}:{self.index}:{self.volume}:{self.filename}:"


class General(Section):
    """The [General] section: one attribute per key, set to its default first."""

    def __init__(self,
                 AudioFilename: Optional[str] = None,
                 AudioLeadIn: Optional[int] = 0,
                 AudioHash: Optional[str] = None,
                 PreviewTime: Optional[int] = -1,
                 Countdown: Optional[int] = 1,
                 SampleSet: Optional[str] = "Normal",  # SampleSet.Normal.value
                 StackLeniency: Optional[float] = 0.7,
                 Mode: Optional[int] = 0,
                 LetterboxInBreaks: Optional[int] = 0,
                 StoryFireInFront: Optional[int] = 1,
                 UseSkinSprites: Optional[int] = 0,
                 AlwaysShowPlayfield: Optional[int] = 0,
                 OverlayPosition: Optional[str] = "NoChange",
                 SkinPreference: Optional[str] = None,
                 EpilepsyWarning: Optional[int] = 0,
                 CountdownOffset: Optional[int] = 0,
                 SpecialStyle: Optional[int] = 0,
                 WidescreenStoryboard: Optional[int] = 0,
                 SamplesMatchPlaybackRate: Optional[int] = 0):
        self.AudioFilename = AudioFilename
        self.OverlayPosition = OverlayPosition
        self.EpilepsyWarning = EpilepsyWarning
        self.SpecialStyle = SpecialStyle
        self.SamplesMatchPlaybackRate = SamplesMatchPlaybackRate
        self.WidescreenStoryboard = WidescreenStoryboard
        self.CountdownOffset = CountdownOffset
        self.SkinPreference = SkinPreference
        self.UseSkinSprites = UseSkinSprites
        self.AlwaysShowPlayfield = AlwaysShowPlayfield
        self.LetterboxInBreaks = LetterboxInBreaks
        self.StoryFireInFront = StoryFireInFront
        self.Mode = Mode
        self.StackLeniency = StackLeniency
        self.SampleSet = SampleSet
        self.Countdown = Countdown
        self.PreviewTime = PreviewTime
        self.AudioLeadIn = AudioLeadIn
        self.AudioHash = AudioHash

    def parse_line(self, line: str):
        """Set the attribute named before the first ':' to the converted value.

        Only the first ':' separates key and value, so values that themselves
        contain ':' are kept whole. A line without ':' raises ``IndexError``.
        """
        members = line.split(':', 1)
        setattr(self, members[0], self.value(members[1]))


class Editor(Section):
    """The [Editor] section: one attribute per key, all defaulting to None."""

    def __init__(self,
                 Bookmarks: Optional[List[int]] = None,
                 DistanceSpacing: Optional[float] = None,
                 BeatDivisor: Optional[int] = None,
                 GridSize: Optional[int] = None,
                 TimelineZoom: Optional[float] = None):
        self.GridSize = GridSize
        self.BeatDivisor = BeatDivisor
        self.DistanceSpacing = DistanceSpacing
        self.Bookmarks = Bookmarks
        self.TimelineZoom = TimelineZoom

    def parse_line(self, line: str):
        """Set the attribute named before the first ':' to the converted value.

        "Bookmarks" is split on ',' and every item is converted separately.
        Only the first ':' separates key and value. A line without ':' raises
        ``IndexError``.
        """
        members = line.split(':', 1)
        if members[0] == "Bookmarks":
            self.Bookmarks = [self.value(x) for x in members[1].split(",")]
        else:
            setattr(self, members[0], self.value(members[1]))


class Metadata(Section):
    """The [Metadata] section: one attribute per key, all defaulting to None."""

    def __init__(self,
                 Title: Optional[str] = None,
                 TitleUnicode: Optional[str] = None,
                 Artist: Optional[str] = None,
                 ArtistUnicode: Optional[str] = None,
                 Creator: Optional[str] = None,
                 Version: Optional[str] = None,
                 Source: Optional[str] = None,
                 Tags: Optional[List[str]] = None,
                 BeatmapID: Optional[int] = None,
                 BeatmapSetID: Optional[int] = None):

        self.Tags = Tags
        self.BeatmapSetID = BeatmapSetID
        self.BeatmapID = BeatmapID
        self.Source = Source
        self.Version = Version
        self.Creator = Creator
        self.ArtistUnicode = ArtistUnicode
        self.Artist = Artist
        self.TitleUnicode = TitleUnicode
        self.Title = Title

    def parse_line(self, line: str):
        """Set the attribute named before the first ':' to the converted value.

        "Tags" is split on single spaces into a list of strings. Only the
        first ':' separates key and value, so titles, artists or tags that
        contain ':' are kept whole. A line without ':' raises ``IndexError``.
        """
        members = line.split(':', 1)
        if members[0] == "Tags":
            self.Tags = members[1].split(" ")
        else:
            setattr(self, members[0], self.value(members[1]))


class Difficulty(Section):
    """The [Difficulty] section; attributes are created as lines are parsed."""
    HPDrainRate: float
    CircleSize: float
    OverallDifficulty: float
    ApproachRate: float
    SliderMultiplier: float
    SliderTickRate: float

    def parse_line(self, line: str):
        """Store the value after the first ':' under the name before it."""
        parts = line.split(':')
        key = parts[0]
        setattr(self, key, self.value(parts[1]))


class EventParams:
    """Marker base class for the parameter sets of the different event kinds."""
    pass


class Event(Section):
    """Declares the fields of an event line; no parsing is implemented here."""
    eventType: str
    startTime: int
    eventParams: List[EventParams]


class Background(EventParams):
    """Declares the parameters of a background event."""
    filename: str
    xOffset: int
    yOffset: int


class Video(EventParams):
    """Declares the parameters of a video event."""
    # NOTE(review): `Video: 1` is an annotation whose "type" is the literal 1;
    # it creates no attribute. Presumably the event-type identifier was meant
    # — confirm.
    Video: 1
    startTime: int
    filename: str
    xOffset: int
    yOffset: int


class Pause(EventParams):
    """Declares the parameters of a break event."""
    # 2:Break TODO check the wiki because the syntax is strange
    # NOTE(review): `Break: 2` is an annotation whose "type" is the literal 2;
    # it creates no attribute — confirm the intent.
    Break: 2
    startTime: int
    endTime: int


#  TODO: storyboard events are not modelled yet.
class Storyboard(EventParams):
    """Placeholder for storyboard event parameters."""
    pass


class TimingPoint(Section):
    """One comma-separated timing point line plus the BPM derived from it."""
    time: int
    beatLength: float
    meter: int
    sampleSet: int = 1  # SampleSet = SampleSet.Normal.value
    sampleIndex: int = 0
    volume: int = 1
    uninherited: int
    effects: int = 0  # Effect = None
    bpm: int

    def parse_line(self, line: str):
        """Read the first eight comma-separated fields, then compute ``bpm``."""
        columns = line.split(",")
        # Attribute names in the order in which the fields appear on the line.
        field_order = (
            "time", "beatLength", "meter", "sampleSet",
            "sampleIndex", "volume", "uninherited", "effects",
        )
        for position, field in enumerate(field_order):
            setattr(self, field, self.value(columns[position]))
        self.calculate_bpm()

    def calculate_bpm(self):
        """Set ``bpm`` to 60000 divided by ``beatLength``, rounded."""
        beats_per_minute = 60000 / self.beatLength
        self.bpm = round(beats_per_minute)


# TODO check wiki for colours
class ColourObject(Section):
    """Declares the fields of a colour entry; parsing is not implemented yet."""
    Combo: int
    color: List[int]

    # SliderTrackOverride
    # SliderBorder
    def parse_line(self, line):
        # Stub: colour lines are currently ignored.
        pass


class HitObject(Section):
    """Common fields and helpers shared by circles, sliders and spinners.

    Rendered by ``__str__`` as ``x,y,time,type,hitSound,hitSample``.
    """
    # x: int
    # y: int
    # time: int
    # type: int
    # hitSound: int = 0
    # hitSample: str  # Optional[HitSample]

    def __init__(self,
                 x: Optional[int] = 0,
                 y: Optional[int] = 0,
                 time: Optional[int] = 0,
                 type: Optional[int] = 0,  # Type
                 hitSound: Optional[int] = 0,
                 hitSample: Optional[str] = None):
        self.x, self.y = x, y
        self.time = time
        self.type = type
        self.hitSound = hitSound
        # Fall back to the text form of a default HitSample.
        self.hitSample = str(HitSample()) if hitSample is None else hitSample

    def __str__(self):
        return f"{self.x},{self.y},{self.time},{self.type},{self.hitSound},{self.hitSample}"

    def get_hit_sample(self, line) -> str:
        """Return ``line`` when it is a hit sample, else the default sample text."""
        return line if self.has_hit_sample(line) else "0:0:0:0:0:"

    def has_hit_sample(self, line) -> bool:
        """True unless ``line`` was converted to a plain ``int`` or ``float``."""
        return not (type(line) == int or type(line) == float)

    def get(self, _type):
        """Return the instance attribute named ``str(_type)``, or None."""
        return vars(self).get(str(_type))

    def get_type(self, _type):
        """Print the kind of object encoded in the ``_type`` bit field."""
        # Checked in this order; the first matching bit wins.
        for flag, label in ((1, "circle"), (2, "slider"), (8, "spinner")):
            if _type & flag:
                print(label)
                return
        # elif _type & 128:
        #     print("mania")
        print("unknown type:", _type)

    def is_slider(self, _type) -> bool:
        return bool(_type & 2)

    def is_spinner(self, _type) -> bool:
        return bool(_type & 8)

    def is_circle(self, _type) -> bool:
        return bool(_type & 1)


class Cercle(HitObject):
    """A hit circle: the common hit-object fields and nothing else."""

    def __init__(self,
                 x: Optional[int] = 0,
                 y: Optional[int] = 0,
                 time: Optional[int] = 0,
                 type: Optional[int] = 0,  # Type
                 hitSound: Optional[int] = 0,
                 hitSample: Optional[str] = None):
        super().__init__(x, y, time, type, hitSound, hitSample)

    def parse_line(self, line):
        """Fill the fields from one comma-separated hit-object line."""
        members = line.split(",")
        # The first five fields map one-to-one onto attributes.
        for position, attribute in enumerate(("x", "y", "time", "type", "hitSound")):
            setattr(self, attribute, self.value(members[position]))
        # The last field is the hit sample when it is not a plain number.
        self.hitSample = self.get_hit_sample(self.value(members[-1]))


class Spinner(HitObject):
    """Spinner: the base hit-object fields plus an end time."""
    endTime: int

    def parse_line(self, line):
        """Populate this spinner from one comma-separated hit-object line.

        Fields 0-5 become x, y, time, type, hitSound and endTime after going
        through ``self.value`` (inherited; its conversion rules are not
        visible in this block). The last field is handed to
        ``get_hit_sample`` to derive ``hitSample``.
        """
        fields = line.split(",")
        leading = ("x", "y", "time", "type", "hitSound", "endTime")
        for position, attribute in enumerate(leading):
            setattr(self, attribute, self.value(fields[position]))

        self.hitSample = self.get_hit_sample(self.value(fields[-1]))


class CurvePoint:
    """One slider control point, rendered as ``x:y``."""
    x: int
    y: int

    def __str__(self):
        """Return the coordinates separated by a colon."""
        return "{}:{}".format(self.x, self.y)


class Slider(HitObject):
    """Slider hit object.

    Line layout handled by ``parse_line`` (comma-separated, by index):
    0 x, 1 y, 2 time, 3 type, 4 hitSound, 5 ``curveType|x:y|x:y...``,
    6 slides, 7 length, 8 edgeSounds, 9 edgeSets, 10 hitSample.
    Fields 8-10 are optional.
    """
    curveType: str
    # Each entry is the "x:y" text of a CurvePoint, not a CurvePoint instance.
    curvePoints: List[str]
    slides: int
    # Rounded to the nearest integer while parsing.
    length: int
    # edgeSounds / edgeSets are only set on the instance when the line
    # provides a non-empty value for them.
    edgeSounds: str
    edgeSets: str

    def parse_line(self, line):
        """Populate this slider from one comma-separated hit-object line.

        Raises:
            IndexError: if the line has fewer than eight fields.
            ValueError: if ``slides`` or ``length`` is not numeric.
        """
        members = line.split(",")
        self.x = self.value(members[0])
        self.y = self.value(members[1])
        self.time = self.value(members[2])
        self.type = self.value(members[3])
        self.hitSound = self.value(members[4])

        # Parse slider points: the first token is the curve type, the rest
        # are "x:y" control points.
        points = (members[5] or '').split('|')
        self.curveType = points[0]
        self.curvePoints = []
        for raw_point in points[1:]:
            coordinates = raw_point.split(':')
            if len(coordinates) < 2:
                # Empty or malformed token (e.g. a trailing '|'): skip it
                # instead of failing the whole line.
                continue
            curve_point = CurvePoint()
            curve_point.x = self.value(coordinates[0])
            curve_point.y = self.value(coordinates[1])
            self.curvePoints.append(str(curve_point))

        # Parse the number of slides and the length
        self.slides = int(members[6])
        self.length = int(round(float(members[7])))

        # Parse edgeSounds / edgeSets independently: a line may stop after
        # edgeSounds, which must then still be picked up.
        if len(members) > 8 and members[8]:
            self.edgeSounds = members[8]
        if len(members) > 9 and members[9]:
            self.edgeSets = members[9]

        # The hit sample lives at index 10. Reading the last field instead
        # would mistake edgeSounds/edgeSets for a hit sample on shorter
        # lines. The integer 0 makes get_hit_sample return its default.
        raw_sample = self.value(members[10]) if len(members) > 10 else 0
        self.hitSample = self.get_hit_sample(raw_sample)


In [None]:
import codecs
import os
import re
from typing import List

from MapCreator.Utils.models.models import General, Editor, Metadata, Difficulty, Event, TimingPoint, ColourSection, \
    HitObject, SectionName, Slider, Spinner, Cercle


class Parser:
    """Line-oriented reader for ``.osu`` beatmap files.

    Lines are routed by the most recent ``[Section]`` header. General,
    Editor, Metadata and Difficulty lines go to the matching section object.
    TimingPoints and HitObjects lines are parsed into new objects appended to
    ``timing_points`` and ``hit_objects``. Events and Colours are not parsed.
    """

    # A section header is a whole line of the form "[Name]". Anchoring matters:
    # metadata values and event file names may contain brackets themselves.
    _SECTION_HEADER = re.compile(r"^\[[^\[\]]+\]$")
    _FILE_FORMAT = re.compile(r"^osu file format (v[0-9]+)$")

    def __init__(self):
        self.file_format = ""
        self.general = General()
        self.editor = Editor()
        self.metadata = Metadata()
        self.difficulty = Difficulty()
        self.events: List[Event] = []
        self.timing_points: List[TimingPoint] = []
        self.colours: List[ColourSection] = []
        self.hit_objects: List[HitObject] = []

        # Header of the section currently being read, brackets included.
        self.osu_section = ""

    def parse_hit_object_type(self, line):
        """Build the hit object matching the type flags of ``line``.

        The type is the fourth comma-separated field, read as a bit field:
        value 1 -> Cercle, 2 -> Slider, 8 -> Spinner (checked in that order).
        Any other value is reported on stdout and parsed as a Cercle.

        Returns:
            The new object, already populated through its ``parse_line``.
        """
        _type = int(line.split(",")[3].strip())
        # https://osu.ppy.sh/wiki/fr/Client/File_formats/Osu_%28file_format%29#type
        unknown = False
        if _type & 1:
            hit_object = Cercle()
        elif _type & 2:
            hit_object = Slider()
        elif _type & 8:
            hit_object = Spinner()
        else:
            # The mania flag (128) is not supported: fall back to a circle.
            hit_object = Cercle()
            unknown = True
        hit_object.parse_line(line)
        if unknown:
            print("unknown type:", _type)
        return hit_object

    def parse_line(self, line: str):
        """Consume one raw line of the file.

        Blank lines are ignored. A section header updates ``osu_section``,
        the ``osu file format vN`` line sets ``file_format``, and any other
        line is dispatched according to the current section.
        """
        line = line.strip()
        if not line:
            return

        if self._SECTION_HEADER.match(line):
            self.osu_section = line
            return
        match = self._FILE_FORMAT.match(line)
        if match:
            self.file_format = match.group(1)
            return
        if self.osu_section == SectionName.General.value:
            self.general.parse_line(line)
        elif self.osu_section == SectionName.Editor.value:
            self.editor.parse_line(line)
        elif self.osu_section == SectionName.Metadata.value:
            self.metadata.parse_line(line)
        elif self.osu_section == SectionName.Difficulty.value:
            self.difficulty.parse_line(line)
        elif self.osu_section == SectionName.TimingPoints.value:
            timing_point = TimingPoint()
            timing_point.parse_line(line)
            self.timing_points.append(timing_point)
        elif self.osu_section == SectionName.HitObjects.value:
            hit_obj = self.parse_hit_object_type(line)
            self.hit_objects.append(hit_obj)

    def parse_file(self, file):
        """Parse the beatmap at path ``file``, line by line.

        A path that is not an existing regular file is silently ignored and
        the parser keeps its current state.
        """
        if not os.path.isfile(file):
            return
        # "utf-8-sig" drops a leading byte-order mark; with plain "utf-8" the
        # mark stays glued to the first line and the format line never matches.
        with open(file, 'r', encoding="utf-8-sig") as handle:
            for line in handle:
                self.parse_line(line)


if __name__ == "__main__":
    # Manual smoke test: parse one beatmap and print, for every hit object,
    # "true" when it is a Cercle and "false" otherwise.
    PATH = (
        "C:/Users/Lysandre/Documents/GitHub/OsuMapCreator/MapCreator/datasets/maps/67565 DragonForce - Valley of the "
        "Damned/DragonForce - Valley of the Damned (Kayne) [Apocalypse].osu"
    )
    parser = Parser()
    parser.parse_file(PATH)
    for o in parser.hit_objects:
        print("true" if isinstance(o, Cercle) else "false")


In [None]:
import os.path
from typing import List

import numpy as np

from MapCreator.Utils.models.models import Spinner, Cercle, Slider, HitObject
from MapCreator.Utils.parser import Parser
from MapCreator.Utils.audio import load_melspectrogram


def scale_beatmap(hitpoints: List[HitObject]):
    """Keep only the hit objects that start within the supported duration.

    The limit is 7.739984882842026 minutes (about 7 min 44 s); ``time`` is
    compared against it in milliseconds, and the limit itself is included.

    Returns:
        A new list, in the original order; the input list is left untouched.
    """
    cutoff_ms = 7.739984882842026 * 60 * 1000
    return [hit for hit in hitpoints if hit.time <= cutoff_ms]


def _curve_point_xy(point):
    """Return ``(x, y)`` for a curve point given as "x:y" text or as an object with x/y."""
    if isinstance(point, str):
        x_text, y_text = point.split(":")[:2]
        # Go through float so that "12.0"-style coordinates are accepted too.
        return int(float(x_text)), int(float(y_text))
    return point.x, point.y


def load_beatmap_attributes(path, max_hit_object=4000):
    """Parse a beatmap into a fixed-size integer table of hit-object features.

    Rows of the returned table, in order:
    x, y, time, type, endtime, x2, y2, x3, y3, x4, y4, slide, length.
    ``endtime`` is filled for spinners only; rows x2..length are filled for
    sliders only, from their first three curve points at most. Cells with no
    value stay 0.

    Args:
        path: path of the ``.osu`` file.
        max_hit_object: number of columns of the table. Extra hit objects are
            dropped and missing ones are left as zero columns. ``None`` sizes
            the table to the number of hit objects kept by ``scale_beatmap``.

    Returns:
        ``(data, overall_difficulty)`` where ``data`` is an int array of shape
        ``(13, max_hit_object)`` and ``overall_difficulty`` is the parser's
        ``difficulty.OverallDifficulty``.
    """
    parser = Parser()
    parser.parse_file(path)

    hitpoints = scale_beatmap(parser.hit_objects)

    if max_hit_object is None:
        max_hit_object = len(hitpoints)
    data = np.zeros((13, max_hit_object), dtype=int)

    for i, o in enumerate(hitpoints[:max_hit_object]):
        data[0][i] = o.x
        data[1][i] = o.y
        data[2][i] = o.time
        data[3][i] = o.type

        if isinstance(o, Cercle):
            continue
        if isinstance(o, Spinner):
            data[4][i] = o.endTime
        elif isinstance(o, Slider):
            # Curve points may be "x:y" strings (what Slider.parse_line
            # stores) or objects exposing x/y; a slider may also have none.
            for slot, point in enumerate(o.curvePoints[:3]):
                point_x, point_y = _curve_point_xy(point)
                data[5 + 2 * slot][i] = point_x
                data[6 + 2 * slot][i] = point_y

            data[11][i] = o.slides
            data[12][i] = o.length

    return data, parser.difficulty.OverallDifficulty


def load_beatmaps_and_spectrograms(paths: List, max=1000):
    """Load every beatmap listed in ``paths`` together with its song spectrogram.

    Args:
        paths: sequence of ``(beatmap_paths, audio_path)`` entries, as built
            by ``get_paths``.
        max: maximum number of entries of ``paths`` to process; a falsy value
            disables the limit.

    Returns:
        ``(arr, spectrograms, diff)``: three parallel sequences with one item
        per beatmap. ``arr`` holds the transposed attribute tables,
        ``spectrograms`` the spectrogram of the beatmap's song (the same
        object is shared by all beatmaps of one song) and ``diff`` is a float
        array of overall difficulties.
    """
    arr = []
    diff = []
    spectrograms = []
    for i, path in enumerate(paths):
        if max and i >= max:
            break
        if not path[0]:
            # No beatmap for this song: decoding the audio would be wasted.
            continue
        # The audio is decoded once per song with the loader this module
        # imports; the previous `path_to_audio` name is not defined here.
        spectrogram = load_melspectrogram(path[1])
        for beatmap in path[0]:
            attributes, difficulty = load_beatmap_attributes(beatmap)
            arr.append(attributes.transpose())
            diff.append(difficulty)
            spectrograms.append(spectrogram)
    diff = np.array(diff, dtype=float)
    return arr, spectrograms, diff


def normalize(img):
    """Standardise an array: subtract its mean, then divide by its standard deviation.

    When the standard deviation is exactly 0 the centred values are divided
    by 0.001 instead, which avoids a division by zero.
    """
    centered = img - np.mean(img)
    spread = np.std(img)
    divisor = spread if spread != 0 else 0.001
    return centered / divisor


def contains_any_index(root, a_list):
    """Return the 1-based position of the first item starting with ``root``.

    Returns 0 when no item of ``a_list`` starts with ``root``.
    """
    positions = (
        position
        for position, candidate in enumerate(a_list, start=1)
        if candidate.startswith(root)
    )
    return next(positions, 0)


def get_paths(dir_path, max=1000):
    """Collect the beatmap and audio files of every map folder under ``dir_path``.

    Only sub-directories are visited; stray files in ``dir_path`` are
    ignored. Folders and their files are visited in sorted order so the
    result, and the subset kept by ``max``, is the same on every run.

    Args:
        dir_path: directory holding one folder per beatmap set.
        max: maximum number of folders to return; a falsy value disables the
            limit.

    Returns:
        A list of ``(beatmaps, audio)`` tuples, one per folder. ``beatmaps``
        is the list of ``.osu`` paths. ``audio`` is the path of the last
        ``.mp3``/``.wav`` file found (extensions are matched without regard
        to case), or ``""`` when the folder has none.
    """
    file_paths = []

    for entry in sorted(os.listdir(dir_path)):
        map_dir = os.path.join(dir_path, entry)
        if not os.path.isdir(map_dir):
            # e.g. a README or a notebook checkpoint next to the map folders
            continue
        if max and len(file_paths) >= max:
            break
        audio = ""
        beatmaps = []
        for file_name in sorted(os.listdir(map_dir)):
            lowered = file_name.lower()
            if lowered.endswith((".mp3", ".wav")):
                audio = os.path.join(map_dir, file_name)
            elif lowered.endswith(".osu"):
                beatmaps.append(os.path.join(map_dir, file_name))
        file_paths.append((beatmaps, audio))

    return file_paths


if __name__ == "__main__":
    # Manual smoke test: list the dataset folders, then load and print the
    # attributes of the first beatmap of the first folder.
    base_path = "C:/Users/Lysandre/Documents/GitHub/OsuMapCreator/MapCreator/datasets"
    paths = get_paths(os.path.join(base_path, "maps"))
    # `data` is the (attribute table, overall difficulty) tuple returned by
    # load_beatmap_attributes.
    data = load_beatmap_attributes(paths[0][0][0])
    print(data)


In [None]:
from keras import layers
from tensorflow import keras
from MapCreator.Utils.trainingMapParser import *

base_path = "/MapCreator/datasets"
paths = get_paths(os.path.join(base_path, "maps"))
df, spectrograms, diff = load_beatmaps_and_spectrograms(paths)

# Encoder input: one spectrogram per beatmap -> (n_samples, frames, mel_bins).
x_train = np.array(spectrograms, dtype=float)
# Decoder target: one attribute table per beatmap
# -> (n_samples, n_hit_objects, n_features).
y_train = np.array(df, dtype=float)
n_samples, n_hit_objects, n_features = y_train.shape

# Add the start- and end-of-sequence tokens
start_of_sequence = np.full(n_features, -1.0)
end_of_sequence = np.full((n_samples, 1, n_features), -2.0)

# Teacher-forcing input: the target shifted right by one step, starting with
# the start token. Every sample gets its OWN target sequence (the previous
# loop never advanced its index and copied sample 0 into every row).
decoder_input = np.zeros((n_samples, n_hit_objects + 1, n_features))
decoder_input[:, 0] = start_of_sequence
decoder_input[:, 1:] = y_train

y_train = np.append(y_train, end_of_sequence, axis=1)
print(y_train[0][-1])

print("Taille de l'input d'entraînement : " + str(x_train.shape))
print("Taille de l'output d'entraînement : " + str(y_train.shape))

input_dim = 128
decoder_input_shape = 13
latent_dim = 256

# Model's input: same layout as x_train, i.e. (frames, mel_bins) per sample.
input_spectrogram = keras.Input((20000, input_dim), name="input_spectrogram")
# Conv2D needs a channel axis.
x = layers.Reshape((20000, input_dim, 1), name="expand_dim")(input_spectrogram)
# Convolution layer 1
x = layers.Conv2D(
    filters=64,
    kernel_size=[11, 41],
    strides=[2, 2],
    padding="same",
    use_bias=False,
    name="conv_1",
)(x)
x = layers.BatchNormalization(name="conv_1_bn")(x)
x = layers.ReLU(name="conv_1_relu")(x)
# Convolution layer 2
x = layers.Conv2D(
    filters=64,
    kernel_size=[11, 21],
    strides=[1, 2],
    padding="same",
    use_bias=False,
    name="conv_2",
)(x)
x = layers.BatchNormalization(name="conv_2_bn")(x)
x = layers.ReLU(name="conv_2_relu")(x)
# The LSTM expects (batch, time, features): fold the frequency and channel
# axes of the convolution output into a single feature axis.
x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]), name="conv_to_sequence")(x)

encoder = keras.layers.LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(x)

# We discard `encoder_outputs` and only keep the states.
encoder_states = [state_h, state_c]

# Set up the decoder, using `encoder_states` as initial state.
decoder_inputs = keras.Input(shape=(None, decoder_input_shape), name="input_teacher_forcing")

# We set up our decoder to return full output sequences,
# and to return internal states as well. We don't use the
# return states in the training model, but we will use them in inference.
decoder_lstm = keras.layers.LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
# Linear output: the end-of-sequence token is made of negative values (-2),
# which a ReLU output could never produce.
decoder_dense = layers.Dense(decoder_input_shape)
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model that will turn
# `encoder_input_data` & `decoder_input_data` into `decoder_target_data`
model = keras.Model([input_spectrogram, decoder_inputs], decoder_outputs)

model.compile(
    optimizer="adam",
    loss=keras.losses.MeanAbsoluteError(),
)
# model.summary()


# Training is currently disabled; the model saved below is untrained.
# model.fit(
#     [x_train, decoder_input],
#     y_train,
#     batch_size=1,
#     epochs=50,
#     # validation_split=0.2
# )

# Save model
model.save("MapCreator")
print("Sauvegarde du modèle terminée")


/content/drive/MyDrive/1151466 MIMI - Nanimo nai Youna/audio.mp3
[-2. -2. -2. -2. -2. -2. -2. -2. -2. -2. -2. -2. -2.]
Taille de l'input d'entraînement : (5, 20000, 128)
Taille de l'output d'entraînement : (5, 4001, 13)
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_spectrogram (InputLayer)  [(None, 20000, 128)  0          []                               
                                ]                                                                 
                                                                                                  
 expand_dim (Reshape)           (None, 20000, 128,   0           ['input_spectrogram[0][0]']      
                                1)                                                                
                                                                        



Sauvegarde du modèle terminée
