# Room acoustics example.

Workshop this example a bit, make it so that there is some kind of classification pipeline 

In [5]:
import numpy as np

from datumlib import collect, Datum
import datumlib as dl

from itertools import product
from dataclasses import dataclass

from dataclasses import field


@dataclass(frozen=True)
class TimeSeries(Datum[np.ndarray]):
    sample_rate: float = field(kw_only=True)

    @classmethod
    def create(cls, data: np.ndarray, sample_rate: float, *, tags={}):
        return cls(data, sample_rate=sample_rate, tags=tags)

    @property
    def n_samples(self) -> int:
        return len(self.data)

    @property
    def duration(self) -> float:
        return self.n_samples / self.sample_rate

    @property
    def is_valid(self) -> bool:
        return True if len(self.data.shape) == 1 else False


# Example usage
t = TimeSeries.create(
    np.array([1, 2, 3]),
    44100,
    tags={
        "id": "1234-4321",
        "signal-strength": 10,
    },
)

dl.display_datum(t)

Generate some fake IRs with source and receiver data and 'known' T60.

In [6]:
rng = np.random.default_rng(42)
sample_rate = 32_000

sources: dict[str, tuple[float, float, float]] = {
    "S1: Corner": (0.0, 0.0, 0.0),
    "S2: Listener": (0.5, 0.5, 0.5),
}

receivers: dict[str, tuple[float, float, float]] = {
    "R1": (0.1, 0.2, 0.3),
    "R2": (0.6, 0.7, 0.5),
    "R3": (0.4, 0.2, 0.2),
    "R4": (0.1, 0.5, 0.7),
    "R5": (0.4, 0.5, 1.5),
}


def generate_rir(length=sample_rate * 8, rt60=0.6, noise=1e-4):
    t = np.arange(length) / sample_rate

    # amplitude decay consistent with RT60 definition
    decay = np.exp(-6.91 * t / rt60)

    rir = decay * rng.standard_normal(length)
    rir[0] = 1.0
    rir += noise * rng.standard_normal(length)

    return rir


# make decaying signals
signals = []
for (src_id, src_loc), (rec_id, rec_loc) in product(sources.items(), receivers.items()):
    rt60 = rng.uniform(0.3, 1.1)
    rir = generate_rir(rt60=rt60)

    # append metadata to each thing
    signals.append(
        TimeSeries(
            rir,
            sample_rate=sample_rate,
            tags={
                "rt60_true": rt60,
                "source_id": src_id,
                "source_loc": src_loc,
                "receiver_id": rec_id,
                "receiver_loc": rec_loc,
            },
        )
    )


In [7]:
# collect into `MonoCollection`
collection = collect(*signals, tags={"dataset": "synthetic_rooms"})
dl.display_collection(collection)

Define a preprocessing pipeline

In [8]:
from datumlib import Datum
from datumlib import datum_util as dutil
from datumlib import PipelineDict


@dl.cmap
def truncate(d: TimeSeries) -> TimeSeries:
    trunc_sec = 2
    return d.with_data(d.data[: trunc_sec * d.sample_rate])


# lift data function to operate on datum inputs
@dl.cmap
@dutil.over_data
def normalize_func(x: np.ndarray) -> np.ndarray:
    return x / np.max(np.abs(x))


@dl.cmap
def tag_processed(d: Datum) -> Datum:
    return d.add_tags("processed", True)


# define a preprocessing pipeline. This will sequentially process
# the signals in `collection` and return an updated collection
pipeline = PipelineDict(
    {
        "normalize": normalize_func,
        "truncate": truncate,
        "mark_processed": tag_processed,
    }
)

# Process collection via pipeline and visualize progress
collection = pipeline(collection, progress_meter=True)
dl.display_collection(collection)


Processing DatumCollection:  67%|██████▋   | 2/3 [00:00<00:00, 101.92step/s]


TypeError: Datum.add_tags() takes 2 positional arguments but 3 were given

Ideas, add way to compare collections, or interact (like modulate one collection with another by matching metadata field or sth)