<a href="https://colab.research.google.com/github/atsigman/data_pipeline_tutorial/blob/main/music_data_pipeline_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Music Data Pipeline Tutorial**

# **0. Installation Steps**

First, install the `music_data_pipeline` package directly from the tutorial repo:

In [None]:
!pip install git+https://github.com/atsigman/data_pipeline_tutorial.git@main

Next, download the text and audio data (mounting Google Drive does not always work):

In [None]:
!pip install -q gdown
!gdown 1W2V7YbwBSfeECnda0bVFRmR47xmaZYxr -O /content/dpt_data.zip

Unzip the data archive and save to the appropriate subdirectory:

In [None]:
import os
import zipfile

In [None]:
ZIP_PATH = "/content/dpt_data.zip"   # location of the zip in Colab VM
EXTRACT_TO = "/content"
DATA_DIR = "/content/data"
SUBDIR_NAME = os.path.basename(ZIP_PATH)[:-4]

In [None]:
def unzip_file() -> None:
  """
  Unzips archive to target directory.
  """
  import shutil

  # Unzip:
  with zipfile.ZipFile(ZIP_PATH, "r") as zip_ref:
    zip_ref.extractall(EXTRACT_TO)

  # Rename extracted subdir to "data" (only if the archive actually produced it):
  original_dir = os.path.join(EXTRACT_TO, SUBDIR_NAME)
  if SUBDIR_NAME != "data" and os.path.isdir(original_dir):
    if os.path.exists(DATA_DIR):
      shutil.rmtree(DATA_DIR)  # remove if already exists
    os.rename(original_dir, DATA_DIR)

  # Make sure the data dir exists in any case:
  os.makedirs(DATA_DIR, exist_ok=True)

  print(f"✅ Audio and text metadata extracted to: {DATA_DIR}")

In [None]:
unzip_file()

In [None]:
# Imports:

import ast
import json
import pandas as pd
import random
import uuid

import torch

from typing import Tuple

from torch.utils.data import Dataset, Subset, DataLoader
from torchaudio.transforms import MFCC

from music_data_pipeline.audio_dataset import AudioDataset
from music_data_pipeline.util.pipeline_utils import (
  validate_prune_data,
  find_similar_audio,
  add_silent_regions,
  chunk_audio,
  tokenize_metadata,
  extract_blacklisted_genres,
)

Now we are ready to explore the dataset and construct a data pipeline!

# **I. Data Preprocessing Pipeline**

The goal of this module will be to analyse and preprocess the audio data and text metadata.

As we shall see, it may be necessary to a) prune the metadata in the event of invalid entries, b) add metadata, and/or c) generate new audio files.

Ultimately, the input data CSV will be converted to a dictionary, which will be saved as a JSON in the `/content/data` directory.

(In the "real world", this data would be stored to a DB, but for the sake of simplicity, we will just serialise it to a file in this tutorial.)

Let's begin by reading in and inspecting the dataset CSV:

## **A. EDA/Dataframe Operations**

In [None]:
df = pd.read_csv(os.path.join(DATA_DIR, "input_data.csv"))

In [None]:
df.shape

In [None]:
df.columns

In [None]:
df.isna().sum()

In [None]:
df.head()

OK, so it looks as though there are 106 samples, and 7 columns (features). 1 audio path is missing.

What do you notice about the data structures for each column?

The first necessary manipulation: now that the data directory lives under `/content`, the prefix `/content/` should be prepended to each `audio_path`. (A missing path is replaced by an empty string.)





In [None]:
df["audio_path"] = df["audio_path"].apply(lambda x: "/content/" + x if isinstance(x, str) else "")

In [None]:
df.head()

**1. Blacklist Flag Column**

This is just a repository for any "warnings" about entries
that will assist with training data filtering downstream.

In [None]:
df["blacklist_flags"] = [[] for _ in range(len(df))]

In [None]:
df.head()

**2. Add `_id` column**

Assign each sample a unique `_id`:

In [None]:
df["_id"] = [str(uuid.uuid1()) for _ in range(len(df))]

In [None]:
df.columns

**3. Convert "genres" values from string to list**

In [None]:
df["genres"] = df["genres"].apply(lambda x: ast.literal_eval(x))

**4. Convert the DataFrame to a List of Dictionaries**

For all subsequent operations, the data should be in dictionary format. (This also avoids dealing with the idiosyncratic quirks of pandas.)

In [None]:
entries = df.to_dict(orient="records")

In [None]:
type(entries)

In [None]:
entries[0]

## **B. Data Validation and Pruning**

As this preprocessing pipeline is positioned upstream, and given the limited dataset size, it would be best to take a conservative approach to making executive data filtering decisions.

So let's consider: under which conditions is a given entry simply not usable as training data?


1.   No audio filepath (remember: we found one such example)
2.   Absolutely no relevant metadata
3.   Duplicate audio path (put a pin in this for later...)

In [None]:
entries = validate_prune_data(entries)

In [None]:
print(f"{len(entries)} remaining entries")

## **C. Duplicate Detection**

Text metadata by itself cannot be fully trusted as the source of truth for data ontology, but the audio can (with a few caveats).

There are multiple approaches to detecting duplicate audio--e.g., comparing hashes of audio file bytes, or taking the mean absolute distance between 2 audio arrays.

Given the time complexity of this problem (comparing each audio file with every other audio file in the dataset is by default O(N^2)), it would make sense to compare compact but rich representations of the audio. In this case, we will use Mel spectrogram embeddings, and compute cosine similarity.

Step 1: Extract and cache all embeddings.

Step 2: Iterate over pairs of entries:

*   If 2 entries point to the same audio filepath, mark one for deletion.
*   If 2 entries' similarity score exceeds a given threshold, flag one as a duplicate (but do not delete it).


Audio file duration is also computed at this stage.
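
The pairwise comparison in step 2 can be sketched as follows. This is a simplified illustration rather than the actual `find_similar_audio` implementation: it assumes the embeddings from step 1 are already available as plain lists of floats keyed by `_id`, and the function name `flag_duplicates`, the `"duplicate"` flag name, and the threshold value are all hypothetical.

```python
import math
from itertools import combinations
from typing import Any, Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def flag_duplicates(
    entries: List[Dict[str, Any]],
    embeddings: Dict[str, Sequence[float]],
    threshold: float = 0.98,  # hypothetical value
) -> List[Dict[str, Any]]:
    """Removes same-path entries; flags (but keeps) near-identical audio."""
    to_delete = set()
    # O(N^2): every entry is compared with every later entry.
    for first, second in combinations(entries, 2):
        if first["_id"] in to_delete or second["_id"] in to_delete:
            continue
        if first["audio_path"] == second["audio_path"]:
            to_delete.add(second["_id"])
        elif cosine_similarity(embeddings[first["_id"]], embeddings[second["_id"]]) > threshold:
            if "duplicate" not in second["blacklist_flags"]:
                second["blacklist_flags"].append("duplicate")
    return [e for e in entries if e["_id"] not in to_delete]
```

Caching the embeddings up front means each audio file is decoded and embedded only once, even though the comparison itself remains quadratic.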

        


In [None]:
entries = find_similar_audio(entries)

## **D. Text Tokenization**

Now that invalid samples have been removed, the focus can shift to text metadata. The first step is to tokenize all text input.

Since the metadata for this dataset is already relatively tidy, the "tokenization" process in this case will merely consist of converting all text to lower case, and removing hyphens and related characters.
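
As a rough illustration of this kind of normalization, here is a minimal sketch. It is not the code inside `tokenize_metadata`: the function name `normalize_text` is hypothetical, and the choices to replace hyphen-like characters with a space (rather than deleting them outright) and to include underscores are assumptions.

```python
import re


def normalize_text(text: str) -> str:
    """Lower-cases text and replaces hyphens, dashes, and underscores with spaces."""
    text = text.lower()
    # ASCII hyphen, underscore, and the Unicode hyphen/dash range U+2010..U+2015.
    text = re.sub(r"[-_\u2010-\u2015]+", " ", text)
    # Collapse any runs of whitespace left behind.
    return re.sub(r"\s+", " ", text).strip()


print(normalize_text("Hip-Hop"))  # hip hop
```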

In [None]:
entries = tokenize_metadata(entries)

In [None]:
# Example entry:
entries[0]

## **E. Blacklisted Genre Extraction**

Let's say that certain genre tags strongly correlate with either low-quality or irrelevant data (e.g., podcasts or environmental field recordings).

For this stage, if any "blacklisted" genre tag is detected for a given entry, the flag `bad_genre` will be appended to the entry's `blacklist_flags` list.
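
The flagging logic can be sketched as below. This is an assumption-laden illustration, not the `extract_blacklisted_genres` source: the function name `flag_bad_genres` is hypothetical, and the contents of `BLACKLISTED_GENRES` are invented for the example (the tags actually used by the package may differ).

```python
from typing import Any, Dict, List

# Hypothetical blacklist, loosely based on the examples mentioned above.
BLACKLISTED_GENRES = {"podcast", "field recording"}


def flag_bad_genres(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Appends "bad_genre" to blacklist_flags when any genre tag is blacklisted."""
    for entry in entries:
        genres = {g.lower() for g in entry.get("genres", [])}
        flags = entry.setdefault("blacklist_flags", [])
        # Flag the entry (once) rather than deleting it.
        if genres & BLACKLISTED_GENRES and "bad_genre" not in flags:
            flags.append("bad_genre")
    return entries
```

Flagging instead of deleting keeps the filtering decision reversible: a downstream stage can choose to ignore the flag.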





In [None]:
entries = extract_blacklisted_genres(entries)

In [None]:
# Entries for which "bad_genre" flag exists:
bad_genre_entries = [e for e in entries if "bad_genre" in e["blacklist_flags"]]
print(bad_genre_entries)

## **F. Chunk/Segment Entries with Long Audio Tracks**
It is not uncommon in certain genres for individual tracks to extend for long durations (e.g., the movement of a symphony, or a meditation track). This poses practical issues in the model training context.

For any samples whose audio duration exceeds a given threshold, the audio will be segmented into subfiles which will be saved to the audio data directory.

For each segment, the "source" entry metadata will be copied, but the audio path and duration will be overwritten. In addition, the source entry `_id` and partition index will be added.

(Do you see why this step follows text metadata preprocessing?)
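
The metadata bookkeeping for segments can be sketched as follows. This is a minimal sketch under several assumptions, not the `chunk_audio` implementation: the key names `duration`, `source_id`, and `partition_idx`, the segment filename pattern, the fresh `_id` per segment, and the 30-second default are all hypothetical. Slicing and saving the audio itself is omitted.

```python
import copy
import math
import os
import uuid
from typing import Any, Dict, List


def chunk_entry(entry: Dict[str, Any], max_duration: float = 30.0) -> List[Dict[str, Any]]:
    """Returns one metadata entry per segment (or the entry itself if short enough)."""
    duration = entry["duration"]
    if duration <= max_duration:
        return [entry]

    stem, ext = os.path.splitext(entry["audio_path"])
    segments = []
    for idx in range(math.ceil(duration / max_duration)):
        start = idx * max_duration
        end = min(start + max_duration, duration)
        segment = copy.deepcopy(entry)                # copy the source metadata
        segment["_id"] = str(uuid.uuid4())            # assumption: fresh _id per segment
        segment["source_id"] = entry["_id"]           # hypothetical key name
        segment["partition_idx"] = idx                # hypothetical key name
        segment["audio_path"] = f"{stem}_{idx}{ext}"  # overwrite the audio path
        segment["duration"] = end - start             # overwrite the duration
        segments.append(segment)
    return segments
```

Because each segment copies the source metadata as it stands at this point, any metadata cleanup applied beforehand is inherited by every segment.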

In [None]:
entries = chunk_audio(entries)

In [None]:
entries[-6:]

## **G. Silent Region Detection**
Given that the model will be trained on (random) crops of a particular duration, it would be best to avoid exposing it to segments that consist primarily of silence.

As such, silent regions are detected, collected, and logged for each entry. (A silent region is defined as an inter-onset interval > a given threshold.)

(Do you see why this step follows audio segmentation?)  

As for how these regions are handled: this will be outsourced to downstream stages...
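
The definition of a silent region given above (an inter-onset interval exceeding a threshold) can be sketched as follows. This assumes the onset times, in seconds, have already been produced by some onset detector; the function name `silent_regions_from_onsets` and the default threshold are hypothetical, and this is not the `add_silent_regions` implementation.

```python
from typing import List, Sequence, Tuple


def silent_regions_from_onsets(
    onsets: Sequence[float],
    min_gap: float = 2.0,  # hypothetical threshold, in seconds
) -> List[Tuple[float, float]]:
    """Returns (start, end) pairs for gaps between consecutive onsets > min_gap."""
    ordered = sorted(onsets)
    return [
        (prev, nxt)
        for prev, nxt in zip(ordered, ordered[1:])
        if nxt - prev > min_gap
    ]


print(silent_regions_from_onsets([0.0, 0.5, 1.0, 4.5, 5.0, 9.0]))
# [(1.0, 4.5), (5.0, 9.0)]
```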

In [None]:
entries = add_silent_regions(entries)

In [None]:
silent_region_entries = [e for e in entries if e["silent_regions"]]
len(silent_region_entries)

In [None]:
silent_region_entries

## **H. Serialize to JSON**

In [None]:
with open("/content/data/training_data.json", "w") as f:
  json.dump(entries, f, indent=4)

In [None]:
# (Run this cell in case of runtime disconnection, etc., or just to validate serialization):
with open("/content/data/training_data.json", "r") as f:
  entries = json.load(f)

**Data Pipeline TODOs:**

**Audio**


1.   Music vs. non-music regions/ratio
2.   Instrumental vs. vocal regions
3.   ?Audio fingerprinting?

**Text**
1. Filtering tags by top genres/artists (keep only the most frequent)
2. Named entity resolution (e.g., 2+ renderings of same artist or genre name)
3. Genre name "smoothing" (i.e., mapping multiple genre names to 1 meta-category)
4. ?API pinging for additional metadata?



# **II. Audio Dataset/DataLoader**

Now that we have preprocessed the training data, the next step is to determine how data samples will be varied, transformed, and represented during model training and validation.

## **A. AudioDataset**
Let's construct an AudioDataset. Each `__getitem__()` call returns a waveform tensor and a `TextCondition`.

The only required argument is the list of entries (training data collection).


In [None]:
ds = AudioDataset(entries)

To inspect output properties, let's print the shape of the audio tensor and the `TextCondition` for one `__getitem__()` call. (Note that audio is mixed down to mono, as the dataset *may* contain a combination of mono and stereo files.)





In [None]:
audio, cond = ds[0]
print(f"Audio tensor shape: {audio.shape}, Text condition: {cond}")

There is a script in tests called `test_dataset.py`, which iterates through the dataset and prints/collects any errors, but we can (partially) replicate the functionality here:

In [None]:
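def validate_dataset(ds: Dataset, max_idx: int = 10) -> None:
  """
  Iterates through the first `max_idx` dataset items. Prints audio shapes and text
  conditions. If any exceptions are thrown, prints them.
  """
  n_errors = 0
  for i in range(min(max_idx, len(ds))):
    try:
      audio, cond = ds[i]
      print(f"{i}: Audio tensor shape: {audio.shape}, Text condition: {cond}")
      if audio.shape[0] != 1:
        n_errors += 1
        print(f"n_channel mismatch at index {i}: should be mono.")
    except Exception as e:
      n_errors += 1
      print(f"Error at index {i}: {e}")

  # Only report success if no problems were encountered:
  if n_errors == 0:
    print("Dataset tests passed!")
  else:
    print(f"Dataset tests failed: {n_errors} problem(s) found.")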

In [None]:
validate_dataset(ds)

## **B. AudioDataset with Transform**

There is an option to apply an audio transform to the input. Let's experiment with an MFCC transform.

In [None]:
mfcc_transform = MFCC(
  sample_rate=44100,
  n_mfcc=13,
  melkwargs={"n_fft": 2048, "hop_length": 512, "n_mels": 23}
)


In [None]:
mfcc_ds = AudioDataset(entries, transform=mfcc_transform)

In [None]:
mfcc_spect, cond = mfcc_ds[0]
print(f"Audio tensor shape: {mfcc_spect.shape}, Text condition: {cond}")

**Dataset TODOs:**


1.   Support silent region-aware cropping
2.   Chain augmentations (i.e., select > 1)
3.   ?Other audio representations?
     *   Discrete (precomputed?) codebooks?
     *   Continuous or discretized embeddings? (Precomputed, or extracted on-the-fly?)












## **C. Train/Test Split**
The Dataset will be split into training and validation subsets. (Be sure that samples are randomly selected for each.)


In [None]:
def train_test_split(ds: Dataset, split_ratio: float = 0.8) -> Tuple[Subset, Subset]:
  """
  Splits a Dataset into training and validation subsets.
  """
  all_idxs = range(len(ds))
  # Use the split_ratio argument rather than a hard-coded value:
  train_len = int(split_ratio * len(ds))
  rand_idxs = random.sample(all_idxs, k=train_len)
  train_ds = Subset(ds, rand_idxs)

  # Remaining indices (sorted for a reproducible validation order):
  val_idxs = sorted(set(all_idxs) - set(rand_idxs))
  val_ds = Subset(ds, val_idxs)

  return train_ds, val_ds


In [None]:
train_ds, val_ds = train_test_split(ds)

In [None]:
print(f"Train dataset size: {len(train_ds)}")
print(f"Validation dataset size: {len(val_ds)}")

## **D. DataLoaders**

In order to load data in batches (rather than reading the entire dataset into RAM at once) and transfer it efficiently from CPU to GPU, a `DataLoader` is recommended.

Let's create one for each `Dataset`.

**Collate function**

For correct batching, a `collate_fn` must be defined. As the audio items are tensors of uniform dimensions, they can be stacked. The `TextCondition` objects, however, are not tensors, and should simply be collected into a list:

In [None]:
def collate_fn(data):
    audio, conditions = zip(*data)
    return torch.stack(audio), list(conditions)

**Train and Validation DataLoaders**

The main difference between the training and validation DataLoader kwargs is that, for the validation set, shuffling data is not required. For the purposes of Colab notebook execution, be sure to set `num_workers` to 0.





In [None]:
train_dl = DataLoader(train_ds, batch_size=4, collate_fn=collate_fn, num_workers=0, pin_memory=False, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=4, collate_fn=collate_fn, num_workers=0, pin_memory=False, shuffle=False)

In [None]:
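def validate_dataloader(dl: DataLoader, max_iter: int = 5) -> None:
  # Iterate over the DataLoader once, so that each step yields the next batch
  # (calling next(iter(dl)) repeatedly would restart the iterator every time):
  for i, (audio_batch, cond_batch) in enumerate(dl):
    if i >= max_iter:
      break
    print(f"Audio dims: {audio_batch.shape}, Text batch size: {len(cond_batch)}")

  print("DataLoader tests passed!")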

In [None]:
validate_dataloader(val_dl)

# **Conclusion/Next Steps**

With the initialized DataLoaders, one could run a model training/validation loop over some number of epochs, reading samples from CPU in batches and transferring them to GPU.

For each epoch, a unique crop and augmentation type will be selected for a given sample. This both expands the effective training dataset and teaches the model to generate to and from any valid onset, and to be robust to imperceptible (or scarcely perceptible) but computationally distinct representations of the "same" input. (As discussed earlier, the applied augmentations should not degrade the audio samples: this is "ground truth" audio, and the model should be steered towards generating high-quality outputs.)
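
To make the per-epoch cropping idea concrete, here is a minimal sketch of how crop boundaries might be drawn. It is not taken from `AudioDataset`: the function name `random_crop_bounds` and its parameters are hypothetical, and it ignores silent regions entirely.

```python
import random
from typing import Optional, Tuple


def random_crop_bounds(
    n_samples: int,
    crop_len: int,
    rng: Optional[random.Random] = None,
) -> Tuple[int, int]:
    """Returns (start, end) sample indices of a random crop of length crop_len."""
    rng = rng if rng is not None else random.Random()
    # Audio shorter than the crop length is returned whole.
    if n_samples <= crop_len:
        return 0, n_samples
    start = rng.randint(0, n_samples - crop_len)  # inclusive on both ends
    return start, start + crop_len
```

Calling this once per `__getitem__()` would mean the same entry yields a different excerpt in each epoch, while passing a seeded `random.Random` keeps the crops reproducible for debugging.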

In the "real world", the dataset would be vastly larger (in most cases, at least 20K hours), with greater variance in input duration and data source, but the data pipeline stages/mechanics and Dataset design process covered in this tutorial would be fundamentally equivalent.

Please feel free to experiment with your own data, pipeline enhancements, and Dataset modifications!