Skip to content

Commit

Permalink
feat!: move segmentation to subpackage
Browse files Browse the repository at this point in the history
- update models
- update internal call
- fix import
- rename segmentation classes
  • Loading branch information
Teagum committed Jul 14, 2023
1 parent adec2cf commit 9f5b88c
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 231 deletions.
32 changes: 0 additions & 32 deletions src/apollon/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
"""
common models
"""

from dataclasses import dataclass

from pydantic import BaseModel

from . signal.models import StftParams
from . types import NDArray


class PeakPickingParams(BaseModel):
Expand All @@ -18,34 +14,6 @@ class PeakPickingParams(BaseModel):
delta: float


class LazySegmentParams(BaseModel):
n_perseg: int
n_overlap: int
norm: bool = False
mono: bool = True
expand: bool = True
dtype: str = "float64"


class SegmentationParams(BaseModel):
n_perseg: int = 512
n_overlap: int = 256
extend: bool = True
pad: bool = True


@dataclass
class Segment:
idx: int
start: int
stop: int
center: int
n_frames: int
data: NDArray




class FluxOnsetDetectorParams(BaseModel):
stft_params: StftParams
pp_params: PeakPickingParams
2 changes: 1 addition & 1 deletion src/apollon/onsets/detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, fps: int, m_dims: int = 3, delay: int = 10,
self.m_dims = m_dims
self.bins = bins
self.delay = delay
self.cutter = aseg.Segmentation(n_perseg, n_overlap)
self.cutter = aseg.ArraySegmentation(n_perseg, n_overlap)

