# Optical Music Recognition - Staff Removal
In this notebook we download the CVC-MUSCIMA dataset and train a model to perform staff removal, using the ground truth provided in the dataset.
We use TensorFlow as our backend. 

## What is Staff Removal?
Staff Removal is one of the very early steps of preprocessing music scores. The task is to remove the horizontal lines that usually indicate the pitch. Without the staff lines, the classification of symbols (notes, clefs, rests) may be easier. It also might be easier to isolate the symbols, as they are not connected by lines anymore.

In [None]:
import requests
import os
import zipfile
import shutil
import tensorflow as tf
import keras
import matplotlib.pyplot as plt
from PIL import Image
from tqdm.notebook import tqdm

In [None]:
# Convenience Functions

# Sanity check: verify that the image-size hyperparameters in cfg make sense
def find_largest_image_resolution(root, valid_extensions = (".png",)):
    """Find the image with the most pixels below the entries of ``root``.

    Every entry of ``root`` (except the macOS residual ``.DS_Store``) is
    walked recursively; files placed directly in ``root`` are not inspected.

    Args:
        root: Directory whose sub-folders contain the images.
        valid_extensions: File extensions (lower-case, with leading dot)
            that are treated as images.

    Returns:
        A tuple ``(max_resolution, max_image_path)`` where ``max_resolution``
        is ``(width, height)`` of the image with the largest pixel count and
        ``max_image_path`` is its path. ``((0, 0), None)`` is returned when
        no readable image was found.
    """
    max_resolution = (0, 0)
    max_image_path = None

    # ".DS_Store" is probably a mac residual, not a folder
    entries = [entry for entry in os.listdir(root) if entry != ".DS_Store"]

    for entry in entries:
        # The walked directory needs its own name: reusing ``root`` here would
        # overwrite the base path, so every entry after the first one would be
        # joined onto the wrong directory and silently skipped.
        for current_dir, _, files in os.walk(os.path.join(root, entry)):
            for file in files:
                if os.path.splitext(file)[1].lower() not in valid_extensions:
                    continue

                image_path = os.path.join(current_dir, file)
                try:
                    with Image.open(image_path) as img:
                        width, height = img.size
                except Exception as e:
                    # Best effort: this is only a sanity check, so an
                    # unreadable file is reported and skipped.
                    print(f"Error reading {image_path}: {e}")
                    continue

                if width * height > max_resolution[0] * max_resolution[1]:
                    max_resolution = (width, height)
                    max_image_path = image_path

    return max_resolution, max_image_path

# Helper function to remove residuals from zipping
def remove_macos(dir_cont: list) -> list:
    """Drop the macOS ``.DS_Store`` residual from a directory listing.

    The list is modified in place (only the first occurrence is removed) and
    the very same list object is returned.
    """
    residual = ".DS_Store"
    if residual not in dir_cont:
        # nothing to clean up
        return dir_cont

    dir_cont.remove(residual)
    return dir_cont

# Directory crawler for creating data/target file paths.
def crawler(data_root: str = "data", target_root: str = "target") -> tuple[tf.Tensor, tf.Tensor]:
    """Collect matching data/target file paths from two mirrored directory trees.

    Both roots must contain the same sub-directories, and each pair of
    sub-directories must contain the same file names (``.DS_Store`` entries
    are ignored). Paths are collected in sorted order so that the i-th data
    path corresponds to the i-th target path.

    Args:
        data_root: Directory holding one sub-directory per group of input images.
        target_root: Directory with the same layout holding the target images.

    Returns:
        Two string tensors: the data file paths and the target file paths.

    Raises:
        ValueError: If the sub-directories of the two roots differ, or if a
            pair of sub-directories does not contain the same file names.
        NotADirectoryError: If an entry of one of the roots is not a directory.
    """
    def listing(path: str) -> list:
        # sorted directory content without the macOS residual
        return remove_macos(sorted(os.listdir(path)))

    data_dir = listing(data_root)
    target_dir = listing(target_root)

    if data_dir != target_dir:
        raise ValueError(f"Content of the directories not identical.\nContent of Data: {data_dir}\nContent of Target: {target_dir}")

    data = []
    targets = []

    for subdir in data_dir:
        # report the root that actually holds the offending entry
        for base in (data_root, target_root):
            if not os.path.isdir(os.path.join(base, subdir)):
                raise NotADirectoryError(f"Not a directory: {subdir} in {base}.")

        # grab the contents of both directories
        data_dir_cont = listing(os.path.join(data_root, subdir))
        target_dir_cont = listing(os.path.join(target_root, subdir))

        if data_dir_cont != target_dir_cont:
            raise ValueError(f"Contents of data and target do not match.\nData: {data_dir_cont}\nTarget: {target_dir_cont}")

        # extend the file names to full paths
        data.extend(os.path.join(data_root, subdir, file) for file in data_dir_cont)
        targets.extend(os.path.join(target_root, subdir, file) for file in target_dir_cont)

    return tf.convert_to_tensor(data, dtype = tf.string), tf.convert_to_tensor(targets, dtype = tf.string)


