<div style="margin: 0 auto 10px; height: 70px; border: 2px solid gray; border-radius: 6px;">
  <div style="float: left; margin: 5px 10px 5px 10px; "><img src="img/bfh.jpg" /></div>
  <div style="float: right; margin: 20px 30px 0; font-size: 15pt; font-weight: bold; color: #98b7d2;"><a href="https://moodle.bfh.ch/course/view.php?id=39255" style="color: #98b7d2;">BTE5476 - Project-Oriented Digital Signal Processing </a></div>
</div>
<div style="clear: both; font-size: 30pt; font-weight: bold; color: #64788b; margin-left: 30px;">
    DTMF Signaling
</div>

In [None]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sp
import IPython
from scipy.io import wavfile
import os
import csv

In [None]:
plt.rcParams["figure.figsize"] = (9,2.5)

# Introduction

<div style="float: right; margin: 10px;"><img src="img/phone.jpg" width="250"></div>

DTMF (Dual-Tone Multi-Frequency) is a signaling protocol used to transmit simple numeric information over the frequency band provided by analog telephone lines, that is, between 300 Hz and 3400 Hz. When you use the keypad of an analog phone such as the one shown on the right, the sequence of dialed digits is transmitted to the phone company's switches in the form of audible _dial tones_. Today, cell phones and office phones are directly connected to digital networks and therefore no longer use DTMF for dialing. But DTMF is still frequently used in automated attendant systems (i.e., those phone menus where you are told to "press 1 to talk to customer service", etc.).


## DTMF specifications

In DTMF the phone's keypad is arranged in a $4\times 3$ grid and each key is associated with a unique *pair* of frequencies, as shown by this table:


|            | **1209 Hz** | **1336 Hz** | **1477 Hz** |
|------------|:-----------:|:-----------:|:-----------:|
| **697 Hz** |      1      |      2      |      3      |
| **770 Hz** |      4      |      5      |      6      |
| **852 Hz** |      7      |      8      |      9      |
| **941 Hz** |      *      |      0      |      #      |


When a key is pressed, two oscillators operating at the frequencies associated to the key send their output over the phone line. For instance, if the digit '1' is pressed, the oscillators will produce the following continuous-time signal
$$
    x(t) = \sin(2\pi\cdot 1209\cdot t) + \sin(2\pi\cdot697\cdot t)
$$

When dialing a multi-digit number, successive dial tones must be separated by a silent gap; although the official standard does not set standard timings, a DTMF receiver should be designed according to the following specifications:
 * valid dial tones can be as short as 40ms 
 * the silent gap between tones can also be as short as 40ms
 * actual tone frequencies can deviate up to $\pm 1.5\%$ from their nominal values 
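
To get a feeling for the last specification, the following sketch computes the $\pm 1.5\%$ tolerance band around each nominal frequency and the guard band left between neighboring tones. The variable names are illustrative choices and not part of any standard.

```python
import numpy as np

# nominal DTMF frequencies in Hz (keypad rows first, then columns)
DTMF_HZ = np.array([697, 770, 852, 941, 1209, 1336, 1477], dtype=float)
TOLERANCE = 0.015  # +/- 1.5 %

lower = DTMF_HZ * (1 - TOLERANCE)
upper = DTMF_HZ * (1 + TOLERANCE)
# gap between the top of one tolerance band and the bottom of the next one
guard = lower[1:] - upper[:-1]

for f, lo, hi in zip(DTMF_HZ, lower, upper):
    print(f'{f:6.0f} Hz -> [{lo:7.1f}, {hi:7.1f}] Hz')
print(f'smallest guard band: {guard.min():.1f} Hz')
```

The tolerance bands never overlap: the tightest spot is between 697 Hz and 770 Hz, where about 51 Hz remain, so a receiver can in principle still tell neighboring tones apart at the maximum allowed deviation.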

# Digital DTMF

DTMF was developed in the late 1950s and the first commercial DTMF phones hit the market in the 1960s. At the time, the system was implemented using analog hardware and the various frequencies were generated by a set of individual electronic oscillators.

<div style="float: left; margin: 0px;"><img src="img/mt8870.jpg" width="150"></div>

Obviously this is no longer the case and today DTMF signals are generated and decoded by dedicated (and extremely inexpensive) [DSP chips](https://pdf.datasheetcatalog.com/datasheets/228/268107_DS.pdf). In this notebook we will implement our own digital DTMF algorithms but, before anything else, let's review the relationship between the DTMF frequency values in Hz as specified by the standard and the digital frequencies that we will need to use in discrete time. This is relatively straightforward even without any formal knowledge of sampling and interpolation since all the signals involved are pure sinusoids.

## Digital to analog 

A digital-to-analog (D/A) converter, such as the soundcard in your PC, creates its analog output by interpolating the incoming digital samples at a rate of $F_s$ samples per second; this rate is the "clock" of the D/A converter and, although it is an _interpolation_ rate, it is usually referred to as the _sampling rate_ or sampling _frequency_ of the system, using the same term that we use for an analog-to-digital converter.

When a soundcard with interpolation rate $F_s$ "plays" a discrete-time sequence of the form $x[n] = \cos(\omega_0 n)$ (that is, a discrete-time sinusoid with digital frequency $\omega_0 \in [-\pi, \pi]$), it outputs the continuous-time sinusoid $x(t) = \cos(2\pi f_0 t)$ where

$$
    f_0 = \frac{\omega_0}{2\pi}F_s. \tag{1}
$$

This means that the analog frequency of the output depends _both_ on the frequency of the discrete-time sinusoid _and_ on the interpolation rate of the soundcard, which is usually a design parameter. In general, we want to keep all sampling rates as low as possible since the power consumption of a D/A chip is approximately proportional to $F_s^2$. 

As an example, here you can listen to how the pitch changes when the _same_ discrete-time sinusoid is played by the soundcard at different interpolation rates (and note how the duration of the audio also changes, obviously): 

In [None]:
w = 2 * np.pi * 0.05 
x = np.sin(w * np.arange(0, 8000))

for Fs in [8000, 16000, 4000]:
    print(f'Using an interpolation rate of {Fs} samples per second:')
    display(IPython.display.Audio(x, rate=Fs, normalize=False))
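
The pitch of each playback can be predicted from eq. (1). Here is a small sketch; the helper name `analog_frequency` is an assumption introduced only for this illustration.

```python
import numpy as np

def analog_frequency(w0: float, Fs: float) -> float:
    # eq. (1): f0 = w0 / (2 pi) * Fs, valid for w0 in [-pi, pi]
    return w0 / (2 * np.pi) * Fs

w0 = 2 * np.pi * 0.05   # same digital frequency as in the audio example
for rate in [8000, 16000, 4000]:
    # the 8000-sample sequence lasts 8000 / rate seconds
    print(f'Fs = {rate:5d} -> f0 = {analog_frequency(w0, rate):.0f} Hz, '
          f'duration = {8000 / rate:.1f} s')
```

Doubling the interpolation rate doubles the pitch and halves the duration, which matches what you hear.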

**Exercise: minimal rate.** What is the minimal interpolation rate required to implement a digital DTMF transmitter?

**SOLUTION:** 

Since the fastest possible digital frequency is $\omega = \pi$, as per eq. (1) the highest frequency that a D/A can generate is $F_s/2$. The largest frequency value in the DTMF table is 1477 Hz, and therefore we need 
$$
    F_s > 2954.
$$

## Analog to digital

The soundcard in your PC also works as an analog-to-digital (A/D) converter: it records an incoming audio signal by measuring (that is, by _sampling_) its amplitude $F_s$ times per second. 

If the input is a sinusoid of the form $x(t) = \sin(2\pi f_0 t)$ the resulting discrete-time signal will be 

$$
    x[n] = \sin(\omega_0 n) \qquad \text{with} \qquad \omega_0 = 2\pi\frac{f_0}{F_s}.
$$

As long as the sampling frequency is larger than _twice_ the frequency of the input sinusoid, the sequence of samples is a perfect representation of the analog waveform in the sense that $x[n]$ can be interpolated back into $x(t)$ _exactly_ by a D/A converter also operating at $F_s$ samples per second.
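
As a quick illustration of the formula above, this sketch lists the digital frequencies of the seven DTMF tones for a sampling rate of 8000 samples per second (the rate is an assumption made for the example) and checks that all of them stay below $\pi$.

```python
import numpy as np

Fs = 8000   # sampling rate assumed for this example
DTMF_HZ = np.array([697, 770, 852, 941, 1209, 1336, 1477])

# omega_0 = 2 pi f_0 / Fs for each tone
w = 2 * np.pi * DTMF_HZ / Fs
for f, wk in zip(DTMF_HZ, w):
    print(f'{f:5d} Hz -> {wk / np.pi:.4f} pi rad/sample')

# every tone is below half the sampling rate, so none of them is aliased
no_aliasing = bool(np.all(w < np.pi))
print('no aliasing:', no_aliasing)
```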

**Exercise: aliasing.** Consider an A/D converter connected in cascade to a D/A converter; both converters operate at the same rate $F_s$. Assume that the input to the cascade is the signal $x(t) = \sin(2\pi f_0 t)$ with $f_0 = 1.6F_s$.

What is the analog signal at the output of the cascade?

**SOLUTION:** 

The A/D will produce a discrete time sinusoid $x[n] = \sin(\omega_0 n)$ whose digital frequency is
$$
    \omega_0 = 2\pi\frac{f_0}{F_s} = 3.2\pi.
$$
Since $\omega_0 > \pi$ we can use the $2\pi$-periodicity of the sine to bring the frequency into the $[-\pi,\pi]$ interval:
$$
    x[n] = \sin(3.2\pi n) = \sin(3.2\pi n - 4\pi n) = \sin(-0.8\pi n) = -\sin(0.8\pi n)
$$
When this signal enters the D/A, it produces an analog sinusoid (with inverted sign) with frequency
$$
    f'_0 = \frac{0.8\pi}{2\pi}F_s = 0.4F_s,
$$
that is, the original frequency has been aliased to a completely different value.
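
The aliasing can also be verified numerically. In this sketch (the value of the rate is an arbitrary assumption; any value gives the same result) the samples of a sinusoid at $1.6F_s$ are compared with those of $-\sin(0.8\pi n)$, whose digital frequency lies inside $[-\pi, \pi]$.

```python
import numpy as np

Fs = 8000          # arbitrary rate chosen for the check
f0 = 1.6 * Fs
n = np.arange(50)

sampled = np.sin(2 * np.pi * f0 * n / Fs)   # samples produced by the A/D
alias = -np.sin(0.8 * np.pi * n)            # same samples, frequency within [-pi, pi]

print('identical samples:', np.allclose(sampled, alias))
print(f'frequency at the D/A output: {0.8 * np.pi / (2 * np.pi) * Fs:.0f} Hz (0.4 Fs)')
```

Since the two sequences coincide, a D/A converter running at the same rate cannot distinguish them and outputs a sinusoid at $0.4F_s$ with inverted sign.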

## Final design parameter

Although in theory we could use a lower value, in the rest of the notebook we will use $F_s = 8000$:
 * since the telephone channel is "naturally" bandlimited to 4000 Hz, with a sampling frequency of 8 kHz no additional anti-aliasing filter is needed
 * in most soundcards, this is the lowest available sampling rate.

In [None]:
Fs = 8000

# The encoder

In the next exercise, your task will be to implement a DTMF encoder as a Python function: the function takes a string of key values as input and returns their DTMF encoding as a discrete-time audio signal that can be played at a rate of $F_s$ samples/second.

To get you started, here is a partial implementation where:
 * the DTMF frequency pairs are available as a dictionary, indexed by the key values
 * the durations (in seconds) of the tones and the silence gap are also specified.

**Exercise: implement a DTMF encoder.** Complete the function below so that it returns the DTMF encoding of a series of key values, passed as a string. The encoding should be padded with 250 milliseconds of silence both at the beginning and at the end.

In [None]:
def DTMF_encode(digits: str, Fs=8000) -> np.ndarray: 
    PADDING_SEC = 0.25
    TONE_SEC, SPACE_SEC = 0.2, 0.1
    DTMF_FREQS = {
        '1': (697, 1209), '2': (697, 1336), '3': (697, 1477),
        '4': (770, 1209), '5': (770, 1336), '6': (770, 1477),
        '7': (852, 1209), '8': (852, 1336), '9': (852, 1477),
        '*': (941, 1209), '0': (941, 1336), '#': (941, 1477),        
    }
    
    # index range for tone intervals
    #n = np.arange(...)
    
    #  output signal, start with initial silence
    #x = np.zeros(...)
    
    for k in digits:
        try:
            # select the DTMF frequencies
            ... 
            # append tones and space to output
            # x = np.r_[ x, ... ]
        except KeyError:
            print(f'invalid key: {k}')
            return None
    # append final silence and return
    return #...

In [None]:
# SOLUTION:

def DTMF_encode(digits: str, Fs=8000) -> np.ndarray: 
    PADDING_SEC = 0.25
    TONE_SEC, SPACE_SEC = 0.2, 0.1
    DTMF_FREQS = {
        '1': (697, 1209), '2': (697, 1336), '3': (697, 1477),
        '4': (770, 1209), '5': (770, 1336), '6': (770, 1477),
        '7': (852, 1209), '8': (852, 1336), '9': (852, 1477),
        '*': (941, 1209), '0': (941, 1336), '#': (941, 1477),        
    }

    # index range for tone intervals
    n = np.arange(0, int(TONE_SEC * Fs))
    # output signal, start with initial silence
    x = np.zeros(int(PADDING_SEC * Fs))
    
    for k in digits:
        try:
            w_lo, w_hi = 2 * np.pi * np.array(DTMF_FREQS[k]) / Fs 
            x = np.r_[ x, np.sin(w_lo * n) + np.sin(w_hi * n), np.zeros(int(SPACE_SEC * Fs)) ]
        except KeyError:
            print(f'invalid key: {k}')
            return None
    return np.r_[ x, np.zeros(int((PADDING_SEC - SPACE_SEC) * Fs)) ]

Let's test it and evaluate it "by ear":

In [None]:
x = DTMF_encode('123##45', Fs=Fs)
IPython.display.Audio(x, rate=Fs)

# Designing a decoder: prep work

As is always the case in telecommunication systems, the encoder is the easy part; but now we need to build a decoder. Designing a robust decoder is difficult because there are a lot of things that can degrade the quality of the received signals; for instance:
 * there will always be some noise in the signal, and sometimes LOTS of noise
 * the signal could be affected by nonlinear distortion
 * the durations of tones and gaps can vary a lot
 * if the internal clocks at encoder and decoder are running at slightly different speeds, the DTMF frequencies will deviate from their nominal values.

This week, your take-home exercise is the implementation of a DTMF decoder. To get you started, let's look at the DTMF signal in more detail and see how we can process it. 

## Some helper functions

### Something from last week

Here is a function that returns the power spectrum (squared DFT magnitude) of a signal over the positive frequency axis together with the axis labels in Hz; we implemented this in last week's notebook, where you can review the details if needed:

In [None]:
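def spectral_power(x: np.ndarray, Fs: float) -> tuple[np.ndarray, np.ndarray]:
    # power spectrum over the positive frequencies, with bin frequencies in Hz
    L = len(x) // 2
    # bin k sits at k * Fs / len(x), which is correct for odd lengths as well
    return np.arange(L + 1) * Fs / len(x), np.abs(np.fft.fft(x)[:L+1]) ** 2 / len(x)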

### Realistic test files

Later in the notebook we will use a set of pre-recorded DTMF test files available in the `data` folder. Here are two helper functions to load and play the data:

In [None]:
def read_test_file(filename: str) -> tuple[int, np.ndarray]:
    # helper function to load a DTMF test file; returns (sampling rate, samples)
    fs, x = wavfile.read(os.path.join('data', filename))    
    # normalize 16-bit audio data to [-1, 1]
    if x.dtype == np.int16:
        x = x / 32767.0
    return fs, x

In [None]:
def hear_test_file(basename: str, ix: int):
    file = f'{basename}{ix}.wav'
    fs, s = read_test_file(file)
    print(file)
    display(IPython.display.Audio(s, rate=fs, normalize=False))

## Time-Frequency Analysis of a DTMF signal

A DTMF signal carries information both in time and in frequency: key values are encoded by frequency pairs while the sequence of keys is encoded in time. But this creates a problem: if we look at the spectrum of a DTMF signal we can easily see what keys have been pressed but we can't determine their order or their multiplicity:

In [None]:
x1 = DTMF_encode('159', Fs)
x2 = DTMF_encode('915915', Fs)

plt.subplot(1,2,1)
plt.plot(*spectral_power(x1, Fs))
plt.subplot(1,2,2)
plt.plot(*spectral_power(x2, Fs));

On the other hand, if we look at the signals in time we can see the number of key presses and their order but we cannot easily see the DTMF frequencies:

In [None]:
plt.subplot(1,2,1)
plt.plot(np.arange(0, len(x1)) / Fs, x1)
plt.subplot(1,2,2)
plt.plot(np.arange(0, len(x2)) / Fs, x2);

The solution is **time-frequency** analysis and the most common tool for this is the *spectrogram*. In a spectrogram we split the input signal into shorter segments (or *chunks*) and we compute the magnitude of the DFT for each segment independently. The result is a 2D matrix where each magnitude value is indexed by the chunk number and by the DFT index. To visualize it, the magnitude values are *color-coded* so that low values appear dark and high values appear bright.

Here are the spectrograms of the previous signals, and you can see how both the time and the frequency information are clearly visible.

In [None]:
for i, s in enumerate([x1, x2]):
    plt.subplot(1,2,i+1)
    f, t, S = sp.spectrogram(s, Fs)
    plt.pcolormesh(t, f, S);
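
To make the chunk-by-chunk procedure concrete, here is a minimal hand-made spectrogram using only NumPy. It is a sketch under simplifying assumptions (non-overlapping chunks, no window function, an arbitrary chunk length of 256 samples), so its output will not be identical to that of a library routine, which typically windows and overlaps the segments.

```python
import numpy as np

Fs = 8000
CHUNK = 256   # chunk length in samples (arbitrary choice for this sketch)

# test signal: 0.2 s at 697 Hz followed by 0.2 s at 1477 Hz
n = np.arange(int(0.2 * Fs))
sig = np.r_[np.sin(2 * np.pi * 697 / Fs * n), np.sin(2 * np.pi * 1477 / Fs * n)]

# split into non-overlapping chunks, dropping any incomplete final chunk
num_chunks = len(sig) // CHUNK
chunks = sig[:num_chunks * CHUNK].reshape(num_chunks, CHUNK)

# one magnitude DFT per chunk: rows are indexed by chunk, columns by DFT bin
S_manual = np.abs(np.fft.rfft(chunks, axis=1))
freqs = np.fft.rfftfreq(CHUNK, d=1 / Fs)

print('matrix shape (chunks, bins):', S_manual.shape)
print('peak in first chunk (Hz):', freqs[np.argmax(S_manual[0])])
print('peak in last chunk (Hz): ', freqs[np.argmax(S_manual[-1])])
```

The peak frequency changes from the first to the last chunk, which is exactly the time information that a single DFT of the whole signal cannot show.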

## Signal segmentation

Our time-frequency analysis suggests that, in order to decode a DTMF signal, we need to isolate each tone interval before we look at its frequency content. Since we know that tone intervals are separated by silent gaps, a simple segmentation strategy is the following:
 * at each new input sample compute the *instantaneous power* of the signal
 * look for power level transitions from low (silence gap) to high (tone intervals).

To compute the instantaneous power, remember that power is the time average of energy and energy is the sum of the squared samples. With this, the simplest and most common method to compute the instantaneous power is to filter the _squared_ signal with a narrowband lowpass such as a moving average or leaky integrator:

In [None]:
def plot_inst_pow(x, b, a):
    plt.plot(x)
    plt.plot(sp.lfilter(b, a, np.abs(x) ** 2));

In [None]:
# using a moving average over 10 ms (about 5 ms delay)
M = int(Fs * 0.01)
plot_inst_pow(x1, np.ones(M) / M, 1)

In [None]:
# using a leaky integrator with the same delay
lam = M / (M + 2) 
plot_inst_pow(x1, 1 - lam, [1, -lam])

**Food for thought.**  Now that we have a way to estimate the power of the signal, how can we use this information to segment our audio file? Visually it seems obvious that we need to look for power transitions from low to high and vice-versa but, in practice, to determine if there was a transition we need to use a _threshold_, that is, a reference power value that we compare the current power level to. 

How can we choose the value of the threshold? Things to think about:
 * the signal could be noisy, so the power in gaps between tones can be almost as large as the power in tones
 * in real life, we don't know the "volume" of the received signal: attenuation or amplification can arbitrarily change the amplitude of the sinusoids

In [None]:
fs, s = read_test_file('test3.wav')
plot_inst_pow(s, 1 - lam, [1, -lam])
display(IPython.display.Audio(s, rate=fs, normalize=False))

## State machines

Most digital signal processing applications need to work in real time, that is, they need to process an input stream of samples continuously, one sample at a time. Last week, for instance, we saw how to implement filters as Python *classes* so that a filter could be called repeatedly on individual samples; using classes allowed the filter to preserve the contents of its delay blocks across calls (that is, the filter's internal *state* was persistent).  

In more complex systems that implement some decision logic, the processing algorithm needs to behave differently according to the *current* properties of the input; for instance, when decoding DTMF, we want to look at the frequency content in tone intervals but do nothing for the silence gaps. 

The most common **design pattern** in this case is a *state machine*; every time a new sample arrives:
 * we first perform the required pre-processing steps (e.g. compute the local energy)
 * we then apply a different set of processing steps that depend on an internal *state*
 * finally, we check if we need to switch to a different state for the next iteration.

For example, if we want to isolate the tone segments in a DTMF signal, we can use a machine with three states:
 1. waiting for the input power to increase
 2. checking that the power increase lasts long enough to indicate a tone
 3. waiting for the power to decrease and signal the end of the tone
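
Before applying the idea to audio, here is the same three-state logic on a toy input: a list of flags that say whether the power is above the threshold at each step. The function name, the state names and the test data are all hypothetical and serve only to show the pattern.

```python
def find_runs(flags, min_len=3):
    # toy three-state machine: report (start, end) of every run of "high"
    # flags that lasts at least `min_len` steps (end index is exclusive)
    WAIT_HIGH, CHECK_HIGH, WAIT_LOW = 0, 1, 2
    state, start, runs = WAIT_HIGH, 0, []
    for n, high in enumerate(flags):
        if state == WAIT_HIGH:
            if high:
                start, state = n, CHECK_HIGH
        elif state == CHECK_HIGH:
            if not high:
                state = WAIT_HIGH            # too short: false alarm
            elif n - start + 1 >= min_len:
                state = WAIT_LOW             # long enough: accept the run
        else:  # WAIT_LOW
            if not high:
                runs.append((start, n))
                state = WAIT_HIGH
    return runs

flags = [0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0]
print(find_runs(flags))
```

The isolated flag at index 1 and the two-step run at the end are rejected as too short; only the run from index 4 to index 7 is reported. Note that in this sketch a run that is still open when the input ends is silently dropped.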

Here is an example that produces a segmentation of a DTMF signal based on a user-defined power threshold:

In [None]:
def tone_intervals(x: np.ndarray, t_pow: float, gain: float, Fs=8000) -> (list[tuple[int, int]], np.ndarray):
    # gain: gain for leaky integrator to compute power level 
    # minimum length (ms) for tone detection
    MIN_TONE_LEN_MS = 40    
    
    clips = []
    min_tone_len = int(MIN_TONE_LEN_MS * Fs / 1000) 
    delay = (1 - gain) / gain   # delay of leaky integrator
    power = np.zeros(len(x))
    
    # state machine with three states
    WAIT_TONE, CHECK_TONE, WAIT_END = 0, 1, 2
    state= WAIT_TONE
    
    for n in range(0, len(x)):
        # compute power with leaky integrator
        power[n] = (1 - gain) * power[n-1] + gain * x[n] * x[n]
        # check if power is above threshold
        power_high = power[n] > t_pow

        if state == WAIT_TONE:  # we are waiting for the power to go high
            if power_high:
                count = 0
                state = CHECK_TONE
        elif state == CHECK_TONE:  # make sure power stays high before switching
            if not power_high:
                state = WAIT_TONE  # false alarm: go back to waiting
            else:
                count += 1
                if count > min_tone_len:
                    state = WAIT_END  # power was high for more than 40ms, it's a tone
        else:  # wait for the end of the tone interval
            if power_high:
                count += 1
            else:
                clips.append( (int(n-count-delay), int(n-delay)) )
                state = WAIT_TONE
    return clips, power

In [None]:
def show_tone_intervals(x: np.ndarray, t_pow=0.15, gain=0.02, Fs=8000):
    tones, power = tone_intervals(x, t_pow, gain, Fs)
    plt.plot(x);
    plt.plot(power);
    plt.axhline(t_pow, color='C4')
    for clip in tones:
        plt.axvline(clip[0], color='green')        
        plt.axvline(clip[1], color='red')

In [None]:
show_tone_intervals(x)

## Frequency identification

Once we have identified where the tone intervals are, we can compute the magnitude DFT of each interval, map the DFT bins to frequency values in Hz, and find the DTMF frequencies that are closest to the peaks of the DFT magnitude. 

The "low" DTMF frequencies are in the 697 Hz to 941 Hz range, while the "high" frequencies are in the 1209 Hz to 1477 Hz range, so we will look for a DFT peak in each of those intervals. For instance, let's look at the first tone and at the peaks in its DFT: they are very close to the frequencies 697 Hz and 1209 Hz, which correspond to the key "1".

In [None]:
f, X = spectral_power(x[slice(*tone_intervals(x, t_pow=0.15, gain=0.02, Fs=8000)[0][0])], Fs)
plt.plot(f, X);
print(f[np.argsort(X)[-2:]])
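
The "closest DTMF frequency" step can be sketched in isolation as follows. The example uses a synthetic 50 ms tone whose frequencies are deliberately 1% too high; the search margin of 50 Hz around each group is an assumption chosen for this illustration.

```python
import numpy as np

Fs = 8000
LO_FREQS = np.array([697.0, 770.0, 852.0, 941.0])
HI_FREQS = np.array([1209.0, 1336.0, 1477.0])

# synthetic 50 ms tone for key '1', with both frequencies 1% above nominal
n = np.arange(int(0.05 * Fs))
tone = np.sin(2 * np.pi * 1.01 * 697 / Fs * n) + np.sin(2 * np.pi * 1.01 * 1209 / Fs * n)

X = np.abs(np.fft.rfft(tone))
f = np.fft.rfftfreq(len(tone), d=1 / Fs)   # DFT bin index -> frequency in Hz

detected = []
for group in (LO_FREQS, HI_FREQS):
    # restrict the peak search to the band covered by this group
    band = (f >= group.min() - 50) & (f <= group.max() + 50)
    peak_hz = f[band][np.argmax(X[band])]
    # snap the peak to the closest nominal DTMF frequency
    detected.append(float(group[np.argmin(np.abs(group - peak_hz))]))
print(detected)
```

Even though the peaks do not fall exactly on the nominal values, snapping to the nearest entry of each group recovers the pair (697 Hz, 1209 Hz).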

# A basic DTMF decoder

Using the elements from the previous section, here is a functional DTMF decoder:

In [None]:
def DTMF_decode(x: np.ndarray, t_pow=0.15, gain=0.02, Fs=8000) -> str:
    # minimum length (ms) for tone detection
    MIN_TONE_LEN_MS = 40    
    
    LO_FREQS = np.array([697.0, 770.0, 852.0, 941.0]) # DTMF rows
    HI_FREQS = np.array([1209.0, 1336.0, 1477.0])     # DTMF columns
    KEYS = [['1', '2', '3'], 
            ['4', '5', '6'], 
            ['7', '8', '9'], 
            ['*', '0', '#']] 

    decoded = ''
    min_tone_len = int(MIN_TONE_LEN_MS * Fs / 1000) 
    delay = (1 - gain) / gain 
    power = 0
    
    WAIT_TONE, CHECK_TONE, WAIT_END = 0, 1, 2
    state= WAIT_TONE
    
    for n in range(0, len(x)):
        power = (1 - gain) * power + gain * x[n] * x[n]
        power_high = power > t_pow

        if state == WAIT_TONE:  
            if power_high:
                state, count = CHECK_TONE, 0

        elif state == CHECK_TONE:  
            if power_high:
                count += 1
                state = WAIT_END if count > min_tone_len else CHECK_TONE
            else:
                state = WAIT_TONE  
     
        else:  # WAIT_END
            if power_high:
                count += 1
            else:
                # compute tone magnitude spectrum
                X = np.abs(np.fft.fft(x[slice(int(n-count-delay), int(n-delay))]))
                N = len(X)
        
                k = KEYS
                for freqs in ([LO_FREQS, HI_FREQS]):
                    # map frequency range to DFT indexes
                    r_min, r_max = int(N / Fs * np.min(freqs - 100)), int(N / Fs * np.max(freqs + 100))
                    # find frequency peak
                    ix = r_min + np.argmax(X[slice(r_min, r_max)])
                    # find closest DTMF frequency within range
                    f_ix = int(np.argmin(np.abs(freqs - Fs * ix / N)))
                    # first iteration extract row, second iteration extract column
                    k = k[f_ix]
        
                decoded += k                 
                state = WAIT_TONE
    return decoded            

This works very well on clean signals:

In [None]:
keys = '123##45'
k = DTMF_decode(DTMF_encode(keys, Fs=Fs), Fs=Fs)
print('good job!' if k == keys else k + '  ** oups, try again!')

## But life is not so simple...

Although the decoder works on a clean, synthetic DTMF signal, in the real world it will have to deal with problems such as noise, attenuation, frequency drifts, and varying pulse lengths. A few test cases illustrating these impairments are available in the `data` directory of this notebook.

We can test our basic decoder using the following script and see how many it gets right:

In [None]:
#for n in range(0, 14):
#    hear_test_file('test', n)

In [None]:
def test_decoder(basename: str, decoder, **kwargs):
    # decode all test files with the same basename and check the results
    with open(os.path.join('data', f'{basename}.log'), 'r') as csvfile:
        test_files = csv.DictReader(csvfile, fieldnames=['ix', 'keys'], delimiter=';')
        total, ok = 0, 0
        for file in test_files:
            fs, s = read_test_file(f'{basename}{file["ix"]}.wav')
            total += 1
            res = decoder(s, **kwargs)
            if str(res).strip() == str(file["keys"]).strip():
                flag = 'OK'
                ok += 1
            else:
                flag = 'ERROR'
            print(f'{file["ix"]}: {flag}   (encoded: {file["keys"]}, decoded: {res})')
    print(f'\nYou got {ok} out of {total} correct')

In [None]:
test_decoder('test', DTMF_decode)

## Why does it fail?

Decoding mistakes are due essentially to signal segmentation errors. For instance, in the following example, some tones are missed while others are split into multiple clips, leading to both missed digits and duplicate digits:

In [None]:
fs, noisy = read_test_file('test3.wav')
IPython.display.Audio(noisy, rate=fs)

In [None]:
DTMF_decode(noisy)

This is apparent if we plot the segmentation results: 

In [None]:
show_tone_intervals(noisy)

The segmentation algorithm is parametrized via:
 * the gain of the leaky integrator, which trades off smoothing power with delay
 * the threshold for the power level
 
We could try to play with these two parameters and, with patience and some luck, we could find a pair of values that "fix" the decoder _for this particular case_:

In [None]:
params = {
    't_pow': .146,
    'gain': .006
}
show_tone_intervals(noisy, **params, Fs=fs)

In [None]:
DTMF_decode(noisy, **params, Fs=fs)

The values we just used, however, do not "magically" work for all test files (although they make things better). 

In [None]:
test_decoder('test', DTMF_decode, **params)

# A robust DTMF decoder

We will now develop a more robust DTMF decoder that does not rely on hard-coded thresholds. We want to estimate and adapt the segmentation thresholds from the input signal so that we can handle both additive noise and unknown attenuation. 

The key idea is to separate the power associated to the DTMF tones from the total power of the signal; if we move to the frequency domain, we know exactly _where_ to look for tone energy in the spectrum since there are only 7 possible DTMF tone frequencies. If the signal is corrupted by additive white noise, whose power spectral density is more or less flat:

 * when there is no tone, the average energy will be more or less the same at tone and non-tone frequencies;
 * when a tone appears, only the tone energy should increase.

The trick is to compute the tone detection threshold from the signal's tone and non-tone power levels as estimated from the spectral energy distribution. To do so, we will move to the frequency domain by taking the DFT of successive chunks of the signal. The first question, therefore, is finding a good value for the size of the chunks.

## Optimal DFT length

An $N$-point DFT has the following properties:
 * each DFT coefficient is associated to a center frequency $\omega_k = (2\pi/N)k$
 * the frequency resolution, that is, the difference between adjacent bin frequencies, is $\Delta_N = 2\pi/N$
 * each DFT coefficient covers the frequency interval $[\omega_k - \Delta_N/2, \omega_k + \Delta_N/2]$

If we know the sampling frequency, 
 * the frequency resolution is $\Delta_f = F_s/N$ Hz
 * each DFT coefficient corresponds to the interval $[(F_s/N)(k-1/2), (F_s/N)(k+1/2)]$ Hz.
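
As a worked example of these relations, consider a 40 ms window at $F_s = 8000$ (that is, $N = 320$) and the lowest DTMF frequency:

```python
Fs, N = 8000, 320   # 40 ms window at 8 kHz

delta_f = Fs / N                       # frequency resolution in Hz
f0 = 697                               # lowest DTMF frequency
k = round(f0 / delta_f)                # index of the closest DFT bin
center = k * delta_f                   # center frequency of that bin
interval = ((k - 0.5) * delta_f, (k + 0.5) * delta_f)

print(f'resolution: {delta_f} Hz')
print(f'{f0} Hz falls in bin {k}, centered at {center} Hz and covering {interval} Hz')
```

With a resolution of 25 Hz, the 697 Hz tone falls in bin 28, whose center frequency is 700 Hz: the tone is 3 Hz away from the bin center.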

This means that if we take $N$ samples of a signal containing two sinusoids, we will be able to see two distinct spectral lines only if the DFT frequency resolution is smaller than the frequency difference between the sinusoids. By "distinct", we mean that there is at least one DFT bin separating the lines. For example:

In [None]:
N, w = 60, 0.3 * np.pi 

D, n = 2 * np.pi / N, np.arange(0, N)
for i in [1, 2]:
    plt.subplot(1,2,i)
    plt.stem(*spectral_power(np.sin(w * n) + np.sin((w + i * D) * n), N));

Since the decoder needs to locate the spectral peaks corresponding to the DTMF frequencies, we would like to use a DFT size such that 
  * neighboring DTMF frequencies always appear as distinct lines
  * the DTMF frequencies fall as close as possible to one of the DFT center frequency values (in Hz)

With respect to the first condition, the smallest distance between DTMF frequencies is 73 Hz (between 697 Hz and 770 Hz), so we need $F_s/N \le 73$ Hz, that is, a DFT window of at least $1/73 \approx 13.7$ ms; at the same time, the DFT size cannot be too large and certainly it cannot be larger than 40 ms, the shortest tone length that the decoder must handle.

The following function computes the optimal DFT length for a given sampling frequency and a maximum size. 

In [None]:
def optimal_DFT_size(max_len_ms: float, Fs: float) -> (int, np.ndarray):
    DTMF_FREQS = np.array((697, 770, 852, 941, 1209, 1336, 1477))

    # minimum DFT size to guarantee frequency separation
    min_delta = np.min(np.abs(DTMF_FREQS[1:] - DTMF_FREQS[:-1]))
    min_len = int(Fs / min_delta)
    max_len = int(max_len_ms * Fs / 1000)

    ret = (0, [])
    min_err = 1e100
    for dft_len in range(min_len, max_len + 1):
        # compute the difference between nominal DTMF frequencies and closest DFT center bin frequencies
        bins = np.round(DTMF_FREQS / Fs * dft_len).astype(int)
        offsets = bins * Fs / dft_len - DTMF_FREQS
        # error metric: square root of the summed squared offsets, scaled by dft_len / Fs
        err = np.sqrt(np.sum(offsets ** 2) / Fs * dft_len)
        if err < min_err:
            min_err = err
            ret = (dft_len, bins)
    return ret

Interestingly, the optimal size is not the maximum size, as shown in the following plot. In particular, for $F_s = 8000$, a 40 ms DFT window is 320 samples long; yet, the optimal choice is a 299-sample DFT because it minimizes the approximation error for the DTMF frequencies.

In [None]:
k = np.arange(15, 40)
plt.plot(k, [optimal_DFT_size(t, Fs)[0] for t in k]);

In [None]:
optimal_DFT_size(40, Fs)

## Sliding window

Once we have chosen the DFT size, another way to make the decoder more robust is to use overlapping windows, that is, shift the analysis window by a fraction of the window length. The amount of displacement is called _stride_ and we will choose 5 ms, that is, about 1/8 of the analysis window. The redundancy of an overlapped analysis provides better time resolution and noise immunity.
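
Here is a minimal sketch of how overlapping analysis windows can be extracted with NumPy (version 1.20 or later, for `sliding_window_view`). The input is random placeholder data, and the window length of 299 samples is the value quoted above for $F_s = 8000$.

```python
import numpy as np

Fs = 8000
M = 299                          # analysis window length in samples
stride = int(5 * Fs / 1000)      # 5 ms -> 40 samples

sig = np.random.default_rng(0).standard_normal(Fs)   # 1 s of placeholder data

# view of all length-M windows, keeping one window every `stride` samples
frames = np.lib.stride_tricks.sliding_window_view(sig, M)[::stride]

print('frames, samples per frame:', frames.shape)
print(f'overlap between consecutive windows: {1 - stride / M:.0%}')
```

Each row of `frames` is one analysis window; consecutive rows share all but 40 samples, so every tone is observed by several windows.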

## Tone and non-tone energy

At each time $n$ we compute $X_n[k]$, the DFT of the data vector $[x[n-N+1], \ldots, x[n]]$. There are seven possible DTMF frequency values, $f_0$ to $f_6$, and let $k_i$ be the DFT bin corresponding to $f_i$; we compute the spectral energy for $f_i$ as the average energy over three neighboring DFT bins:

$$
    E(i; n) = \frac{1}{3}\sum_{m=-1}^{1}|X_n[k_i + m]|^2.
$$

Conversely, the non-tone energy will be the averaged energy over all DFT bins not used for tone energy.

Both tone and non-tone energies are time-averaged using leaky integrators; since the average tone energy should be updated mainly when a tone is present, we stabilize the estimate by reducing the gain of its leaky integrator when we assume there is no tone in the input.
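
The following sketch computes the tone and non-tone energies for a single analysis frame. It assumes a 299-sample window at $F_s = 8000$ and a synthetic frame containing the key "1" plus mild noise; unlike a full-length DFT, it uses the one-sided spectrum, so that the mirrored copies of the tone bins are not counted as non-tone energy.

```python
import numpy as np

Fs, M = 8000, 299
DTMF_HZ = np.array([697, 770, 852, 941, 1209, 1336, 1477])
bins = np.round(DTMF_HZ / Fs * M).astype(int)   # DFT bin closest to each tone

# one analysis frame: key '1' (697 Hz + 1209 Hz) plus mild white noise
n = np.arange(M)
rng = np.random.default_rng(0)
frame = (np.sin(2 * np.pi * 697 / Fs * n) + np.sin(2 * np.pi * 1209 / Fs * n)
         + 0.1 * rng.standard_normal(M))

# one-sided energy spectrum
energy = np.abs(np.fft.rfft(frame)) ** 2
# per-tone energy: average over the center bin and its two neighbors
tone_energy = (energy[bins - 1] + energy[bins] + energy[bins + 1]) / 3

# non-tone energy: average over all the remaining bins
mask = np.full(len(energy), True)
mask[bins - 1] = mask[bins] = mask[bins + 1] = False
non_tone_energy = float(np.mean(energy[mask]))

print('energy per tone:', np.round(tone_energy))
print('mean non-tone energy:', round(non_tone_energy, 1))
```

The two frequencies of the key stand out clearly in `tone_energy`, while the non-tone average stays close to the noise floor.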

## Key decoding

Once we have detected that a tone is present, the spectral energy of each of the 7 DTMF frequencies is accumulated in a 7-bin histogram for the duration of the tone. When we detect the end of a tone interval, we find the largest histogram value within the low frequency group and the largest within the high frequency group; this pair identifies the key. 


## The final design

In [None]:
def freqs2key(bin_energy: np.ndarray) -> str:
    # the function takes a vector of accumulated energy values for the DTMF frequency "slots"
    #  and returns the key value corresponding to the largest lo and hi values
    DTMF_KEYS = '123456789*0#'
    ix = 3 * np.argmax(bin_energy[:4]) + np.argmax(bin_energy[4:])
    return DTMF_KEYS[ix]
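
The index arithmetic works because the keypad, read row by row, has three keys per row. A short worked example with made-up energy values:

```python
import numpy as np

DTMF_KEYS = '123456789*0#'       # keypad read row by row
# hypothetical accumulated energies: 4 row (low) values, then 3 column (high) values
bin_energy = np.array([0.2, 9.1, 0.4, 0.3, 0.1, 0.5, 7.8])

row = int(np.argmax(bin_energy[:4]))   # 1 -> second row, 770 Hz
col = int(np.argmax(bin_energy[4:]))   # 2 -> third column, 1477 Hz
key = DTMF_KEYS[3 * row + col]
print(key)
```

The pair (770 Hz, 1477 Hz) gives index $3 \cdot 1 + 2 = 5$, which is the key "6", in agreement with the DTMF table.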

In [None]:
def DTMF_dplus(x: np.ndarray, Fs=8000, debug=False) -> str:
    # step for analysis windows
    STRIDE_MS = 5
    # gains for leaky integrators
    GAIN_SLOW, GAIN_FAST = 0.005, 0.02

    # find optimal DFT size and tone bins (max len 40 ms)
    M, bins = optimal_DFT_size(40, Fs)
    # binary mask for all non-tone frequency bins in DFT
    non_tone_bins = np.full(M, True)
    non_tone_bins[bins] = non_tone_bins[bins-1] = non_tone_bins[bins+1] = False
    
    stride = int(STRIDE_MS * Fs / 1000)
    min_tone_len = int(40 / STRIDE_MS)
    min_gap_len = int(20 / STRIDE_MS)
        
    # initial estimate for noise floor (assuming leading silence)
    #  (we "cheat" a bit by looking at future data, but we could obtain the same
    #    result using an extra state in the processing state machine)
    nont_level = np.sum(x[:M] ** 2)
    # initial estimate for tone energy
    tone_level = nont_level * 2
    n_gain, t_gain = GAIN_FAST, GAIN_SLOW

    # debug data
    steps = len(x) // stride + 1
    debug_data = {
        'ept': np.zeros((steps, len(bins))),
        'tae': np.zeros(steps),
        'nae': np.zeros(steps),
        'avg': np.zeros((steps, 3))
                        }
    # state machine
    WAIT_TONE, CHECK_TONE, WAIT_GAP, CHECK_GAP = 0, 1, 2, 3
    count, state = 0, WAIT_TONE
    decoded = ''

    for n in range(0, len(x)-M, stride):
        spectral_energy = np.abs(np.fft.fft(x[n:n+M])) ** 2
        # energy for each tone frequency (avg of center bin and two neighboring bins)
        #  this is a length-7 vector, one element per DTMF frequency value
        energy_per_tone = (spectral_energy[bins] + spectral_energy[bins-1] + spectral_energy[bins+1]) / 3
        # avg energy over all tone and non-tone bins
        mean_tone_erg = np.mean(energy_per_tone)
        mean_nont_erg = np.mean(spectral_energy[non_tone_bins])

        # time averaging of energy levels
        tone_level = (1 - t_gain) * tone_level + t_gain * mean_tone_erg
        nont_level = (1 - n_gain) * nont_level + n_gain * mean_nont_erg
        
        # check if current mean tone energy is above the point 30% of the way up between non-tone and tone energy levels
        threshold = nont_level + (tone_level - nont_level) * 0.3
        tone_present = mean_tone_erg > threshold
        
        if state == WAIT_TONE:  
            if tone_present:
                tone_energy_histogram = energy_per_tone
                t_gain = GAIN_FAST  # update tone level quicker now
                count, state = 1, CHECK_TONE
        
        elif state == CHECK_TONE:  
            if tone_present:
                tone_energy_histogram += energy_per_tone
                count += 1
                if count > min_tone_len:
                    state = WAIT_GAP 
            else:
                # false alarm, let's go back to waiting
                t_gain = GAIN_SLOW
                state = WAIT_TONE  
        
        elif state == WAIT_GAP: 
            if tone_present:
                tone_energy_histogram += energy_per_tone
            else:
                decoded += freqs2key(tone_energy_histogram)
                t_gain = GAIN_SLOW
                count, state = 1, CHECK_GAP
        
        elif state == CHECK_GAP: 
            if tone_present:
                count = 0
            else:
                count += 1
                if count > min_gap_len:
                    state = WAIT_TONE
                    
        else:
            raise RuntimeError("non-existing state")
        
        ix = n // stride
        debug_data['ept'][ix] = energy_per_tone
        debug_data['tae'][ix] = mean_tone_erg
        debug_data['nae'][ix] = mean_nont_erg
        debug_data['avg'][ix] = [tone_level, nont_level, threshold]       

    if debug:
        plt.plot(debug_data['ept']);
        plt.plot(debug_data['tae'], 'C0', label="avg tone energy (dash: trend)");    
        plt.plot(debug_data['avg'][:,0], 'C0:');
        plt.plot(debug_data['nae'], 'C1', label="avg non-tone energy (dash: trend)");    
        plt.plot(debug_data['avg'][:,1], 'C1:');
        plt.plot(debug_data['avg'][:,2], 'C3:', label="threshold");
        plt.legend()

    return decoded

In [None]:
test_decoder('test', DTMF_dplus)

You can also look at the various signals in the decoder:

In [None]:
fs, noisy = read_test_file('test4.wav')
DTMF_dplus(noisy, Fs=fs, debug=True)

In [None]:
plt.plot(noisy);