In [44]:
import numpy as np
import matplotlib.pyplot as plt
from wave import open as open_wave

%matplotlib inline

In [45]:
# Config parameters
N_fft = 256
N = 3
td_scalar = 5
fd_scalar = 3
tone_duration = 0.04

DTMF_FREQ = [697, 770, 852, 941, 1209, 1336, 1477] # 1633Hz for A, B, C, D column tones
DTMF_KEYPAD = [["1", "2", "3"],
               ["4", "5", "6"],
               ["7", "8", "9"],
               ["*", "0", "#"]]

In [46]:
def plot_time(signal, title=""):
    ''' Plot sginal in time domain.
    '''
    fig, ax = plt.subplots(figsize=(16, 6))
    ax.set_title(title)
    ax.set_xlabel("Time [sec]")
    ax.set_ylabel("Magnitude")
    ax.plot(signal)
    
    
def plot_spectrum(freq, S, title=""):
    ''' Plot sginal in frequency domain.
    '''    
    fig, ax = plt.subplots(figsize=(16, 6))
    ax.set_title(title)
    ax.set_xlabel("Frequency [Hz]")
    ax.set_ylabel("Magnitude")
    ax.plot(freq, S)
    

def plot_signal_and_spectrum(s, fs):
    ''' Plot sginal in time domain and in frequency domain.
    '''
    plt.subplots(1, 2, figsize=(20, 6))

    plt.subplot(121)
    plt.plot(s)
    plt.xlabel("Time [sec]")
    plt.ylabel("Magnitude")
    plt.ylim([-1.5, 1.5])
    plt.grid()

    # spectrum of s (fs samples taken)
    # plot only the 1st half of spectrum (since it's symmetric)
    plt.subplot(122)

    amps = np.abs(np.fft.rfft(s, fs))
    freqs = np.fft.rfftfreq(fs, 1/fs)

    plt.xlabel("Freq [Hz]")
    plt.ylabel("Magnitude")
    plt.plot(freqs, amps)
    plt.grid()
    
    
def moving_average(data, window_width=3):
    ''' Moving average filter
    '''    
    cumsum_vec = np.cumsum(np.insert(data, 0, 0), dtype=float) 
    ma_vec = (cumsum_vec[window_width:] - cumsum_vec[:-window_width]) / window_width
    return ma_vec
    
    
def group_nearest(samples, threshold):
    """ [Generator function] Groups frequencies where the diff is lower than threshold.
    
    Parameters
    ----------
    samples : (ndarray)
        Sample candidates. Form (freq, amp)
    threshold: (integer)
        Threshold.
        
    Returns
    ----------
    object : (generator object)
        Grouped numpy array
    """ 
    grp = []
    last = samples[0][0] # First time idx
    for (time, amp) in samples:
        if time - last > threshold:
            yield np.mean(grp, axis=0)
            grp = []
        grp.append((time, amp))
        last = time
    yield np.mean(grp, axis=0)
    

def dtmf_freq_idx(samples_freq):
    """ Find the closest frequency from samples_freq to DTMF frequencies.
    
    Parameters
    ----------
    samples_freq : (ndarray)
        List of frequencies from fftfreq

    Returns
    ----------
    idxs : (list)
        List of indexes from DTMF_FREQ list that correspond to samples_freq
    """ 
    idxs = []
    for freq in samples_freq:
        idxs.append((np.abs(DTMF_FREQ - freq)).argmin())
    return idxs


def get_key(dtmf_idx):
    """ Get the key from DTMF keypad 
    
    Parameters
    ----------
    dtmf_idx : (integer)
        Index/es of DTMF keypad 

    Returns
    ----------
    key : (string)
        Key/Number from DTMF keypad
    """    
    row = dtmf_idx[0]
    col = dtmf_idx[1] - len(DTMF_KEYPAD)
    
    return DTMF_KEYPAD[row][col]
    
    
def normalize(array, amp=1.0):
    """ Normalizes a array so the maximum amplitude is +amp or -amp.
    
    Parameters
    ----------
    array : (ndarray)
        signal array   
    amp : (float)
        max amplitude (pos or neg) in result. Defaults to 1.0

    Returns
    ----------
    array : (ndarray)
        normilized signal array
    """
    high, low = abs(max(array)), abs(min(array))
    return amp * array / max(high, low)
                        

def read_wave(filename):
    """ Reads a wave file.
    
    Parameters
    ----------
    filename: (string)

    Returns
    ----------
    array : (ndarray)
        normilized signal array
    """
    fp = open_wave(filename, 'r')

    nchannels = fp.getnchannels()
    nframes = fp.getnframes()
    sampwidth = fp.getsampwidth()
    framerate = fp.getframerate()
    
    z_str = fp.readframes(nframes)
    
    fp.close()

    dtype_map = {1:np.int8, 2:np.int16}
    if sampwidth not in dtype_map:
        raise ValueError('sampwidth {} unknown'.format(sampwidth))
    
    array = np.fromstring(z_str, dtype=dtype_map[sampwidth])

    # if it's in stereo, just pull out the first channel
    if nchannels == 2:
        array = array[::2]

    return framerate, normalize(array), nframes
    

