Skip to content
Permalink
Browse files

eml_signal: Support IIR filter cascades

Based on Second Order Sections of transposed dfII biquads

Can for instance be used to construct 1/3 octave filterbanks,
as alternative to mel-spectrogram features
  • Loading branch information...
jonnor committed Dec 30, 2018
1 parent 6f54d16 commit faa61b145a01bad103f6ff16bb64703cbf2ffacd
Showing with 252 additions and 0 deletions.
  1. +61 −0 bindings/eml_signal.cpp
  2. +54 −0 bindings/emlpy_common.hpp
  3. +66 −0 emlearn/eml_iir.h
  4. +11 −0 setup.py
  5. +60 −0 test/test_filters.py
@@ -0,0 +1,61 @@

#include <stdio.h>

#include <vector>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/numpy.h>

#define EML_DEBUG 1

#include "emlpy_common.hpp"
#include <eml_iir.h>

namespace py = pybind11;



py::array_t<float>
iirfilter_py(py::array_t<float, py::array::c_style | py::array::forcecast> sos,
py::array_t<float, py::array::c_style | py::array::forcecast> data)

{
EMLPY_PRECONDITION(sos.ndim() == 2, "SOS coefficients must have dimensions 2");
EMLPY_PRECONDITION(data.ndim() == 1, "data must have dimensions 1");

const int n_stages = sos.shape(0);

// Setup cascade
std::vector<float> coefficients(sos.data(), sos.data() + 6*n_stages);
std::vector<float> states(4*n_stages, 0.0);
EmlIIR filter = {
n_stages,
states.data(),
(int)states.size(),
coefficients.data(),
(int)coefficients.size(),
};

EMLPY_CHECK_ERROR(eml_iir_check(filter));

// Storing output
auto ret = py::array_t<float>(data.shape(0));
float *retdata = (float *)ret.data();

// Perform filter
for (int i=0; i<data.shape(0); i++) {
retdata[i] = eml_iir_filter(filter, data.data()[i]);
}

return ret;
}



PYBIND11_MODULE(eml_signal, m) {
m.doc() = "Signal processing for emlearn";

m.def("iirfilter", iirfilter_py);

}

@@ -0,0 +1,54 @@

#ifndef EMLPY_COMMON
#define EMLPY_COMMON

#include <eml_common.h>

#include <exception>

namespace emlpy {

class PreconditionFailed : public std::runtime_error {
public:
PreconditionFailed(const char *str)
: std::runtime_error(str)

{
}

};


class Error : public std::runtime_error {
public:
Error(EmlError err)
: std::runtime_error(eml_error_str(err))
{
}

};


} // end namespace emlpy



// Throw if expr is false
#define EMLPY_PRECONDITION(expr, msg) \
do { \
if (!(expr)) { \
throw new emlpy::PreconditionFailed(msg); \
} \
} while (0);

// Throw if expr gives error
#define EMLPY_CHECK_ERROR(expr) \
do { \
const EmlError _e = (expr); \
if (_e != EmlOk) { \
throw new emlpy::Error(_e); \
} \
} while (0);


#endif // EMLPY_COMMON
@@ -0,0 +1,66 @@


#ifndef EML_IIR
#define EML_IIR

#include "eml_common.h"

// Each stage is implemented using a Biquad filter
// https://en.wikipedia.org/wiki/Digital_biquad_filter
// TransporedDirectForm2
// preferred for floating-point
float
eml_biquad_tdf2(float s1[2], float s2[2],
const float a[3], const float b[3],
float in)
{
const float out = s1[1] + b[0]*in;
s1[0] = s2[1] + b[1]*in - a[1]*out;
s2[0] = b[2]*in - a[2]*out;
s1[1] = s1[0];
s2[1] = s2[0];
return out;
}

// TODO: implement DirectForm1 for fixed-point.
// Should possibly also use first-order noise shaping