self._params: models.EntropyODParams = models.EntropyODParams(fps=fps, m_dim=m_dims,
delay=delay, bins=bins,
Expand Down
15 changes: 15 additions & 0 deletions src/apollon/segment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# pylint: disable = C0114

from ._segment import Segments, ArraySegmentation, FileSegmentation
from ._utils import by_samples, by_ms, by_onsets
from . import models

__all__ = (
"Segments",
"ArraySegmentation",
"FileSegmentation",
"by_samples",
"by_ms",
"by_onsets",
"models"
)
175 changes: 12 additions & 163 deletions src/apollon/segment.py → src/apollon/segment/_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
Time series segmentation utilities
"""

from typing import Generator, Tuple
from typing import Generator

import numpy as _np
import numpy as np
from numpy.lib.stride_tricks import as_strided

from . audio import AudioFile
from . models import SegmentationParams, Segment
from . signal.tools import zero_padding as _zero_padding
from . types import FloatArray, IntArray, NDArray
from apollon.audio import AudioFile
from apollon.types import FloatArray, NDArray
from apollon.segment.models import SegmentationParams, Segment


class Segments:
Expand Down Expand Up @@ -67,7 +66,7 @@ def center(self, seg_idx: int) -> int:
raise IndexError('Requested index out of range.')
return seg_idx * self.step + self._offset

def bounds(self, seg_idx: int) -> Tuple[int, int]:
def bounds(self, seg_idx: int) -> tuple[int, int]:
"""Return the frame numbers of the lower and upper bound
of segment ``seg_idx``. Lower bound index is inclusive,
upper bound index is exclusive.
Expand Down Expand Up @@ -100,12 +99,12 @@ def get(self, seg_idx: int) -> Segment:

def __iter__(self) -> Generator[FloatArray, None, None]:
for seg in self._segs.T:
yield _np.expand_dims(seg, 1)
yield np.expand_dims(seg, 1)

def __getitem__(self, key: int) -> FloatArray:
out = self._segs[:, key]
if out.ndim < 2:
return _np.expand_dims(out, 1)
return np.expand_dims(out, 1)
return out

def __repr__(self) -> str:
Expand All @@ -115,7 +114,7 @@ def __str__(self) -> str:
return f'<n_segs: {self.n_segs}, len_seg: {self._params.n_perseg}>'


class Segmentation:
class ArraySegmentation:
# pylint: disable = R0903
"""Segementation"""
def __init__(self, n_perseg: int, n_overlap: int, extend: bool = True,
Expand Down Expand Up @@ -173,7 +172,7 @@ def transform(self, data: FloatArray) -> Segments:
if self._pad:
self._pad_len = (-(n_frames-self.n_perseg) % step) % self.n_perseg

data = _np.pad(data.squeeze(), (self._ext_len, self._ext_len+self._pad_len))
data = np.pad(data.squeeze(), (self._ext_len, self._ext_len+self._pad_len))
new_shape = data.shape[:-1] + ((data.shape[-1] - self.n_overlap) // step, self.n_perseg)
# see https://github.com/PyCQA/pylint/issues/7981
# pylint: disable = E1136
Expand Down Expand Up @@ -202,7 +201,7 @@ def _validate_data_shape(self, data: FloatArray) -> None:
raise ValueError(msg)


class LazySegments:
class FileSegmentation:
# pylint: disable = too-many-instance-attributes, too-many-arguments
"""Read segments from audio file."""
def __init__(self, snd: AudioFile, n_perseg: int, n_overlap: int,
Expand All @@ -223,7 +222,7 @@ def __init__(self, snd: AudioFile, n_perseg: int, n_overlap: int,
self.n_perseg = n_perseg
self.n_overlap = n_overlap
self.expand = expand
self.n_segs = int(_np.ceil(self._snd.n_frames / n_overlap))
self.n_segs = int(np.ceil(self._snd.n_frames / n_overlap))
if expand:
self.n_segs += 1
self.offset = -self.n_perseg // 2
Expand Down Expand Up @@ -312,153 +311,3 @@ def iter_bounds(self) -> Generator[tuple[int, int], None, None]:
"""Iterate over segment boundaries"""
for i in range(self.n_segs):
yield self.compute_bounds(i)


def _by_samples(arr: FloatArray, n_perseg: int) -> FloatArray:
"""Split ``arr`` into segments of lenght ``n_perseg`` samples.
This function automatically applies zero padding for inputs that cannot be
split evenly.
Args:
arr: One-dimensional input array
n_perseg: Length of segments in samples
Returns:
Two-dimensional array of segments
"""
if not isinstance(n_perseg, int):
raise TypeError('Param ``n_perchunk`` must be of type int.')

if n_perseg < 1:
raise ValueError('``n_perchunk`` out of range. '
'Expected 1 <= n_perchunk.')

fit_size = int(_np.ceil(arr.size / n_perseg) * n_perseg)
n_ext = fit_size - arr.size
arr = _zero_padding(arr, n_ext)
return arr.reshape(-1, n_perseg)


def _by_samples_with_hop(arr: FloatArray, n_perseg: int, hop_size: int) -> FloatArray:
"""Split `arr` into segments of lenght `n_perseg` samples. Move the
extraction window `hop_size` samples.
This function automatically applies zero padding for inputs that cannot be
split evenly.
Args:
arr: One-dimensional input array
n_perseg: Length of segments in samples
hop_size: Hop size in samples
Returns:
Two-dimensional array of segments
"""
if not (isinstance(n_perseg, int) and isinstance(hop_size, int)):
raise TypeError('Params must be of type int.')

if not 1 < n_perseg <= arr.size:
raise ValueError('n_perseg out of range. '
'Expected 1 < n_perseg <= len(arr).')

if hop_size < 1:
raise ValueError('hop_size out of range. Expected 1 < hop_size.')

n_hops = (arr.size - n_perseg) // hop_size + 1
n_segs = n_hops

if (arr.size - n_perseg) % hop_size != 0 and n_perseg > hop_size:
n_segs += 1

fit_size = hop_size * n_hops + n_perseg
n_ext = fit_size - arr.size
arr = _zero_padding(arr, n_ext)

out = _np.empty((n_segs, n_perseg), dtype=arr.dtype)
for i in range(n_segs):
off = i * hop_size
out[i] = arr[off:off+n_perseg]
return out


def by_samples(arr: FloatArray, n_perseg: int, hop_size: int = 0) -> FloatArray:
"""Segment the input into n segments of length ``n_perseg`` and move the
window ``hop_size`` samples.
This function automatically applies zero padding for inputs that cannot be
split evenly.
If ``hop_size`` is less than one, it is reset to ``n_perseg``.
Overlap in percent is calculated as ``hop_size / n_perseg * 100``.
Args:
arr: One-dimensional input array
n_perseg: Length of segments in samples
hop_size: Hop size in samples. If < 1, ``hop_size`` = ``n_perseg``.
Returns:
Two-dimensional array of segments.
"""
if hop_size < 1:
return _by_samples(arr, n_perseg)
return _by_samples_with_hop(arr, n_perseg, hop_size)


def by_ms(arr: FloatArray, fps: int, ms_perseg: int, hop_size: int = 0) -> FloatArray:
"""Segment the input into n segments of length ``ms_perseg`` and move the
window ``hop_size`` milliseconds.
This function automatically applies zero padding for inputs that cannot be
split evenly.
If ``hop_size`` is less than one, it is reset to ``n_perseg``.
Overlap in percent is calculated as ``hop_size`` / ``n_perseg`` * 100.
Args:
arr: One-dimensional input array
fps: Sample rate in frames per second
n_perseg: Length of segments in milliseconds
hop_size: Hop size in milliseconds. If < 1, ``hop_size`` = ``n_perseg``.
Returns:
Two-dimensional array of segments
"""
n_perseg = fps * ms_perseg // 1000
hop_size = fps * hop_size // 1000
return by_samples(arr, n_perseg, hop_size)


def by_onsets(arr: FloatArray, n_perseg: int, ons_idx: IntArray, off: int = 0
) -> FloatArray:
"""Split input ``arr`` into ``len(ons_idx)`` segments of length ``n_perseg``.
Extraction windos start at ``ons_idx[i]`` + ``off``.
Args:
arr: One-dimensional input array
n_perseg: Length of segments in samples
ons_idx: One-dimensional array of onset positions
off: Length of offset
Returns:
Two-dimensional array of shape ``(len(ons_idx), n_perseg)``.
"""
n_ons = ons_idx.size
out = _np.empty((n_ons, n_perseg), dtype=arr.dtype)

for i, idx in enumerate(ons_idx):
pos = idx + off
if pos < 0:
pos = 0
elif pos >= arr.size:
pos = arr.size - 1

if pos + n_perseg >= arr.size:
buff = arr[pos:]
out[i] = _zero_padding(buff, n_perseg-buff.size)
else:
out[i] = arr[pos:pos+n_perseg]
return out

0 comments on commit 9f5b88c

Please sign in to comment.