In [47]:
def find_candidates(sig):
    ''' Filter and find candidates whos amplitude are above threshold
        
    Parameters
    ----------
    sig: (ndarray)

    Returns
    ----------
    tone_candidates : (ndarray)
        Frequency and amplitude of the candidates
    """
    '''
    ## Executing convolution on smaller window is faster than moving average.
    ## But very computation time consuming on bigger window.
    # for N = 3
    # moving_average: 674 µs ± 13.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
    # convolve:       274 µs ± 44.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    # for N = 1000
    # moving_average: 689 µs ± 46.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
    # convolve:       14 ms ± 166 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    
    s_filtered = np.abs(np.convolve(sig, np.ones(N)/N, mode='valid'))
    s_filtered = normalize(s_filtered)
#     plot_time(s_filtered, title="Filtered Signal in Time Domain")

    tone_candidates = np.array([(sample, amp) for (sample, amp) in enumerate(s_filtered) if amp > np.mean(s_filtered) * td_scalar])
    # display(tone_candidates, len(tone_candidates), avg)
    return tone_candidates

In [55]:
def freq_analyze(s, td_pieces, smpl_freqs, smpl_duration):
    ''' Analyze each time piece in frequency domain.
        
    Parameters
    ----------
    s : (ndarray)
        Original signal
    td_pieces : (ndarray)
        Time Domain pieces (candidates) to decode
    smpl_freqs : (ndarray)
        Span of frequencies
    smpl_duration : (float)
        Duration of the sample, sample_rate x tone_time_duration
    Returns
    ----------
    freq_cand : (ndarray)
        Frequency candidates
    start : (integer)
        Start offset
    stop : (integer)
        Stop offset
    duration : (integer)
        Duration of the signal        
    '''
    decoded = []

    for td_piece in td_pieces:
        start = int(td_piece[0] - smpl_duration / 2)
        stop = int(td_piece[0] + smpl_duration / 2)
        duration = stop - start

        # Amplitudes of the samples after fft
        smpl_fft = np.abs(np.fft.rfft(s[start:stop], N_fft))
        smpl_fft = normalize(smpl_fft)
#         plot_spectrum(smpl_freqs, smpl_fft) 
    
        # Take only highest amp frequencies
        freq_cand = smpl_freqs[np.argwhere(smpl_fft > np.average(smpl_fft) * fd_scalar)]
        display(freq_cand)

        # Convert freq candidates to DTMF indexes
        DTMF_idx = list(set(dtmf_freq_idx(freq_cand)))
#         display(DTMF_idx)

        # Check if we have exactly two candidates
        if len(DTMF_idx) == 2:
            decoded.append(get_key(DTMF_idx))
        else:
            decoded.append(None)
        
    return decoded, start, stop, duration

In [59]:
def decode_signal(filename="phonecall.wav"):
    ''' Decodes DTMF signal.
        
    Parameters
    ----------
    filename: (string)
        
    Returns
    ----------
    decoded : (string)
        Decoded string
    '''

    fs, s, frames = read_wave(filename)
    
    smpl_duration = fs * tone_duration
    
    tone_candidates = find_candidates(s)
    grp_cand = list(group_nearest(tone_candidates, threshold=400))
    
    # Frequency span of the samples
    smpl_freqs = np.fft.rfftfreq(N_fft, 1/fs)
    
    # Analyze each time group (candidates)
    (decoded, start, stop, duration) = freq_analyze(s, grp_cand, smpl_freqs, smpl_duration)
    
    # Print appropriete message to the user
    if all(decoded):
        print("Decoded number are {}".format(decoded))
    elif any(decoded):
        print("Could not decode all candidates. Decoded numbers are:\n{}\nTry lowering the tone_duration".format(decoded))
    else:
        print("Could not decode DTMF from {} try lowering the tone_duration".format(filename))

In [58]:
decode_signal()

[array([ 364.5       ,    0.80736432]), array([  1.99200000e+03,   7.93079244e-01]), array([  3.44150000e+03,   8.48330297e-01]), array([  1.30753333e+04,   8.05967920e-01]), array([  2.30540000e+04,   8.45754055e-01]), array([  3.24160000e+04,   8.31446578e-01]), array([  4.20894000e+04,   7.93957406e-01]), array([  5.16526667e+04,   8.40706114e-01]), array([  6.10998750e+04,   8.42879118e-01]), array([  7.08515000e+04,   8.81866245e-01])]


array([[  937.5 ],
       [ 1343.75]])

array([[  750.  ],
       [  781.25],
       [ 1343.75]])

array([[  750.  ],
       [  781.25],
       [ 1187.5 ],
       [ 1218.75]])

array([[  750.  ],
       [  781.25],
       [ 1468.75]])

array([[  781.25],
       [ 1187.5 ],
       [ 1218.75]])

array([[  687.5 ],
       [  718.75],
       [ 1343.75]])

array([[  843.75],
       [ 1187.5 ],
       [ 1218.75]])

array([[  687.5 ],
       [ 1468.75]])

array([[  687.5 ],
       [ 1187.5 ],
       [ 1218.75]])

array([[  750.  ],
       [  781.25],
       [ 1468.75]])

Decoded number are ['0', '5', '4', '6', '4', '2', '7', '3', '1', '6']