// IIR filters using cascades of Second Order Sections (SOS)
// Follows conventions of scipy.signal.sosfilt
//
// A single second-order filter is just a special case with n_stages=1
typedef struct _EmlIIR {
int n_stages;
float *states;
int states_length;// n_stages * 4
const float *coefficients; // In scipy.signal.sosfilt convention. [0..2]: numerator, [3..5]: denominator
int coefficients_length; // n_stages * 6.
} EmlIIR;


EmlError
eml_iir_check(EmlIIR filter) {
EML_PRECONDITION(filter.n_stages >= 1, EmlUninitialized);
EML_PRECONDITION(filter.states, EmlUninitialized);
EML_PRECONDITION(filter.coefficients, EmlUninitialized);
EML_PRECONDITION(filter.states_length == filter.n_stages*4, EmlSizeMismatch);
EML_PRECONDITION(filter.coefficients_length == filter.n_stages*6, EmlSizeMismatch);
return EmlOk;
}

float
eml_iir_filter(EmlIIR filter, float in) {

float out = in;
for (int stage=0; stage < filter.n_stages; stage++) {
const float *num = filter.coefficients + ((6*stage)+0);
const float *den = filter.coefficients + ((6*stage)+3);
float *s1 = filter.states + ((4*stage)+0);
float *s2 = filter.states + ((4*stage)+2);
out = eml_biquad_tdf2(s1, s2, den, num, out);
}
return out;
}

#endif // EML_IIR
@@ -120,6 +120,17 @@ def build_extensions(self):
],
language='c++'
),
Extension(
'eml_signal',
['bindings/eml_signal.cpp'],
include_dirs=[
'emlearn/',
# Path to pybind11 headers
get_pybind_include(),
get_pybind_include(user=True)
],
language='c++'
),
]

def read_requirements():
@@ -0,0 +1,60 @@

from scipy import signal
import numpy
import pandas

import pytest

import eml_signal


def plot_freq_response(noise, a, b, fs=44100):

def spec(sig):
return signal.periodogram(sig, fs=fs, window='hann', scaling='spectrum')

f, noise_s = spec(noise)
print('noise', noise_s.shape)

df = pandas.DataFrame({
'f': f,
'original': noise_s,
'a': spec(a)[1],
'b': spec(b)[1],
})
df.plot(x='f', logy=True, logx=True, xlim=(10, fs/2))


def noisy_chirp(fs=44100, seconds=1.0):
t = numpy.linspace(0, 1, seconds*fs, False)
chirp = signal.chirp(t, f0=6, f1=fs/2.0, t1=seconds, method='linear')
noise = numpy.random.normal(0, 1.0, size=len(t))
sig = chirp + 0.05*noise
return t, sig


FS = 32000
FILTERS = {
'cheby1-order1-lowpass': signal.cheby1(N=1, rp=3, Wn=1000, btype='lp', fs=FS, output='sos'),
'cheby1-order2-highpass': signal.cheby1(N=2, rp=3, Wn=2000, btype='hp', fs=FS, output='sos'),
'cheby1-order4-highpass': signal.cheby1(N=4, rp=3, Wn=2000, btype='hp', fs=FS, output='sos'),
'cheby1-order5-bandpass': signal.cheby1(N=5, rp=12, Wn=[2000, 4000], btype='bp', fs=FS, output='sos'),
'cheby1-order12-highpass': signal.cheby1(N=12, rp=12, Wn=4000, btype='hp', fs=FS, output='sos'),
}


@pytest.mark.parametrize('filtername', FILTERS.keys())
def test_iir_filter(filtername):
sos = FILTERS[filtername]
assert len(sos.shape) == 2
assert sos.shape[1] == 6, sos.shape

t, noise = noisy_chirp(fs=FS, seconds=1.0)

ref = signal.sosfilt(sos, noise)
out = eml_signal.iirfilter(sos, noise)
rel_err = numpy.abs((out-ref)/ref)

assert ref.shape == out.shape, out.shape
numpy.testing.assert_allclose(ref, out, atol=1e-4)
assert numpy.median(rel_err) < 1e-4

0 comments on commit faa61b1

Please sign in to comment.
You can’t perform that action at this time.