## Dataset 
Structure of the DeepScores V2 dataset:

![image.png](attachment:1e980167-d977-403a-af88-cd9f57b472a3.png)

In [9]:
import random
import os
from PIL import Image, ImageColor
from multiprocessing import Process, Queue

import cv2
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa


from build_label import build_label
# A trailing comma is only allowed in a parenthesised import list; without the
# parentheses the statement is a SyntaxError and the whole cell fails to run.
from constant import CHANNEL_NUM, CLASS_CHANNEL_MAP


ImportError: cannot import name 'HALF_WHOLE_NOTE' from 'constant' (C:\Users\simon\Desktop\Computer VIsion\oemer-main\oemer\constant.py)

### Load Dataset Path

In [6]:


def get_deep_score_data_paths(dataset_path):
    """Pair every page image of the dataset with the path of its segmentation mask.

    Args:
        dataset_path: Dataset root. Images are listed from its ``images``
            sub-folder; mask paths are built under its ``segmentation`` sub-folder.

    Returns:
        A list of ``(image_path, seg_path)`` tuples sorted by image file name.
        ``seg_path`` is the image file name with its ``.png`` extension replaced
        by ``_seg.png``. Mask files are not checked for existence.
    """
    image_dir = os.path.join(dataset_path, "images")
    seg_dir = os.path.join(dataset_path, "segmentation")

    paths = []
    # Sorted so the result does not depend on the file system's enumeration order.
    for img in sorted(os.listdir(image_dir)):
        # Only the real extension is swapped: str.replace() would also rewrite a
        # ".png" occurring in the middle of the name. Entries that are not PNG
        # files (e.g. thumbnail caches) have no mask and are skipped.
        stem, ext = os.path.splitext(img)
        if ext != ".png":
            continue
        image_path = os.path.join(image_dir, img)
        seg_path = os.path.join(seg_dir, stem + "_seg.png")
        paths.append((image_path, seg_path))
    return paths

In [7]:
# Sanity check: show the first (image_path, seg_path) pair found in the dataset folder.
get_deep_score_data_paths('..\\dataset\\ds2_dense')[0]

('..\\dataset\\ds2_dense\\images\\lg-101766503886095953-aug-beethoven--page-1.png',
 '..\\dataset\\ds2_dense\\segmentation\\lg-101766503886095953-aug-beethoven--page-1_seg.png')

### DeepScoresDataLoader

In [10]:
# Segmentation values whose regions build_label() passes through fill_hole()
# (when they map to a non-zero channel) instead of copying the raw mask.
# Presumably the hollow half/whole notehead classes of DeepScores — TODO confirm
# against the dataset's class list.
HALF_WHOLE_NOTE = [39, 41, 42, 43, 45, 46, 47, 49]

def build_label(seg_path):
    """Convert a segmentation mask image into a multi-channel label array.

    Args:
        seg_path: Path of the segmentation image. Every pixel value is treated
            as a class id, with 0 meaning background.

    Returns:
        A float array of shape ``mask.shape + (total_chs,)`` where ``total_chs``
        is the number of distinct channels in ``CLASS_CHANNEL_MAP`` plus two
        (background and 'others'). Channel 0 marks background pixels, ids found
        in ``CLASS_CHANNEL_MAP`` are accumulated in their mapped channel and
        every other id is accumulated in the last ('others') channel.
    """
    # Copy the pixels out and close the file handle right away instead of
    # leaving it open until garbage collection.
    with Image.open(seg_path) as img:
        arr = np.array(img)

    color_set = set(np.unique(arr))
    # discard() instead of remove(): a mask without a single background pixel
    # must not raise KeyError.
    color_set.discard(0)

    total_chs = len(set(CLASS_CHANNEL_MAP.values())) + 2  # Plus 'background' and 'others' channel.
    output = np.zeros(arr.shape + (total_chs,))

    output[..., 0] = np.where(arr == 0, 1, 0)
    for color in color_set:
        # Unmapped ids fall back to -1, i.e. the last ('others') channel.
        ch = CLASS_CHANNEL_MAP.get(color, -1)
        if (ch != 0) and color in HALF_WHOLE_NOTE:
            # NOTE(review): fill_hole is not defined or imported in the visible
            # part of the notebook; it must be in scope for this branch to run.
            note = fill_hole(arr, color)
            output[..., ch] += note
        else:
            output[..., ch] += np.where(arr == color, 1, 0)
    return output



