In [None]:
import mms_msg
import lazy_dataset

In [None]:
# Build a tiny in-memory example dataset. Every entry shares the same audio
# path and dataset name; only the speaker and the length (a multiple of 8000
# samples) differ between the examples.
input_ds = lazy_dataset.new({
    example_id: {
        'audio_path': '1.wav',
        'speaker_id': speaker_id,
        'num_samples': length_factor*8000,
        'dataset': 'test'
    }
    for example_id, speaker_id, length_factor in (
        ('example1', 'A', 5),
        ('example2', 'B', 6),
        ('example3', 'A', 4),
        ('example4', 'C', 5),
    )
})

In [None]:
# Build the composition dataset on top of the input examples, requesting
# num_speakers=2 for every composed example.
ds = mms_msg.sampling.source_composition.get_composition_dataset(
    input_ds,
    num_speakers=2,
)

In [None]:
def my_sampling_function(example):
    """Attach a reproducible random number to ``example``.

    The random number generator is derived from the example itself together
    with the seed string ``'my_sampler'``; the seed string keeps the RNGs of
    different sampling modules apart from each other.

    Args:
        example: The example to extend. It is modified in place.

    Returns:
        The same ``example`` object with the key ``'my_random_number'`` set.
    """
    generator = mms_msg.sampling.utils.rng.get_rng_example(
        example, 'my_sampler'
    )
    drawn_value = generator.random()
    example['my_random_number'] = drawn_value
    return example
# Apply the sampling function to the dataset and show the first example
ds_with_random_number = ds.map(my_sampling_function)
ds_with_random_number[0]

## Example: Sample "sequential" (non-overlapping) mixtures

In [None]:
from dataclasses import dataclass

@dataclass(frozen=True)
class SequentialOffsetSampler:
    """Sample offsets such that the utterances of an example do not overlap.

    The utterances are placed one after another in the order given by
    ``example['num_samples']['original_source']``. In front of every
    utterance a gap is drawn uniformly between ``min_gap`` and ``max_gap``
    and truncated to a whole number of samples.
    """
    # Parameters of the sampler
    # Smallest gap (in samples) inserted in front of an utterance
    min_gap: int = 0
    # Upper bound (in samples) of the gap inserted in front of an utterance
    max_gap: int = 8000

    def __post_init__(self):
        """Validate the parameters.

        Raises:
            AssertionError: If ``min_gap`` is negative or not strictly
                smaller than ``max_gap``.
        """
        assert self.min_gap >= 0, self.min_gap
        assert self.min_gap < self.max_gap, (self.min_gap, self.max_gap)

    def __call__(self, example):
        """Assign sequential, non-overlapping offsets to ``example``.

        Args:
            example: Example that provides the utterance lengths in
                ``example['num_samples']['original_source']``. It is passed
                to ``assign_offset`` together with the sampled offsets.

        Returns:
            The same ``example`` object after the offsets were assigned.
        """
        # Get a deterministic random number generator
        rng = mms_msg.sampling.utils.rng.get_rng_example(example, 'sequential_offset_sampler')

        # Sample the offsets such that the utterances don't overlap.
        # Sample a gap uniformly between the given min and max gap sizes
        offsets = []
        current_offset = 0
        for length in example['num_samples']['original_source']:
            # rng.uniform returns a float, but offsets are sample positions.
            # Truncate the gap so that the offsets stay integers (given
            # integer lengths) while the same random draws are consumed.
            current_offset += int(rng.uniform(self.min_gap, self.max_gap))
            offsets.append(current_offset)
            current_offset += length
        mms_msg.sampling.pattern.classical.offset.assign_offset(example, offsets)
        return example

In [None]:
# Apply the sequential offset sampler (default parameters) and plot the
# first example of the resulting dataset
sequential_example = ds.map(SequentialOffsetSampler())[0]
mms_msg.visualization.plot.plot_meeting(sequential_example)