# Transform

In [None]:
#| default_exp transform

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import torch
import numpy as np
import soundfile as sf
from fastcore.all import *
from fasttransform import DisplayedTransform, Pipeline
from transformers import AutoFeatureExtractor

from auditus.core import AudioArray

In [None]:
from IPython.display import Audio

## AudioLoader Transform

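The `AudioLoader` transform reads an audio file from a given path. The file is loaded into an `AudioArray` object, which contains a 1D NumPy array of the audio signal and the sampling rate. If `sr` is passed, it is attached to the loaded signal as its sampling rate (the loader itself does not resample); otherwise the sampling rate stored in the file is used.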

In [None]:
#| export
class AudioLoader(DisplayedTransform):
    "Load an audio file from a path into an `AudioArray`."
    def __init__(self, sr: int = None):
        store_attr()

    def encodes(self, x:str) -> AudioArray:
        # Path strings are loaded with the rate configured on this instance.
        return self.load_audio(x, self.sr)

    @staticmethod
    def load_audio(path, sr=None):
        "Read all frames of `path`; a truthy `sr` replaces the file's own sample rate on the result."
        with sf.SoundFile(path) as audio_file:
            samples = audio_file.read()
            # No resampling happens here: `sr` only decides which rate is attached to the samples.
            rate = sr or audio_file.samplerate
        return AudioArray(samples, rate)

Our test files are `.ogg` files with a sampling rate of 32kHz (`32_000`).

In [None]:
# Pin the loader to the 32 kHz rate of the test recordings.
sr = 32_000
al = AudioLoader(sr=sr)
# `store_attr()` in `__init__` should have exposed the argument as `al.sr`.
test_eq(al.sr, sr)

In [None]:
test_dir = "../test_files"
file_paths = globtastic(test_dir, file_glob="*.ogg")
file_paths

In [None]:
test_path = file_paths[-1]
test_path

### str -> AudioArray

Our test file is a bird song from [Xeno Canto](https://xeno-canto.org/) of approximately 20 seconds. The length should be nearly $32000 \times 20 = 640000$ samples.

In [None]:
# Load the test file through the transform, then check the attached rate and the sample count.
audio_arr = al(test_path)
test_eq(audio_arr.sr, sr)
test_eq(audio_arr.shape, (632790,))
audio_arr

In [None]:
audio_arr

## Resampling

The AST (Audio Spectrogram Transformer) model we use requires 16kHz audio. We can use `Resampling` to get audio with the correct sampling rate.

In [None]:
#| export
class Resampling(DisplayedTransform):
    "Resample an `AudioArray` to `target_sr` using linear interpolation."
    def __init__(self, target_sr: int):
        store_attr()

    def encodes(self, audio: AudioArray) -> AudioArray: return self.process_audio_array(audio)

    def process_audio_array(self, audio: AudioArray) -> AudioArray:
        "Return `audio` unchanged if it is already at `target_sr`, otherwise a linearly interpolated copy."
        if audio.sr == self.target_sr: return audio
        n_in = len(audio.a)
        # Evenly spaced (fractional) positions in the source signal, one per output sample.
        positions = np.linspace(0, n_in - 1, self._new_length(audio, self.target_sr))
        # NOTE(review): plain `np.interp`; no low-pass filtering is applied before downsampling.
        resampled = np.interp(positions, np.arange(n_in), audio.a)
        return AudioArray(resampled, self.target_sr)

    def _new_length(self, audio: AudioArray, target_sr: int) -> int:
        "Number of samples `audio` has at `target_sr`: `len * target_sr / sr`, rounded down."
        # Multiply before dividing and use floor division: with integer rates this is exact,
        # whereas `len * (target_sr / sr)` can land just below the true integer
        # (e.g. 49 * (1 / 49) == 0.9999999999999999) and truncate one sample short.
        return int(len(audio.a) * target_sr // audio.sr)

In [None]:
target_sr = 16_000
r = Resampling(target_sr=target_sr)
r

The new length is:

$$l_{new} = l_{old} \frac{sr_{new}}{sr_{old}}$$

where $l$ is the NumPy array length and $sr$ is the sampling rate.

In our example:

$$632790 \cdot \frac{16000}{32000} = 632790 \cdot 0.5 = 316395$$

In [None]:
# 632790 samples at 32 kHz correspond to 632790 * 16000 / 32000 = 316395 samples at 16 kHz.
expected_length = 316395
test_eq(r._new_length(audio_arr, target_sr), expected_length)

In [None]:
# Resampling should attach the target rate and shrink the signal to the expected length.
resampled = r(audio_arr)
test_eq(resampled.sr, target_sr)
test_eq(resampled.shape, (expected_length,))
resampled

In [None]:
Audio(resampled, rate=target_sr)

## AudioEmbedding

`AudioEmbedding` allows us to use Hugging Face audio models as feature extractors. A great baseline model is the [Audio Spectrogram Transformer](https://huggingface.co/MIT/ast-finetuned-audioset-10-10-0.4593) model, which is the default in `auditus`.

In [None]:
#| export
class AudioEmbedding(DisplayedTransform):
    "Turn an `AudioArray` into the `input_values` produced by a pretrained Hugging Face feature extractor."
    def __init__(
        self,
        model_name: str = "MIT/ast-finetuned-audioset-10-10-0.4593",
        return_tensors: str = "np",
        **kwargs
    ):
        store_attr()
        # Any extra keyword arguments are handed straight to `from_pretrained`.
        self.model = AutoFeatureExtractor.from_pretrained(model_name, **kwargs)

    def encodes(self, x:AudioArray):
        return self.call_model(x.a, x.sr)

    def call_model(self, x, sr: int):
        "Run the feature extractor on raw samples `x` recorded at `sr` and return its `input_values`."
        features = self.model(x, sampling_rate=sr, return_tensors=self.return_tensors)
        return features['input_values']

In [None]:
# Extra keyword arguments (here `num_mel_bins`) are forwarded to `AutoFeatureExtractor.from_pretrained`.
ae = AudioEmbedding(num_mel_bins=256)
test_eq(ae.model.num_mel_bins, 256)
# The default feature extractor is expected to report a 16 kHz sampling rate.
test_eq(ae.model.sampling_rate, 16_000)
ae.model

### NumPy

In [None]:
emb = ae(resampled)
test_eq(emb.shape, (1, 1024, 256))
emb[0][0][:5]

### Torch

In [None]:
# `return_tensors="pt"` is passed through to the feature extractor; the output is checked as a torch shape.
torch_ae = AudioEmbedding(num_mel_bins=256, return_tensors="pt")
torch_emb = torch_ae(resampled)
test_eq(torch_emb.shape, torch.Size([1, 1024, 256]))
torch_emb[0][0][:5]

### Custom model

Any audio model on the Hugging Face Hub can be used to get audio embeddings. Here we use a [fine-tuned AST model](https://huggingface.co/xpariz10/ast-finetuned-audioset-10-10-0.4593_ft_env_aug_0-2).

In [None]:
custom_ae = AudioEmbedding(model_name="xpariz10/ast-finetuned-audioset-10-10-0.4593_ft_env_aug_0-2", num_mel_bins=256, return_tensors="np")
custom_emb = custom_ae(resampled)
test_eq(custom_emb.shape, (1, 1024, 256))
custom_emb[0][0][:5]


In [None]:
emb[0].shape

In [None]:
custom_emb[0].shape

## Pooling

In [None]:
#| export
class Pooling(DisplayedTransform):
    "Reduce axis 1 of a NumPy array or Torch tensor with a mean or max, or pass the input through unchanged."
    def __init__(self, pooling: str = None):
        assert pooling in [None, "mean", "max"], "Pooling must be either None (no pooling), 'mean' or 'max'."
        store_attr()

    # NOTE(review): `encodes` is defined twice on purpose; presumably the transform machinery
    # registers one implementation per annotated input type. Confirm against fasttransform.
    def encodes(self, x:np.ndarray) -> np.ndarray:
        if self.pooling is None: return x
        elif self.pooling == "mean": return x.mean(axis=1)
        elif self.pooling == "max": return x.max(axis=1)

    def encodes(self, x:torch.Tensor) -> torch.Tensor:
        if self.pooling is None: return x
        # `Tensor.mean(dim=...)` returns the reduced tensor itself, so it must not be indexed:
        # `[0]` would pick the first batch item and drop the batch dimension.
        elif self.pooling == "mean": return x.mean(dim=1)
        # `Tensor.max(dim=...)` returns a (values, indices) pair; only the values are wanted.
        elif self.pooling == "max": return x.max(dim=1).values

In [None]:
mean_pooled = Pooling(pooling="mean")
mean_pooled

In [None]:
# Toy batch holding a single 3x3 embedding, i.e. shape (1, 3, 3).
# Down each column (axis 1) the mean is 1/3 and the maxima are 0.8, 0.6 and 0.9.
test_emb = np.array([[
    [0.1, 0.2, 0.1],
    [0.1, 0.2, 0.9],
    [0.8, 0.6, 0.0]
]])
test_emb.shape

If `pooling=None`, the input is returned unchanged.

In [None]:
none_pooler = Pooling()
none_pooled = none_pooler(test_emb)
test_eq(none_pooled, test_emb)
none_pooled

If `pooling="mean"`, the mean of each embedding is taken.

In [None]:
mean_pooler = Pooling(pooling="mean")
mean_pooled = mean_pooler(test_emb)
test_eq(mean_pooled, np.array([[1/3, 1/3, 1/3]]))
mean_pooled

If `pooling="max"`, the maximum of each embedding is taken.

In [None]:
max_pooler = Pooling(pooling="max")
max_pooled = max_pooler(test_emb)
test_eq(max_pooled, np.array([[0.8, 0.6, 0.9]]))
max_pooled

`Pooling` can handle Torch tensors as well.

In [None]:
# Build a tensor from the NumPy fixture and call `encodes` directly on it.
torch_emb = torch.tensor(test_emb)
torch_pooled = Pooling(pooling="mean").encodes(torch_emb)
# The expected value keeps the batch dimension: shape (1, 3), dtype float64.
test_eq(torch_pooled, torch.tensor([[1/3, 1/3, 1/3]], dtype=torch.float64))
torch_pooled

`Pooling` can handle multiple embeddings at once.

In [None]:
multi_emb = np.array([test_emb[0]] * 2)
test_eq(multi_emb.shape, (2, 3, 3))

In [None]:
# Pooling reduces axis 1 only, so a batch of two embeddings yields two pooled rows.
multi_pooled = Pooling(pooling="mean").encodes(multi_emb)
test_eq(multi_pooled, np.array([[1/3, 1/3, 1/3], 
                                [1/3, 1/3, 1/3]]))
test_eq(multi_pooled.shape, (2, 3))
multi_pooled

## Pipeline

We can now compose a pipeline that loads an audio file with a sampling rate of 32kHz, resamples it to 16kHz, embeds it and max-pools the result.

In [None]:
# Chain the transforms built above: load -> resample -> feature extraction -> max pooling.
pipe = Pipeline([al, r, ae, max_pooler])

In [None]:
# `ae` produces shape (1, 1024, 256) and pooling collapses axis 1, so one file yields (1, 256).
emb = pipe(test_path)
test_eq(emb.shape, (1, 256))
emb[0][:5]

In [None]:
#| export
class AudioPipeline(Pipeline):
    "Ready-made pipeline: load an audio file, resample it, extract features and pool them."
    def __init__(self,
                 model_name: str = "MIT/ast-finetuned-audioset-10-10-0.4593", # Passed to `AudioEmbedding`
                 return_tensors: str = "np", # Passed to `AudioEmbedding`
                 target_sr: int = 16_000, # Passed to `Resampling`
                 pooling: str = "max", # Passed to `Pooling`
                 **kwargs): # Extra keyword arguments for `AudioEmbedding`
        # The loader is created without `sr`, so each file's own sampling rate is used.
        loader = AudioLoader()
        resampler = Resampling(target_sr)
        embedder = AudioEmbedding(model_name, return_tensors, **kwargs)
        pooler = Pooling(pooling)
        super().__init__([loader, resampler, embedder, pooler])

In [None]:
# `num_mel_bins` travels through `**kwargs` to `AudioEmbedding`; pooling stays at the default "max".
pipe = AudioPipeline(num_mel_bins=256, return_tensors="pt")
emb = pipe(test_path)
test_eq(emb.shape, torch.Size([1, 256]))
emb[0][:5]

In [None]:
# Multiple audio files in Torch
# Each call returns shape (1, 256); drop that leading axis before stacking one row per file.
multi_emb = torch.stack([pipe(f).squeeze(0) for f in file_paths])
test_eq(multi_emb.shape, torch.Size([2, 256]))
multi_emb[:, :5]

In [None]:
# Multiple audio files in NumPy
# Same as the Torch variant, but the pipeline is rebuilt to return NumPy arrays.
pipe = AudioPipeline(num_mel_bins=256, return_tensors="np")
multi_emb = np.stack([pipe(f).squeeze(0) for f in file_paths])
test_eq(multi_emb.shape, (2, 256))
multi_emb[:, :5]