# Smoke test: build the label array for one segmentation mask of the dataset.
build_label('..\\dataset\\ds2_dense\\segmentation\\lg-101766503886095953-aug-beethoven--page-1_seg.png')

NameError: name 'HALF_WHOLE_NOTE' is not defined

In [11]:

class DsDataLoader:
    """Generate random training windows from augmented DeepScores pages.

    A distributor process cycles through ``feature_files`` and hands path pairs
    to ``num_worker`` worker processes. Each worker loads a page, resizes it by a
    random ratio, applies a random perspective transform and pushes
    ``[image, label, ratio]`` onto a queue. Iterating the loader pulls pages from
    that queue and yields ``(window, label_window)`` crops of
    ``win_size`` x ``win_size`` pixels until ``num_samples`` crops were produced.

    NOTE(review): ``preprocess_image``, ``batch_transform`` and ``imaugs`` are
    used by the workers but are not defined or imported in the visible part of
    the notebook. The workers also use bound methods as process targets, which
    presumably requires a fork-based start method — confirm on the target platform.
    """

    def __init__(self, feature_files, win_size=256, num_samples=100, step_size=0.5, num_worker=4):
        """Store the sampling configuration and prepare (unstarted) worker processes.

        Args:
            feature_files: Sequence of ``(image_path, seg_path)`` pairs.
            win_size: Edge length in pixels of the square windows that are yielded.
            num_samples: Number of windows produced by one pass over the loader.
            step_size: Distance between neighbouring window origins. A float is
                a fraction of ``win_size`` (at least 0.01); any other number is
                an absolute pixel count (at least 2).
            num_worker: Number of pre-processing worker processes.
        """
        # Work on a private copy so the caller's list is not reordered in place.
        self.feature_files = list(feature_files)
        random.shuffle(self.feature_files)
        self.win_size = win_size
        self.num_samples = num_samples
        self.num_worker = num_worker

        if isinstance(step_size, float):
            step_size = max(abs(step_size), 0.01)
            self.step_size = round(win_size * step_size)
        else:
            self.step_size = max(abs(step_size), 2)

        self.file_idx = 0
        self._build_workers()

    def _build_workers(self):
        """Create fresh queues and a fresh, not yet started, set of processes."""
        self._queue = Queue(maxsize=200)
        self._dist_queue = Queue(maxsize=100)
        self._process_pool = []
        for _ in range(self.num_worker):
            processor = Process(target=self._preprocess_image)
            processor.daemon = True
            self._process_pool.append(processor)
        self._pdist = Process(target=self._distribute_process)
        self._pdist.daemon = True
        # Becomes True once an iteration has taken ownership of this set.
        self._workers_claimed = False

    def _distribute_process(self):
        """Worker target: endlessly feed path pairs, reshuffling after every full pass."""
        while True:
            paths = self.feature_files[self.file_idx]
            self._dist_queue.put(paths)
            self.file_idx += 1
            if self.file_idx == len(self.feature_files):
                random.shuffle(self.feature_files)
                self.file_idx = 0

    def _preprocess_image(self):
        """Worker target: load, augment and enqueue pages forever."""
        while True:
            inp_img_path, seg_img_path = self._dist_queue.get()

            # Preprocess image with transformations that won't change view.
            image, _ = preprocess_image(inp_img_path)
            label = build_label(seg_img_path)

            # Random resize
            ratio = random.choice(np.arange(0.2, 1.21, 0.1))
            tar_w = int(ratio * image.size[0])
            tar_h = int(ratio * image.size[1])
            trans_func = lambda img: imaugs.resize(img, width=tar_w, height=tar_h)
            image = batch_transform(image, trans_func)
            label = batch_transform(label, trans_func)

            # Random perspective transform
            seed = random.randint(0, 1000)
            perspect_trans = lambda img: imaugs.perspective_transform(img, seed=seed, sigma=70)
            image = np.array(batch_transform(image, perspect_trans))  # RGB image
            label = np.array(batch_transform(label, perspect_trans))

            # put() blocks while the bounded queue is full, so no polling of
            # full() (which burned a CPU core per worker) is needed.
            self._queue.put([image, label, ratio])

    def _iter_windows(self, image, label, ratio):
        """Yield every grid-aligned window of one page in random order."""
        # Discard bottom spaces that has no contents.
        staff = label[..., 1]
        yidx, _ = np.where(staff > 0)
        if len(yidx) > 0:
            max_y = min(np.max(yidx) + 100, image.shape[0])
        else:
            max_y = image.shape[0]

        max_y = max_y - self.win_size
        max_x = image.shape[1] - self.win_size

        # The stride follows the resize ratio; clamp it to 1 because range()
        # rejects a zero step (small step sizes combined with small ratios).
        stride = max(1, int(round(self.step_size * ratio)))
        grid_x = range(0, max_x, stride)
        grid_y = range(0, max_y, stride)
        meshgrid = np.meshgrid(grid_x, grid_y, indexing='ij')
        coords = np.dstack(meshgrid).reshape(-1, 2)

        # random.shuffle() swaps rows of a 2-D array through views, which
        # duplicates some rows and loses others. Shuffle an index list instead.
        order = list(range(len(coords)))
        random.shuffle(order)
        for start_x, start_y in coords[order]:
            rows = slice(start_y, start_y + self.win_size)
            cols = slice(start_x, start_x + self.win_size)
            yield image[rows, cols], label[rows, cols]

    def __iter__(self):
        """Yield ``(window, label_window)`` pairs until ``num_samples`` were produced.

        Raises:
            ValueError: If the loader was created without any feature files
                (the workers could never deliver a page).
        """
        if not self.feature_files:
            raise ValueError("DsDataLoader has no feature files to sample from.")

        # Every pass owns its own queues and processes: terminated processes
        # cannot be started again, and stopping one pass must not affect another.
        if self._workers_claimed:
            self._build_workers()
        self._workers_claimed = True
        queue = self._queue
        workers = [self._pdist] + list(self._process_pool)

        for process in workers:
            if not process.is_alive():
                process.start()

        samples = 0
        try:
            while samples < self.num_samples:
                image, label, ratio = queue.get()
                for feat, ll in self._iter_windows(image, label, ratio):
                    yield feat, ll
                    samples += 1
                    if samples >= self.num_samples:
                        break
        finally:
            # Also runs when the consumer stops early and the generator is closed.
            for process in workers:
                if process.is_alive():
                    process.terminate()

    def get_dataset(self, batch_size, output_types=None, output_shapes=None):
        """Wrap the loader in a batched, prefetching ``tf.data.Dataset``.

        Args:
            batch_size: Number of windows per batch; incomplete batches are dropped.
            output_types: Element dtypes, defaults to ``(tf.uint8, tf.float32)``.
            output_shapes: Element shapes, defaults to a ``win_size`` square RGB
                window and a ``win_size`` square label with ``CHANNEL_NUM`` channels.

        Returns:
            The ``tf.data.Dataset`` built from this loader's generator.
        """
        def gen_wrapper():
            yield from self

        if output_types is None:
            output_types = (tf.uint8, tf.float32)

        if output_shapes is None:
            output_shapes = ((self.win_size, self.win_size, 3), (self.win_size, self.win_size, CHANNEL_NUM))

        return tf.data.Dataset.from_generator(
                gen_wrapper, output_types=output_types, output_shapes=output_shapes
            ) \
            .batch(batch_size, drop_remainder=True) \
            .prefetch(tf.data.experimental.AUTOTUNE)


