Dependencies
===============

In [25]:
from __future__ import division

from IPython.display import *

from numpy import *; seterr(all="ignore")
from numpy import linalg
from numpy import random

%matplotlib notebook
from matplotlib.pyplot import *

import wish

import audio.wave as wave
from audio.filters import FIR, AR
import audio.frames
import audio.index; import nltk; nltk.download("timit")
from audio.lp import lp
from audio.quantizers import Quantizer

[nltk_data] Downloading package timit to /home/boisgera/nltk_data...
[nltk_data]   Package timit is already up-to-date!


In [3]:
df = 16000
dt = 1.0 / df

Sandbox
========

FIR
---

In [4]:
fir = FIR([1.0])
print fir(2.0)
print fir(1.0)
print fir([0.0, 7.0, -3.0])
delay = FIR([0.0, 1.0])
fir = delay
print fir(2.0)
print fir(1.0)
print fir([0.0, 7.0, -3.0])
print fir.a

2.0
1.0
[ 0.  7. -3.]
0.0
2.0
[1. 0. 7.]
[0. 1.]


Audio Source (NLTK)
=============

In [46]:
utterances = audio.index.search(type=audio.index.Utterance)
utterance = utterances[0]
display(utterance)
data = utterance.audio
audio.wave.write(data, "voice.wav", df=16000)
Audio("voice.wav")

a crab challenged me but a quick stab vanquished him

In [47]:
for item in utterance:
    print type(item).__name__.lower() + ": " + str(item)

phone: h# (a crab challenged me but a quick stab vanquished him).
word: a [q-ey]
word: crab [kcl-k-r-ae-bcl]
word: challenged [ch-ae-l-ax-n-dcl-jh-dcl-d]
phone: epi (a crab challenged me but a quick stab vanquished him).
word: me [m-iy]
phone: pau (a crab challenged me but a quick stab vanquished him).
word: but [b-eh-dx]
word: a [ix]
word: quick [kcl-k-w-ih-kcl-k]
word: stab [s-tcl-t-ae-bcl-b]
word: vanquished [v-ae-ng-kcl-k-w-ix-sh-tcl-t]
word: him [hh-ih-m]
phone: h# (a crab challenged me but a quick stab vanquished him).


In [48]:
audio.index.search("bcl", type=audio.index.Phone)

   0. bcl (black [bcl-b-l-ae-kcl]).
   1. bcl (betide [bcl-b-iy-tcl-t-ay-dcl]).
   2. bcl (be [bcl-b-iy]).
   3. bcl (both [bcl-b-ow-th]).
   4. bcl (blues [bcl-b-l-ux-z]).
   5. bcl (be [bcl-b-ix]).
   6. bcl (bowl [bcl-b-ow-l]).
   7. bcl (publicity [p-ax-bcl-b-l-ih-s-ix-dx-iy]).
   8. bcl (be [bcl-b-iy]).
   9. bcl (rhubarb [r-uw-bcl-b-aa-r-bcl]).
  10. bcl (rhubarb [r-uw-bcl-b-aa-r-bcl]).
  11. bcl (biblical [b-ih-bcl-b-l-ih-kcl-k-el]).
  12. bcl (maybe [m-ey-bcl-b-iy]).
  13. bcl (blistered [bcl-b-l-ih-s-tcl-t-axr-dcl]).
  14. bcl (underbrush [ah-n-axr-bcl-b-r-ah-sh]).
  15. bcl (arbitrate [q-aa-r-bcl-b-ix-tcl-t-r-ey-tcl-t]).
  16. bcl (tomboy [tcl-t-aa-m-bcl-b-oy]).
  17. bcl (brother [bcl-b-r-ah-dh-er]).
  18. bcl (battery [bcl-b-ae-dx-er-iy]).
  19. bcl (built [bcl-b-ih-l-tcl]).
  20. bcl (books [bcl-b-uh-kcl-s]).
  21. bcl (be [bcl-b-iy]).
  22. bcl (buy [bcl-b-ay]).
  23. bcl (about [ax-h-bcl-b-aw-tcl]).
  24. bcl (honeybees [hv-ah-n-iy-bcl-b-iy-z]).
  25. bcl (be [bcl-b-iy])

Short-Term Prediction
======================

In [49]:
class STP(Quantizer):
    "Short-Term Predictor"
    def __init__(self, order=16, method="autocorrelation"):
        self.fir = FIR(a=r_[1.0, zeros(order)]) 
        self.ar  = AR(a=zeros(order))
        self.order = order
        self.method = method
    def encode(self, data):
        if self.method == "covariance" and self.order >= len(data):
            raise ValueError("not enough data samples")
        a = lp(data, order=self.order, method=self.method)
        self.fir.a[:] = r_[1.0, -a]
        error = self.fir(data)
        return (a, error) 
    def decode(self, data):
        a, error = data
        self.ar.a[:] = a
        return self.ar(error)