def visualize_images(
        images, 
        titles = None, 
        vmin = -1, 
        vmax = 1
    ):
    """Show the given images side by side in a single row of gray-scale plots.

    Args:
        images: Sequence of images (tensors or arrays); singleton dimensions
            are squeezed away before plotting.
        titles: Optional titles, one per image. Defaults to "Image 1", "Image 2", ...
        vmin: Value mapped to the darkest gray.
        vmax: Value mapped to the brightest gray.
    """
    count = len(images)
    if not titles:
        titles = [f"Image {i+1}" for i in range(count)]

    fig, ax = plt.subplots(1, count, figsize = (5 * count, 5))

    # a single subplot comes back as a bare Axes object; wrap it so it can be iterated
    axes = [ax] if count == 1 else ax

    for position, (image, axis) in enumerate(zip(images, axes)):
        # Squeeze singleton dims and convert to numpy
        squeezed = tf.squeeze(image)
        if isinstance(squeezed, tf.Tensor):
            pixels = squeezed.numpy()
        else:
            pixels = squeezed

        axis.imshow(pixels, cmap='gray', vmin=vmin, vmax=vmax)
        axis.set_title(titles[position])
        axis.axis('off')

    fig.tight_layout()
    fig.show()

In [None]:
# Download and unzip the dataset
def _download_file(url, destination):
    """Stream ``url`` into ``destination`` while showing a progress bar."""
    # Write to a temporary name first so that an interrupted transfer never
    # leaves a truncated archive behind under the final file name.
    partial = destination + ".part"
    try:
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))

            with open(partial, 'wb') as f, tqdm(
                desc = f"Downloading {destination}",
                total = total_size,
                unit = 'B',
                unit_scale = True,
                unit_divisor = 1024,
            ) as bar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        bar.update(len(chunk))
        os.replace(partial, destination)
    finally:
        if os.path.exists(partial):
            os.remove(partial)


def download_extract_zip(
        filename,
        extract_to,
        url = "http://datasets.cvc.uab.es/muscima/CVCMUSCIMA_WI.zip",
        expected_size = 984118102,
    ):
    """Download the dataset archive (if needed) and unpack it into ``extract_to``.

    The archive is only downloaded when ``filename`` does not exist or is
    smaller than ``expected_size``. Afterwards the archive is extracted, the
    content of its top-level ``CVCMUSCIMA_WI`` folder is moved directly into
    ``extract_to`` and the ``PNG_GT_BW`` folder is deleted. Extraction is
    skipped when both ``PNG_GT_Gray`` and ``PNG_GT_NoStaff`` are already
    present in ``extract_to``.

    Args:
        filename: Path of the zip archive on disk.
        extract_to: Directory the dataset is unpacked into (created if missing).
        url: Location the archive is downloaded from.
        expected_size: Minimum size in bytes for an existing ``filename`` to
            be accepted without downloading again.

    Raises:
        FileNotFoundError, zipfile.BadZipFile: If extraction is required but
            ``filename`` is missing or not a valid zip archive.
    """
    if os.path.exists(filename) and os.path.getsize(filename) >= expected_size:
        # no need to contact the server at all
        print("File exists and is of correct size (or bigger)")
    else:
        try:
            _download_file(url, filename)
            print(f"Downloaded {filename}.")
        except (requests.RequestException, OSError) as e:
            # Best effort: a manually provided archive may still be usable,
            # so report the problem and carry on with the extraction.
            print(f"Failed to download {filename}: {e}")

    # Check if the folder already exists, and create it if not
    if not os.path.isdir(extract_to):
        os.makedirs(extract_to)
        print(f"Created directory {extract_to}.")

    # Extract only if the files are not already present (both folders are required)
    required = {"PNG_GT_Gray", "PNG_GT_NoStaff"}
    if required.issubset(os.listdir(extract_to)):
        return

    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    print(f"Extracted {filename} to {extract_to}.")

    # Make folder structure more accessible
    nested = os.path.join(extract_to, 'CVCMUSCIMA_WI')
    for entry in os.listdir(nested):
        destination = os.path.join(extract_to, entry)
        # A stale copy left by an earlier, incomplete run would make
        # shutil.move place the folder *inside* it; replace it instead.
        if os.path.isdir(destination):
            shutil.rmtree(destination)
        elif os.path.exists(destination):
            os.remove(destination)
        shutil.move(os.path.join(nested, entry), destination)
    os.rmdir(nested)
    print(f"Moved files from {nested} to {extract_to} and removed the empty directory.")

    # We want to train on Grayscale Images, not on Black/White
    bw_dir = os.path.join(extract_to, "PNG_GT_BW")
    if os.path.isdir(bw_dir):
        shutil.rmtree(bw_dir)
        print(f"Removed {bw_dir} directory and its contents.")