### Unet

In [12]:
import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import (
    LayerNormalization,
    Activation,
    Dropout,
    Conv2D,
    Conv2DTranspose,
    Add,
    Concatenate
)

def conv_block(input_tensor, channel, kernel_size, strides=(2, 2), dilation_rate=1, dropout_rate=0.4):
    """Pre-activation residual convolution block used by the U-net encoder.

    Two ``ReLU -> LayerNormalization -> Dropout -> Conv2D`` stages are applied
    and the result is added to a shortcut of the input. The first convolution
    uses ``strides``, so with the default ``(2, 2)`` the block halves the
    spatial resolution; pass ``strides=(1, 1)`` to keep the input size.

    Args:
        input_tensor: Feature map to process.
        channel: Number of output filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Strides of the first convolution and of the shortcut projection.
        dilation_rate: Dilation rate of both convolutions.
        dropout_rate: Dropout rate applied before each convolution.

    Returns:
        The sum of the convolved branch and the (possibly projected) shortcut.
    """

    skip = input_tensor

    input_tensor = LayerNormalization()(Activation("relu")(input_tensor))
    input_tensor = Dropout(dropout_rate)(input_tensor)
    input_tensor = Conv2D(
        channel, kernel_size, strides=strides, dilation_rate=dilation_rate, padding="same"
    )(input_tensor)

    input_tensor = LayerNormalization()(Activation("relu")(input_tensor))
    input_tensor = Dropout(dropout_rate)(input_tensor)
    input_tensor = Conv2D(
        channel, kernel_size, strides=(1, 1), dilation_rate=dilation_rate, padding="same"
    )(input_tensor)

    # The shortcut must have the same shape as the main branch: project it with
    # a 1x1 convolution whenever the resolution OR the channel count changes
    # (the latter previously made Add() fail for stride-1 blocks).
    if strides != (1, 1) or skip.shape[-1] != channel:
        skip = Conv2D(channel, (1, 1), strides=strides, padding="same")(skip)
    input_tensor = Add()([input_tensor, skip])

    return input_tensor