In [43]:
def stp_error(data, T=0.02, order=16, method="autocorrelation"):
    length = len(data)
    n = int(T * df) # number of samples for T s at the given frequency.
    frames = audio.frames.split(data, n, pad=True)
    stp = STP(order=order, method=method)
    error = zeros(n*len(frames))
    for i, frame in enumerate(frames):
        a, error_frame = stp.encode(frame)
        error[i*n:(i+1)*n] = error_frame
    return error[:length]

In [50]:
figure()    
n = len(data)
t = r_[0:n] / df
plot(t,data, "k", alpha=0.1); axis("tight")
error = stp_error(data)
plot(t,error, "r")
xlabel("time [s]")
ylabel("amplitude")
axis("tight")
grid()

<IPython.core.display.Javascript object>

In [51]:
error = stp_error(data)
SNR2 = mean(data*data) / mean(error*error)
print "SNR", 10*log10(SNR2), "dB"

T = 20.0 / 1000 # 20 ms
n = int(df * T)
data_frames = audio.frames.split(data, n, pad=True)
error_frames = audio.frames.split(error, n, pad=True)

fig, (ax1, ax2) = subplots(2,1, sharex=True)
SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, error_frames)]
ax1.plot(data, "k", alpha=0.1, label="original")
ax1.plot(error, "r", label="stp residual")
ax1.set_title("Waveforms")
ax1.axis("tight")
ax1.legend()
ax1.grid()
ax2.plot(r_[0:len(SNRs)] * n,  SNRs, "b")
ax2.plot([0,len(data_frames)*n], [10*log10(SNR2), 10*log10(SNR2)], "k--")
ax2.set_xlabel("sample index")
ax2.set_ylabel("SNR [dB]")
ax2.set_title("Signal-to-Noise Ratio")
ax2.grid()

SNR 15.446422432113586 dB


<IPython.core.display.Javascript object>

In [52]:
wave.write(error, "voice-error-stp.wav", df=df)
display(Audio("voice.wav"))
display(Audio("voice-error-stp.wav"))

Pitch Estimation
==========

In [54]:
def ltp_parameters(history, frame,
                   offset_min=1, offset_max=None,
                   gain_min=0.0, gain_max=inf,
                   SNR_min=1.0, SNR_max=inf,
                   select=argmax,
                   returns="offset, gain"):
    p = len(history)
    data = r_[history, frame] # full data
    m = len(frame)
    n = len(data)
    nxcorrs = zeros(p+1)
    gains = zeros(p+1)
    SNRs = zeros(p+1)
    valids = zeros(p+1, dtype=bool)
    frame_norm = linalg.norm(frame)
    normed_frame = frame / frame_norm
    for i in range(p + 1):
        windowed_data = data[n-i-m:n-i]
        windowed_data_norm = linalg.norm(windowed_data)
        normed_windowed_data = windowed_data / windowed_data_norm
        nxcorr = nxcorrs[i] = dot(normed_frame, normed_windowed_data)
        SNR = SNRs[i] = 1.0 / sqrt(1 - nxcorr*nxcorr)
        #print ">", SNR
        gain = gains[i] = nxcorr / windowed_data_norm * frame_norm
        valid = True
        if offset_min is not None:
            valid = valid and (offset_min <= i)
        if offset_max is not None:
            valid = valid and (i <= offset_max)
        valid = valid and (gain_min <= gain <= gain_max)
        valid = valid and (SNR_min <= SNR <= SNR_max)
        valids[i] = valid

    criteria = SNRs.copy() 
    criteria[logical_not(valids)] = -inf
    offset = select(criteria)
    if not valids[offset]: # everything is invalid!
        raise ValueError("no valid set of parameters")
    else:
        gain = gains[offset]
        nxcorr = nxcorrs[offset]
        SNR = SNRs[offset]

    return wish.grant(returns)

In [76]:
N = 100
data_ = (0.7 * sin(r_[0:N]/N * 2*pi*4) + 0.05 * random.uniform(-1,1,N)) * (1.0 + 0.2*r_[0:1:1.0/N])
#history[::7] = 1.0
history, frame = data_[:-25], data_[-25:]
m = len(history)
n = len(frame)
offset_min = 5
def select(factor):
    def _select(SNRs):
        i = argmax(SNRs)
        is_ = r_[0:len(SNRs)]
        SNRi = max(SNRs)
        js = [int(ceil(i/2.0)), int(floor(i/2.0)), int(ceil(i/3.0)), int(floor(i/3.0)), int(ceil(i/4.0)), int(floor(i/4.0))]
        ij = argmax(SNRs[js])
        if SNRs[js[ij]] >= factor * SNRi:
            return js[ij]
        else:
            return i
    return _select
offset, gain, nxcorrs, SNRs, valids = ltp_parameters(history, frame, 
                                                     offset_min=offset_min, SNR_min=1.0, select=argmax, #select(0.5),
                                                     returns="offset, gain, nxcorrs, SNRs, valids")
print "offset:", offset, "gain:", gain

figure()
plot(data_, "k", alpha=0.5, label="data")
plot(arange(0,n)+m, frame, "b", label="reference")
plot(arange(m-offset, m-offset+n), frame/gain, "r", label="matched")
axis("tight")
legend(loc=0)

offset: 75 gain: 1.1689857342477985


