# Fast Data Pipeline with Threading

The GIL means that threads are not always helpful in Python, but there are two special cases where threading pays off:
* Blocking I/O operations release the GIL while they wait, and
* Calls into compiled C/C++/Rust code (e.g., the numerical routines behind many librosa operations) can release the GIL while doing the heavy lifting.

In both of these cases, several threads can make progress at the same time, so running the work in parallel threads improves execution speed.
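To see the I/O case in action, here is a minimal sketch that times four blocking "reads" run one after another and then in four threads. The `fake_read` function is a hypothetical stand-in that just sleeps (`time.sleep` releases the GIL, much like a blocking read does), and the `ThreadPoolExecutor` it uses is explained further down.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_read(i):
  """Hypothetical stand-in for a blocking read that takes 100 ms."""
  time.sleep(0.1)  # Sleeping releases the GIL, like waiting on I/O.
  return i


# One read after another: total time is roughly 4 x 100 ms.
start = time.perf_counter()
sequential = [fake_read(i) for i in range(4)]
sequential_s = time.perf_counter() - start

# Four reads at once: the waits overlap, so total time is roughly 100 ms.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
  threaded = list(executor.map(fake_read, range(4)))
threaded_s = time.perf_counter() - start

print(f"sequential: {sequential_s:.2f}s, threaded: {threaded_s:.2f}s")
```

Exact timings will vary by machine. If you replace the sleep with a pure-Python loop, you should not expect the same speedup, because that kind of work holds the GIL.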

This is particularly important for *data pipelines*. When training a model, we typically need to feed it subsets of our (gigantic, too big to fit in memory) dataset. Without threading, we need to load the next batch of data, wait for the model to evaluate and update, then load the next batch of data, and so on. This means that our fancy GPU is going to waste time while we load the data!

Instead, we can load data *while* the model is running, and (hopefully) have the next batch of data ready and waiting. This can greatly improve throughput. At the same time, we can make the data loader multi-threaded, so that it reads from several files in parallel (i.e., faster).
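As a rough illustration of that overlap, the sketch below compares a load-then-train loop with one that starts loading the next batch before training on the current one. Both `load_batch` and `train_step` are hypothetical placeholders that simply sleep for 50 ms, and the single-worker `ThreadPoolExecutor` is only one possible way to run the loader in the background.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def load_batch(i):
  """Hypothetical loader: pretend reading batch `i` takes 50 ms of I/O."""
  time.sleep(0.05)
  return [i] * 4


def train_step(batch):
  """Hypothetical model update: pretend the GPU is busy for 50 ms."""
  time.sleep(0.05)
  return sum(batch)


def run_sequential(num_batches):
  # Load, then train, then load, ... with nothing overlapping.
  return [train_step(load_batch(i)) for i in range(num_batches)]


def run_prefetched(num_batches):
  outputs = []
  with ThreadPoolExecutor(max_workers=1) as executor:
    next_batch = executor.submit(load_batch, 0)
    for i in range(num_batches):
      batch = next_batch.result()  # Blocks only if loading is behind.
      if i + 1 < num_batches:
        # Start loading batch i+1 before we train on batch i.
        next_batch = executor.submit(load_batch, i + 1)
      outputs.append(train_step(batch))
  return outputs


start = time.perf_counter()
sequential_outputs = run_sequential(8)
sequential_s = time.perf_counter() - start

start = time.perf_counter()
prefetched_outputs = run_prefetched(8)
prefetched_s = time.perf_counter() - start

print(f"sequential: {sequential_s:.2f}s, prefetched: {prefetched_s:.2f}s")
```

In this toy setup loading and training take the same time, so the overlap hides almost all of the loading cost. With a real model the benefit depends on how the two times compare.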

Let's start with a very simple example of how to use a `ThreadPoolExecutor`.

To use the executor:
1) Set it up, with some number of `max_workers`.
2) Use `executor.submit(fn, arg1, arg2, ...)` to give the workers some work. A worker will call the submitted function with the provided arguments: `fn(arg1, arg2, ...)`. The `submit` call returns immediately with a *future* object, which you should keep so that you can get the result later.
3) Call `.result()` on each future object to get its return value. This blocks until that piece of work has finished.

Here's some example code to get you started.

In [1]:
from concurrent.futures import ThreadPoolExecutor

# The function we want the workers to evaluate.
fun = lambda a: a**2

# Create an `executor` that we can pass work to.
with ThreadPoolExecutor(max_workers=2) as executor:
  futures = []
  for i in range(10):
    # Each `future` object can tell us whether its task is still
    # running, and eventually provides the result.
    futures.append(executor.submit(fun, i))
  # `.result()` blocks until the corresponding task has finished.
  results = [f.result() for f in futures]
  print(results)

# We can also use a `map` approach more concisely.
with ThreadPoolExecutor(max_workers=2) as executor:
  # map_results is an iterator that yields results in input order,
  # waiting for each one to be ready before moving on to the next.
  map_results = executor.map(fun, range(10))
  print(list(map_results))


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
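Both approaches above hand back results in the order the work was submitted. If you would rather handle each result as soon as it is ready, `concurrent.futures.as_completed` yields futures in completion order. Here is a small sketch, where `slow_square` is a hypothetical function that deliberately takes longer for smaller inputs so that the later submissions tend to finish first:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(a):
  """Hypothetical task: smaller inputs sleep longer before returning."""
  time.sleep(0.05 * (3 - a))
  return a**2


with ThreadPoolExecutor(max_workers=3) as executor:
  # Remember which input each future belongs to.
  futures = {executor.submit(slow_square, a): a for a in range(3)}
  completion_order = []
  # as_completed yields each future once it has finished.
  for future in as_completed(futures):
    completion_order.append((futures[future], future.result()))

print(completion_order)
```

The printed order depends on thread scheduling, so treat it as "whatever finished first" rather than something to rely on.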


In [None]:
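import dataclasses
from typing import Generator
from etils import epath
from IPython.display import Audio, display  # Used to play audio chunks below.
import numpy as np
import librosa
from tqdm import tqdm  # Import the function, not the module, so `tqdm(...)` works.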

@dataclasses.dataclass
class AudioExample:
  audio: np.ndarray
  file_id: str
  offset: float


In [None]:
# ------------------------------------------------------------
# 📝 Exercise 0 – Plain Old Data Loader
# ------------------------------------------------------------
# First, write a data loader which loads audio data from a
# directory. Feel free to re-use your answer from Exercise 2
# in the `03_anuraset_train` notebook.
# ------------------------------------------------------------

def audio_chunk_dataloader(
      file_glob, target_sample_rate, window_size_s, shuffle=False):
    """
    Args:
        file_glob: str, glob pattern for the audio files
        target_sample_rate: int, sample rate to load the audio files
        window_size_s: float, size of the audio chunks in seconds
        shuffle: bool, whether to shuffle the order of the audio files in the glob; use a fixed seed for reproducibility
    Yields:
        audio_window: np.ndarray, audio chunk
        file: str, path to the audio file
        offset: float, start time of the audio chunk in seconds

    Hint:
    for file in glob:
      for audio_window in file:
        yield audio_window, file, offset
    """
    raise NotImplementedError

## Test:
DATA_DIR = epath.Path('/mnt/class_data/anuraset')
RAW_DIR = DATA_DIR / 'raw_data'  # long recordings (.wav)

file_glob = str(RAW_DIR / '**/*.wav')
target_sample_rate = 16000
window_size_s = 5.0
dataloader = audio_chunk_dataloader(file_glob, target_sample_rate, window_size_s, shuffle=True)
num_to_test = 15
for i, (audio_chunk, file, offset) in tqdm(enumerate(dataloader)):
    print(f"Loaded {file} at {offset:.2f}s")
    display(Audio(audio_chunk, rate=target_sample_rate))
    # Stop once exactly `num_to_test` chunks have been shown.
    if i + 1 >= num_to_test:
        break

In [None]:
# ------------------------------------------------------------
# 📝 Exercise 1 – Threaded Data Loader
# ------------------------------------------------------------
# Next, write a threaded version of the data loader.
# Once you have an implementation that is working, play with
# different values of `max_workers` to see how it affects
# the speed of loading the data. What's optimal?
# ------------------------------------------------------------

def audio_chunk_dataloader_threaded(
      file_glob, target_sample_rate, window_size_s, 
      shuffle=False, max_workers=2):
  """Threaded version of the audio chunk data loader."""
  raise NotImplementedError(
      'Implement `audio_chunk_dataloader_threaded` to load audio files from a directory using multiple threads.'
  )

file_glob = str(RAW_DIR / '**/*.wav')
target_sample_rate = 16000
window_size_s = 5.0
# Test the threaded loader (not the plain one from Exercise 0).
dataloader = audio_chunk_dataloader_threaded(
    file_glob, target_sample_rate, window_size_s, shuffle=True, max_workers=2)
num_to_test = 15
for i, (audio_chunk, file, offset) in tqdm(enumerate(dataloader)):
    print(f"Loaded {file} at {offset:.2f}s")
    display(Audio(audio_chunk, rate=target_sample_rate))
    # Stop once exactly `num_to_test` chunks have been shown.
    if i + 1 >= num_to_test:
        break

In [None]:
# ------------------------------------------------------------
# 📝 Exercise 2 – Fix a Bug!
# ------------------------------------------------------------
# Our current data iterator is pretty good, but has a subtle 
# failure mode: If we process the data slower than the 
# iterator loads the data, we can wind up with an ever-growing 
# backlog of loaded data, until we run out of memory. Refactor 
# the data loader so that it only preloads a fixed number of 
# examples at a time.
# ------------------------------------------------------------
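If you are unsure where to start, one common way to cap a backlog is a bounded queue: the producer blocks as soon as the queue is full, so memory use stays fixed. The sketch below shows the idea on a toy iterable with a single background thread. It is not the audio loader itself, and the names `bounded_prefetch` and `max_prefetch` are made up for illustration.

```python
import queue
import threading

_DONE = object()  # Sentinel marking the end of the stream.


def bounded_prefetch(iterable, max_prefetch=4):
  """Yields items from `iterable`, buffering at most `max_prefetch` of them.

  The producer may hold one extra item while it waits for space in the queue.
  """
  buffer = queue.Queue(maxsize=max_prefetch)

  def producer():
    try:
      for item in iterable:
        buffer.put(item)  # Blocks while the buffer is full.
    finally:
      buffer.put(_DONE)  # Always signal the end so the consumer can stop.

  threading.Thread(target=producer, daemon=True).start()
  while True:
    item = buffer.get()  # Blocks while the buffer is empty.
    if item is _DONE:
      return
    yield item


prefetched = list(bounded_prefetch(range(10), max_prefetch=2))
print(prefetched)
```

This sketch keeps things short by ignoring two details that a real loader should handle: an exception raised inside the producer thread is not re-raised to the consumer, and if the consumer stops early the producer stays blocked until the program exits.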