def transpose_conv_block(input_tensor, channel, kernel_size, strides=(2, 2), dropout_rate=0.4):
    """Pre-activation residual up-sampling block used by the U-net decoder.

    A ``ReLU -> LayerNormalization -> Dropout -> Conv2D`` stage is followed by a
    ``ReLU -> LayerNormalization -> Dropout -> Conv2DTranspose`` stage, and the
    result is added to a shortcut of the input. The transposed convolution uses
    ``strides``, so the default ``(2, 2)`` doubles the spatial resolution.

    Args:
        input_tensor: Feature map to process.
        channel: Number of output filters of both layers.
        kernel_size: Kernel size of both layers.
        strides: Strides of the transposed convolution and of the shortcut projection.
        dropout_rate: Dropout rate applied before each layer.

    Returns:
        The sum of the up-sampled branch and the (possibly projected) shortcut.
    """
    skip = input_tensor

    input_tensor = LayerNormalization()(Activation("relu")(input_tensor))
    input_tensor = Dropout(dropout_rate)(input_tensor)
    input_tensor = Conv2D(channel, kernel_size, strides=(1, 1), padding="same")(input_tensor)

    input_tensor = LayerNormalization()(Activation("relu")(input_tensor))
    input_tensor = Dropout(dropout_rate)(input_tensor)
    input_tensor = Conv2DTranspose(channel, kernel_size, strides=strides, padding="same")(input_tensor)

    # The shortcut must have the same shape as the main branch: project it
    # whenever the resolution OR the channel count changes (the latter
    # previously made Add() fail for stride-1 blocks).
    if strides != (1, 1) or skip.shape[-1] != channel:
        skip = Conv2DTranspose(channel, (1, 1), strides=strides, padding="same")(skip)
    input_tensor = Add()([input_tensor, skip])

    return input_tensor