@tf.function
def load_and_preprocess(
        data_path: tf.Tensor, 
        target_path: tf.Tensor,
        width: int = 1280,
        height: int = 720
    ):
    """Load one data/target PNG pair and bring both to the network's format.

    Both files are decoded as single-channel 8-bit images, resized with
    padding to ``height`` x ``width`` (anti-aliased), cast to float32 and
    rescaled from [0, 255] to [-1, 1].

    Args:
        data_path: Path of the input image.
        target_path: Path of the target image.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        The tuple ``(data_img, target_img)`` of preprocessed float32 images.
    """
    full_scale = tf.uint8.max

    def read_grayscale(path):
        # Read and decode a PNG as a single-channel uint8 image
        return tf.io.decode_png(
            contents = tf.io.read_file(path),
            channels = 1,
            dtype = tf.uint8
        )

    def fit_to_canvas(image):
        # Not using anti-aliasing makes the model perform worse
        # and the input image a lot more incoherent
        return tf.image.resize_with_pad(
            image = image,
            target_height = height,
            target_width = width,
            antialias = True
        )

    def to_signed_unit_range(image):
        # Shift [0, 255] to [-1, 1]
        return tf.cast(x = image, dtype = tf.float32) / 127.5 - 1

    # resize_with_pad fills the padding with 0s, so the data image is inverted
    # before resizing and reverted afterwards.
    data_img = full_scale - fit_to_canvas(full_scale - read_grayscale(data_path))

    # The target is not inverted beforehand, since (per the original note) the
    # file is 0 (background) and 255 (notes); inverting after the resize makes
    # it similar to the data (255 = background).
    target_img = full_scale - fit_to_canvas(read_grayscale(target_path))

    return to_signed_unit_range(data_img), to_signed_unit_range(target_img)

def create_dataset(
        data_paths: tf.Tensor, 
        target_paths: tf.Tensor, 
        img_width: int = 1280,
        img_height: int = 720,
    ) -> tf.data.Dataset:
    """Build a dataset of preprocessed (data, target) image pairs.

    Args:
        data_paths: Paths of the input images.
        target_paths: Paths of the target images, aligned with ``data_paths``.
        img_width: Width the images are resized to.
        img_height: Height the images are resized to.

    Returns:
        A dataset that lazily loads each pair through ``load_and_preprocess``.
    """
    def load_pair(data_path, target_path):
        # forward the configured output size to the loader
        return load_and_preprocess(
            data_path = data_path,
            target_path = target_path,
            width = img_width,
            height = img_height
        )

    path_pairs = tf.data.Dataset.from_tensor_slices((data_paths, target_paths))

    return path_pairs.map(
        load_pair,
        num_parallel_calls = tf.data.AUTOTUNE
    )

## Define Hyperparameters
The reduced image resolution and the small batch size are due to my low-VRAM (8 GB) GPU. Adjust them to suit your system.