<IPython.core.display.Javascript object>

<matplotlib.legend.Legend at 0x7fa07b71ac50>

In [77]:
figure()
m = arange(len(SNRs))
plot(m[offset_min:],SNRs[offset_min:], "r", alpha=0.25, label="SNR")
n = arange(len(SNRs))
plot(n[valids],SNRs[valids], "bx", linewidth=1.0,label="valid")
xlabel("offset")
ylabel("SNR (linear scale)")
axis("tight")
legend(loc=0)

<IPython.core.display.Javascript object>

<matplotlib.legend.Legend at 0x7fa07b62e510>

Long-Term Prediction
====================

In [78]:
class LTP(Quantizer):
    def __init__(self, order, **options):
        self.fir = FIR(a=r_[1.0, zeros(order)])
        self.history = zeros(order)
        self.ar = AR(a=zeros(order))
        self.order = order
        self.options = options
    def encode(self, frame):
        a = zeros_like(self.fir.a)
        a[0] = 1.0
        try:
            offset, gain = ltp_parameters(self.history, frame, **self.options)
            a[offset] = - gain
        except ValueError:
            offset, gain = 0, 0.0
        self.fir.a[:] = a 
        error = self.fir(frame)
        self.history = r_[self.history[len(frame):], frame]
        return (offset, gain), error
    def decode(self, data):
        (offset, gain), error = data
        a = zeros_like(self.ar.a)
        a[offset-1] = gain
        self.ar.a[:] = a
        return self.ar(error)

In [79]:
f_min = 50.0
f_max = 400.0
order_ltp = int(df/f_min)
print order_ltp
offset_min = int(df/f_max)
print offset_min

320
40


In [80]:
def ltp_error(data, T=0.005, order=order_ltp, **options):
    length = len(data)
    n = int(T * df) # number of samples for T s at the given sampling frequency.
    frames = audio.frames.split(data, n, pad=True)
    ltp = LTP(order=order, **options)
    error = zeros(n*len(frames))
    offset = zeros_like(error)
    gain = zeros_like(error)
    for i, frame in enumerate(frames):
        (offset_, gain_), error_frame = ltp.encode(frame)
        error[i*n:(i+1)*n] = error_frame
        offset[i*n:(i+1)*n] = ones_like(error_frame) * offset_
        gain[i*n:(i+1)*n] = ones_like(error_frame) * gain_
    error = error[:length]
    offset = offset[:length]
    gain = gain[:length]
    return error, offset, gain

In [85]:
stp_error_ = stp_error(data)
ltp_error_, offset, gain = ltp_error(stp_error_, offset_min=offset_min, SNR_min=1.1, select=select(0.5))
stp_SNR2 = mean(data*data) / mean(stp_error_*stp_error_)
ltp_SNR2 = mean(data*data) / mean(ltp_error_*ltp_error_)

T = 20.0 / 1000 # 20 ms
n = int(df * T)
data_frames = audio.frames.split(data, n, pad=True)
stp_error_frames = audio.frames.split(stp_error_, n, pad=True)
ltp_error_frames = audio.frames.split(ltp_error_, n, pad=True)


fig, (ax1, ax2, ax3) = subplots(3,1, sharex=True)
stp_SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, stp_error_frames)]
ltp_SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, ltp_error_frames)]
ax1.plot(data, "k", alpha=0.1, label="original")
ax1.plot(stp_error_, "r", alpha=0.5, label="stp residual")
ax1.plot(ltp_error_, "g", alpha=1.0, label="stp+ltp residual")
ax1.set_title("Waveforms")
ax1.axis("tight")
ax1.legend()
ax1.grid()
ax2.plot(r_[0:len(stp_SNRs)] * n,  stp_SNRs, "r", alpha=0.5, label="stp SNR")
ax2.plot([0,len(data_frames)*n], [10*log10(stp_SNR2), 10*log10(stp_SNR2)], "r--", alpha=0.5)
ax2.plot(r_[0:len(ltp_SNRs)] * n,  ltp_SNRs, "g", label="stp+ltp SNR")
ax2.plot([0,len(data_frames)*n], [10*log10(ltp_SNR2), 10*log10(ltp_SNR2)], "g--")
ax2.set_xlabel("sample index")
ax2.set_ylabel("SNR [dB]")
ax2.set_title("Signal-to-Noise Ratio")
ax2.legend()
ax2.grid()
#n = len(data)
#t = r_[0:n] / df
ax3.plot(df / offset)
ax3.plot(2*df / offset, "k.", alpha=0.5, ms=0.25)
ax3.plot(3*df / offset, "k.", alpha=0.25, ms=0.25)
ax3.axis([0, len(data), 0, 400.0])
ax3.set_ylabel("frequency [Hz]")
ax3.set_yticks(r_[0:400:100])
ax3.grid()

<IPython.core.display.Javascript object>

In [87]:
wave.write(ltp_error_, "voice-error-stp-ltp.wav", df=df)
display(Audio("voice.wav"))
display(Audio("voice-error-stp.wav"))
display(Audio("voice-error-stp-ltp.wav"))