def semantic_segmentation(win_size=256, multi_grid_layer_n=1, multi_grid_n=5, out_class=2, dropout=0.4):
    """Build the residual U-net with a multi-grid dilated bottleneck (ASPP-style).

    The encoder is four groups of ``conv_block`` calls, each starting with a
    stride-2 block. The bottleneck concatenates a 1x1 branch with
    ``multi_grid_n`` dilated 3x3 branches (dilation ``2**i``), repeated
    ``multi_grid_layer_n`` times. The decoder applies four stride-2
    ``transpose_conv_block`` calls and merges the first three encoder outputs.

    Args:
        win_size: Height and width of the RGB input window.
        multi_grid_layer_n: Number of stacked multi-grid bottleneck layers.
        multi_grid_n: Number of dilated branches per multi-grid layer.
        out_class: Number of channels of the softmax output.
        dropout: Dropout rate of the Dropout layers created directly in this
            function. The ``conv_block`` / ``transpose_conv_block`` calls keep
            their own default dropout rate.

    Returns:
        A Keras ``Model`` mapping the input window to the softmax prediction.
    """
    def relu_norm(tensor):
        # Shared pre-activation: ReLU followed by layer normalisation.
        return LayerNormalization()(Activation("relu")(tensor))

    input_score = Input(shape=(win_size, win_size, 3), name="input_score_48")
    stem = Conv2D(2**7, (7, 7), strides=(1, 1), padding="same")(input_score)

    # (filters, number of stride-1 blocks that follow the stride-2 block)
    encoder_plan = ((2**7, 1), (2**7, 2), (2**7, 3), (2**8, 4))
    encoder_outputs = []
    tensor = stem
    for filters, extra_blocks in encoder_plan:
        tensor = conv_block(tensor, filters, (3, 3), strides=(2, 2))
        for _ in range(extra_blocks):
            tensor = conv_block(tensor, filters, (3, 3), strides=(1, 1))
        encoder_outputs.append(tensor)
    en_l1, en_l2, en_l3, en_l4 = encoder_outputs

    # Multi-grid bottleneck: one 1x1 branch plus dilated 3x3 branches, all
    # computed from the same normalised feature map and concatenated.
    feature = en_l4
    for _ in range(multi_grid_layer_n):
        feature = relu_norm(feature)
        feature = Dropout(dropout)(feature)
        branches = LayerNormalization()(
            Conv2D(2**9, (1, 1), strides=(1, 1), padding="same", activation="relu")(feature)
        )
        for exponent in range(multi_grid_n):
            dilated = LayerNormalization()(
                Conv2D(
                    2**9, (3, 3), strides=(1, 1), dilation_rate=2**exponent, padding="same", activation="relu"
                )(feature)
            )
            branches = Concatenate()([branches, dilated])
        branches = Dropout(dropout)(branches)
        feature = Conv2D(2**9, (1, 1), strides=(1, 1), padding="same")(branches)

    feature = relu_norm(feature)

    feature = Conv2D(2**8, (1, 1), strides=(1, 1), padding="same")(feature)
    feature = Add()([feature, en_l4])
    decoded = transpose_conv_block(feature, 2**7, (3, 3), strides=(2, 2))

    # Each decoder stage fuses the matching encoder output through a residual
    # 1x1 convolution and then doubles the resolution.
    for encoder_skip in (en_l3, en_l2, en_l1):
        residual = decoded
        decoded = relu_norm(decoded)
        decoded = Concatenate()([decoded, relu_norm(encoder_skip)])
        decoded = Dropout(dropout)(decoded)
        decoded = Conv2D(2**7, (1, 1), strides=(1, 1), padding="same")(decoded)
        decoded = Add()([decoded, residual])
        decoded = transpose_conv_block(decoded, 2**7, (3, 3), strides=(2, 2))

    decoded = relu_norm(decoded)
    decoded = Dropout(dropout)(decoded)
    out = Conv2D(
        out_class, (1, 1), strides=(1, 1), activation='softmax', padding="same", name="prediction"
    )(decoded)

    return Model(inputs=input_score, outputs=out)