In [None]:
# Hyperparameters shared by the data pipeline, the model and the training loop.
cfg = dict(
    path = "data",
    # a quarter of 3479 x 2466 px, rounded down
    # (presumably the full page resolution - compare with find_largest_image_resolution)
    img_width = 3479 // 4,
    img_height = 2466 // 4,
    batch = 8,
    shuffle_buffer = 2,
    epochs = 5,
)

## Downloading the dataset
In this task we are going to use the CVC-Muscima dataset. After downloading it from the [Autonomous University of Barcelona](http://datasets.cvc.uab.es/muscima/CVCMUSCIMA_WI.zip), the dataset will be present as `cvc.zip`. The function `download_extract_zip()` will thereafter extract it to a new folder `./data`, and then shuffle some files around.

If the download fails, you can download it manually, put it in the same folder as this notebook and execute the `download_extract_zip()`, passing the filename as it is on your system as first parameter. The function will recognize the file and unzip it, and ensure proper folder structure for the rest of the code.

### Regarding Splits
The standard approach would be to split the dataset into training and testing - if not also validation - sets. However, this requires loading the full dataset into memory, which is not possible on many systems with low to mid (maybe even high) memory capacity (tested with 32GB). This is mainly because the images are pretty high-resolution.

In [None]:
# Download and extract the dataset into the folder configured in cfg
download_extract_zip("cvc.zip", cfg["path"])

# Inputs (gray-scale scores) and targets (same scores without staff lines)
gray_root = os.path.join(cfg["path"], "PNG_GT_Gray")
no_staff_root = os.path.join(cfg["path"], "PNG_GT_NoStaff")

# Checking if our image dimensions make sense
print(find_largest_image_resolution(gray_root))
# Sanity Check, should equal the other
print(find_largest_image_resolution(no_staff_root))

data, target = crawler(
        data_root = gray_root,
        target_root = no_staff_root
    )
ds = create_dataset(
    data_paths = data,
    target_paths = target,
    img_width = cfg["img_width"],
    img_height = cfg["img_height"],
)

# Shuffle, batch and prefetch according to the hyperparameters in cfg
ds = ds.shuffle(cfg["shuffle_buffer"]).batch(cfg["batch"]).prefetch(tf.data.AUTOTUNE)

# Model Definition
Below, a model with 3 CNN layers is defined. To see what it would look like with, e.g., a fourth layer, uncomment the code. \
The decreasing kernel size with increasing network depth is inspired by https://doi.org/10.1007/978-3-319-58838-4_31 (paper [51]).

In [None]:
def _tanh_conv(filters, kernel_size):
    """Build a stride-1, same-padded Conv2D layer with tanh activation."""
    return keras.layers.Conv2D(
        filters = filters,
        kernel_size = kernel_size,
        strides = 1,
        padding = "same",
        activation = "tanh"
    )

# Single-channel float input with the image size and batch size from cfg
inputs = keras.Input(
    batch_size = cfg["batch"],
    shape = (cfg["img_height"], cfg["img_width"], 1),
    dtype = tf.float32
)

# Kernel size shrinks with depth: 15 -> 5 -> 3
x = _tanh_conv(filters = 64, kernel_size = 15)(inputs)
# Optional additional layer, uncomment to try it out:
# x = _tanh_conv(filters = 32, kernel_size = 5)(x)
x = _tanh_conv(filters = 16, kernel_size = 5)(x)
# One output channel, tanh keeps the prediction in [-1, 1] like the targets
out = _tanh_conv(filters = 1, kernel_size = 3)(x)

model = keras.Model(inputs = inputs, outputs = out)
model.compile(
    loss = "mse",
    optimizer = "adam",
    # metrics = ["accuracy"]
)
model.summary()
#

In [None]:
# Train on the batched dataset for the number of epochs configured in cfg
model.fit(x = ds, epochs = cfg["epochs"])

Let us take a look at how well our model performs so far!

In [None]:
# Run the model on a single batch and show input, prediction and target of its first sample
for batch_inputs, batch_targets in ds.take(1):
    batch_preds = model(batch_inputs)
    visualize_images(
        titles = ["Original", "Prediction", "Target"],
        images = [batch_inputs[0], batch_preds[0], batch_targets[0]]
    )