def my_conv_block(inp, kernels, kernel_size=(3, 3), strides=(1, 1)):
    """Residual block: strided Conv2D, then a separable-conv branch added back.

    Args:
        inp: Input feature map.
        kernels: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Strides of the leading ``Conv2D`` (the only layer that can
            change the resolution).

    Returns:
        The normalised, ReLU-activated sum of the leading convolution's output
        and the separable-convolution branch.
    """
    def norm_relu(tensor):
        # LayerNormalization followed by ReLU, as used after every stage here.
        return L.Activation("relu")(L.LayerNormalization()(tensor))

    shortcut = L.Conv2D(kernels, kernel_size, strides=strides, padding='same', dtype=tf.float32)(inp)

    branch = norm_relu(shortcut)
    branch = L.SeparableConv2D(kernels, kernel_size, padding='same', dtype=tf.float32)(branch)
    branch = norm_relu(branch)
    branch = L.Dropout(0.3)(branch)

    merged = L.Add()([shortcut, branch])
    return norm_relu(merged)


def my_trans_conv_block(inp, kernels, kernel_size=(3, 3), strides=(1, 1)):
    """Residual block: strided Conv2DTranspose, then a Conv2D branch added back.

    Args:
        inp: Input feature map.
        kernels: Number of filters of both layers.
        kernel_size: Kernel size of both layers.
        strides: Strides of the leading ``Conv2DTranspose`` (the only layer
            that can change the resolution).

    Returns:
        The normalised, ReLU-activated sum of the transposed convolution's
        output and the convolution branch.
    """
    def norm_relu(tensor):
        # LayerNormalization followed by ReLU, as used after every stage here.
        return L.Activation("relu")(L.LayerNormalization()(tensor))

    shortcut = L.Conv2DTranspose(kernels, kernel_size, strides=strides, padding='same', dtype=tf.float32)(inp)

    branch = norm_relu(shortcut)
    branch = L.Conv2D(kernels, kernel_size, padding='same', dtype=tf.float32)(branch)
    branch = norm_relu(branch)
    branch = L.Dropout(0.3)(branch)

    merged = L.Add()([shortcut, branch])
    return norm_relu(merged)


def u_net(win_size=288, out_class=3):
    """Build the segmentation U-net with a dilated separable-conv bottleneck.

    The encoder halves the resolution three times (1/2, 1/4, 1/8); the
    bottleneck runs four parallel separable convolutions with dilation rates
    1, 2, 6 and 12; the decoder up-samples three times while concatenating the
    encoder features of matching resolution.

    Args:
        win_size: Height and width of the RGB input window. Must be a multiple
            of 8 so that the three down-/up-sampling steps reproduce the input
            size (``None`` is passed through unchecked).
        out_class: Number of channels of the softmax output.

    Returns:
        A ``tf.keras.Model`` mapping the input window to the softmax prediction.

    Raises:
        ValueError: If ``win_size`` is not a multiple of 8.
    """
    # Checked up front: otherwise the model either fails at a Concatenate layer
    # with a shape error or silently produces an output larger than the input.
    if win_size is not None and win_size % 8 != 0:
        raise ValueError(
            f"win_size must be a multiple of 8 (three stride-2 stages), got {win_size}"
        )

    def norm_relu(tensor):
        # LayerNormalization followed by ReLU.
        return L.Activation("relu")(L.LayerNormalization()(tensor))

    inp = L.Input(shape=(win_size, win_size, 3))
    tensor = L.SeparableConv2D(128, (3, 3), activation="relu", padding='same')(inp)

    # Encoder stage 1: 1/2 resolution.
    l1 = my_conv_block(tensor, 64, (3, 3), strides=(2, 2))
    for _ in range(2):
        l1 = my_conv_block(l1, 128, (3, 3))

    # Encoder stage 2: 1/4 resolution, densely connected to its own input.
    skip = my_conv_block(l1, 128, (3, 3), strides=(2, 2))
    l2 = skip
    for _ in range(4):
        l2 = my_conv_block(l2, 128, (3, 3))
    l2 = L.Concatenate()([skip, l2])

    # Encoder stage 3: still 1/4 resolution, wider.
    l3 = l2
    for _ in range(5):
        l3 = my_conv_block(l3, 256, (3, 3))
    l3 = L.Concatenate()([l2, l3])

    # Bottleneck at 1/8 resolution: parallel dilated separable convolutions.
    bot = my_conv_block(l3, 256, (3, 3), strides=(2, 2))
    branches = []
    for rate in (1, 2, 6, 12):
        branch = L.SeparableConv2D(
            256, (3, 3), dilation_rate=(rate, rate), padding='same', dtype=tf.float32
        )(bot)
        branches.append(norm_relu(branch))
    st = L.Concatenate()(branches)
    st = L.Conv2D(256, (1, 1), padding='same', dtype=tf.float32)(st)
    norm = norm_relu(st)
    bot = my_trans_conv_block(norm, 256, (3, 3), strides=(2, 2))  # back to 1/4 resolution

    tl3 = L.Conv2D(256, (3, 3), padding='same', dtype=tf.float32)(bot)
    tl3 = norm_relu(tl3)
    tl3 = L.Concatenate()([tl3, l3])
    tl3 = my_trans_conv_block(tl3, 128, (3, 3))

    # Head 1
    tl2 = L.Conv2D(128, (3, 3), padding='same', dtype=tf.float32)(tl3)
    tl2 = norm_relu(tl2)
    tl2 = L.Concatenate()([tl2, l2])
    tl2 = my_trans_conv_block(tl2, 128, (3, 3), strides=(2, 2))  # 1/2 resolution

    tl1 = L.Conv2D(128, (3, 3), padding='same', dtype=tf.float32)(tl2)
    tl1 = norm_relu(tl1)
    tl1 = L.Concatenate()([tl1, l1])
    tl1 = my_trans_conv_block(tl1, 128, (3, 3), strides=(2, 2))  # full resolution

    out1 = L.Conv2D(out_class, (1, 1), activation='softmax', padding='same', dtype=tf.float32)(tl1)

    # A second output head (decoding again from `bot` via l2 and l1) was
    # prototyped here but is disabled; only `out1` is returned.
    return tf.keras.Model(inputs=inp, outputs=out1)



In [None]:

class WarmUpLearningRate(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up followed by a piecewise-linear decay.

    During the first ``warm_up_steps`` steps the rate rises linearly from
    ``min_lr`` to ``init_lr``. Afterwards, within every cycle of ``decay_step``
    steps, it falls linearly from ``init_lr * decay_rate**cycle`` to
    ``init_lr * decay_rate**(cycle + 1)``. The result is never below ``min_lr``.
    """

    def __init__(self, init_lr=0.1, warm_up_steps=1000, decay_step=3000, decay_rate=0.25, min_lr=1e-8):
        """Store the schedule parameters.

        Args:
            init_lr: Learning rate reached at the end of the warm-up.
            warm_up_steps: Number of warm-up steps.
            decay_step: Length of one decay cycle in steps.
            decay_rate: Factor applied to the learning rate over one cycle.
            min_lr: Lower bound and warm-up starting value of the learning rate.
        """
        self.init_lr = init_lr
        self.warm_up_steps = warm_up_steps
        self.decay_step = decay_step
        self.decay_rate = decay_rate
        self.min_lr = min_lr

        # Per-step increment of the learning rate during the warm-up.
        self.warm_step_size = (init_lr - min_lr) / warm_up_steps

    def __call__(self, step):
        """Return the learning rate for ``step`` as a float32 tensor."""
        # Optimizers pass their integer iteration counter; cast it so it can be
        # combined with the float constants below without a dtype error.
        step = tf.cast(step, tf.float32)
        warm_lr = self.min_lr + self.warm_step_size * step

        offset = step - self.warm_up_steps
        cycle = offset // self.decay_step
        start_lr = self.init_lr * tf.pow(self.decay_rate, cycle)
        end_lr = start_lr * self.decay_rate
        step_size = (start_lr - end_lr) / self.decay_step
        lr = start_lr - (offset - cycle * self.decay_step) * step_size
        # The decay branch is only meaningful after the warm-up (offset > 0).
        true_lr = tf.where(offset > 0, lr, warm_lr)
        return tf.maximum(true_lr, self.min_lr)

    def get_config(self):
        """Return every constructor argument so the schedule can be rebuilt."""
        return {
            # init_lr was missing: a restored schedule silently fell back to 0.1.
            "init_lr": self.init_lr,
            "warm_up_steps": self.warm_up_steps,
            "decay_step": self.decay_step,
            "decay_rate": self.decay_rate,
            "min_lr": self.min_lr
        }
    
def train_model(
    dataset_path,
    win_size=288,
    train_val_split=0.1,
    learning_rate=5e-4,
    epochs=15,
    steps=1000,
    batch_size=8,
    val_steps=200,
    val_batch_size=8,
    early_stop=8
):
    """Prepare the data pipelines and the compiled U-net for training.

    NOTE: the ``model.fit`` call at the end is currently wrapped in a bare
    string literal, so no training happens; the function only builds the
    datasets, compiles the model and prints the training dataset object.

    Args:
        dataset_path: Dataset root passed to ``get_deep_score_data_paths``.
        win_size: Window size used by both data loaders and the model input.
        train_val_split: Fraction of the shuffled files held out for validation.
        learning_rate: Learning rate of the Adam optimizer.
        epochs: Number of epochs; also scales the number of generated samples.
        steps: Training steps per epoch.
        batch_size: Training batch size.
        val_steps: Validation steps per epoch.
        val_batch_size: Validation batch size.
        early_stop: Patience of the ``EarlyStopping`` callback.

    Returns:
        The compiled (and, with the fit call disabled, untrained) model.
    """
    # feat_files = get_cvc_data_paths(dataset_path)
    feat_files = get_deep_score_data_paths(dataset_path)
    random.shuffle(feat_files)
    # The first `split_idx` shuffled files form the validation set.
    split_idx = round(train_val_split * len(feat_files))
    train_files = feat_files[split_idx:]
    val_files = feat_files[:split_idx]

    print(f"Loading dataset. Train/validation: {len(train_files)}/{len(val_files)}")
    # num_samples is sized so the loader covers every batch of every epoch.
    train_data = DsDataLoader(
            train_files,
            win_size=win_size,
            num_samples=epochs*steps*batch_size
        ) \
        .get_dataset(batch_size)
    val_data = DsDataLoader(
            val_files,
            win_size=win_size,
            num_samples=epochs*val_steps*val_batch_size
        ) \
        .get_dataset(val_batch_size)

    print("Initializing model")
    #model = naive_conv(win_size=win_size)
    model = u_net(win_size=win_size, out_class=CHANNEL_NUM)
    #model = semantic_segmentation(win_size=win_size, out_class=CHANNEL_NUM)
    optim = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    #loss = tf.keras.losses.BinaryCrossentropy(label_smoothing=0.1)
    #loss = tf.keras.losses.CategoricalCrossentropy()
    loss = tfa.losses.SigmoidFocalCrossEntropy()
    model.compile(optimizer=optim, loss=loss, metrics=['accuracy'])

    # Both callbacks monitor the validation accuracy.
    callbacks = [
        tf.keras.callbacks.EarlyStopping(patience=early_stop, monitor='val_accuracy'),
        tf.keras.callbacks.ModelCheckpoint("seg_unet", save_weights_only=False, monitor='val_accuracy')
    ]
    print(train_data)
    # The string literal below disables the training call: it is evaluated as a
    # no-op expression, so `val_data` and `callbacks` are currently unused.
    """
    print("Start training")
    model.fit(
        train_data,
        validation_data=val_data,
        epochs=epochs,
        steps_per_epoch=steps,
        validation_steps=val_steps,
        callbacks=callbacks,
        verbose = 1
    )
    
    """
    return model

In [None]:
# Run the training setup on the ds2_dense dataset folder.
train_model('..\\dataset\\ds2_dense')

In [3]:
# Scratch check: outside of a subscript this is a plain list literal holding
# Ellipsis and 1, not an indexing expression.
[...,1]

[Ellipsis, 1]