# Setup

**Imports**

In [10]:
import datetime as dt
from math import log
import IPython

import matplotlib.pyplot as plt
import numpy as np
import ruptures as rpt

import seaborn as sns
from scipy import signal

rng = np.random.default_rng()

# Question 1

The following cell loads the training data set `X_train` and `y_train`.
`X_train` is a list of 100 signals; `y_train` is a list of 100 symbol sequences. 

The signals have a varying number of symbols with a varying duration. 
There is a brief silence between each symbol.
The sampling frequency is $22.05 $ kHz.

In [11]:
FS = 22050  # sampling frequency (Hz)

X_train = np.load("X_train.npy", allow_pickle=True).tolist()
y_train = np.load("y_train.npy", allow_pickle=True).tolist()

In [12]:
signal, symbols = X_train[2], y_train[2]
print(" ".join(symbols))
IPython.display.Audio(signal, rate=FS)

D 6 A 8 3 D 1 8 B 9


### Dual-tone Multi-frequency Signaling

**Dual-tone multi-frequency signaling** is a procedure to encode symbols using an audio signal. The possible symbols are 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, *, #, A, B, C, and D. A symbol is represented by a sum of cosine waves: for \( t = 0, 1, \dots, T - 1 \),

$$
y_t = \cos\left( \frac{2\pi f_1 t}{f_s} \right) + \cos\left( \frac{2\pi f_2 t}{f_s} \right)
$$
where each combination of \( (f_1, f_2) \) represents a symbol. The first frequency has four different levels (low frequencies), and the second frequency has four other levels (high frequencies); there are 16 possible combinations.

By reading the page from the [link](https://en.wikipedia.org/wiki/DTMF), we find the DTMF Keypad Frequencies (with sound clips) table:

|          | 1209 Hz | 1336 Hz | 1477 Hz | 1633 Hz |
|----------|---------|---------|---------|---------|
| **697 Hz** | 1       | 2       | 3       | A       |
| **770 Hz** | 4       | 5       | 6       | B       |
| **852 Hz** | 7       | 8       | 9       | C       |
| **941 Hz** | *       | 0       | #       | D       |

We use the `ruptures` package for change-point detection.



In [53]:
# !pip install ruptures
import ruptures as rpt

According to the lectures, in the context of change-point detection with L2 cost function, these criteria provide estimates for the $\beta$ parameter:

- **Bayesian information criterion (BIC) for L2 change-point detection**
  
  $$
  \beta = 4\sigma^2 \log N
  $$

- **Akaike information criterion (AIC) for L2 change-point detection**

  $$
  \beta = 4\sigma^2
  $$

On fixe $\sigma=2$.

In [54]:
DTMF_keypad = {
                "1": (697, 1209),
                "2": (697, 1336),
                "3": (697, 1477),
                "4": (770, 1209),
                "5": (770, 1336),
                "6": (770, 1477),
                "7": (852, 1209),
                "8": (852, 1336),
                "9": (852, 1477),
                "0": (941, 1336),
                "*": (941, 1209),
                "#": (941, 1477),
                "A": (697, 1633),
                "B": (770, 1633),
                "C": (852, 1633),
                "D": (941, 1633),
            }
FS = 22050  # Hz
sigma = 2

In [51]:
sigma = 2
pred_bkps_list = []
for i in range(len(X_train)):
    f, t, Sxx = signal.spectrogram(X_train[i], FS, nperseg=512, noverlap=6)
    pen_bic = 4 * sigma**2 * np.log(len(X_train[i]))
    algo = rpt.Dynp(model="l2", min_size=2, jump=1).fit(Sxx.T)
    predicted_bkps = algo.predict(n_bkps=len(y_train[i]))
    for j in range(len(y_train[i])):
        dic[y_train[i][j]].append((f[predicted_bkps[j]], f[predicted_bkps[j+1]]))
    pred_bkps_list.append(predicted_bkps)

In [52]:
dic

{'0': [(1119.7265625, 1421.19140625),
  (3617.578125, 3919.04296875),
  (947.4609375, 1335.05859375),
  (516.796875, 775.1953125),
  (1593.45703125, 2110.25390625),
  (1291.9921875, 1593.45703125),
  (2368.65234375, 3316.11328125),
  (947.4609375, 1593.45703125),
  (904.39453125, 2024.12109375),
  (947.4609375, 1851.85546875),
  (2540.91796875, 2670.1171875),
  (3832.91015625, 4134.375),
  (1162.79296875, 1894.921875),
  (3531.4453125, 3832.91015625),
  (947.4609375, 1076.66015625),
  (1076.66015625, 1291.9921875),
  (3402.24609375, 3919.04296875),
  (258.3984375, 559.86328125),
  (2024.12109375, 3014.6484375),
  (1162.79296875, 1464.2578125),
  (732.12890625, 861.328125),
  (861.328125, 1636.5234375),
  (1636.5234375, 1937.98828125),
  (2842.3828125, 3100.78125),
  (258.3984375, 559.86328125),
  (3359.1796875, 3660.64453125),
  (473.73046875, 775.1953125),
  (2024.12109375, 2282.51953125),
  (990.52734375, 1248.92578125),
  (1248.92578125, 1378.125),
  (2239.453125, 2454.78515625),
  

In [48]:
f[1]

43.06640625

In [None]:
dic = {}
for index, bkpts_pred in enumerate(pred_bkps_list):


[[8, 12, 22, 38],
 [8, 10, 33, 41, 56, 70, 77, 85],
 [8, 12, 27, 34, 40, 46, 60, 62, 65, 68, 80],
 [14, 21, 28, 32, 46, 54, 73, 79, 92],
 [7, 12, 18, 25, 33, 37, 49, 81, 87, 106],
 [8, 10, 26, 33, 39],
 [5, 13, 35, 41, 59, 66, 73, 80, 95, 100, 113],
 [12, 20, 27, 49, 57, 65, 73, 87, 94, 110],
 [13, 18, 23, 30, 44],
 [12, 18, 32, 36, 44, 48],
 [20, 26, 33, 41, 63, 67, 84, 91, 107, 112, 130],
 [5, 17, 25, 34, 37, 45],
 [24, 29, 35, 42, 48, 64, 71, 84, 91, 97, 104, 118],
 [14, 22, 31, 35, 46, 56],
 [12, 18, 24, 30, 37, 45, 63, 71, 86, 92, 119],
 [17, 25, 33, 37, 49, 60, 66, 83],
 [13, 22, 30, 37, 44, 52, 74, 78, 95, 103],
 [20, 30, 37, 44],
 [23, 30, 48, 55, 77, 85, 93],
 [9, 15, 21, 27],
 [3, 11, 25, 31, 53, 59, 66, 71, 87, 95, 107],
 [15, 22, 37, 42, 47],
 [9, 22, 29, 47, 52, 57, 65, 78, 85, 99, 107, 122],
 [21, 27, 39, 45, 51, 57, 77, 82, 90, 98],
 [8, 15, 35, 39, 58, 61, 72, 88, 95, 119],
 [10, 18, 25, 40, 44, 55, 62, 70, 77, 95],
 [19, 25, 32, 44],
 [9, 15, 34, 42],
 [14, 21, 47, 53,

In [38]:
sigma = 2
f, t, Sxx = signal.spectrogram(X_train[0], FS, nperseg=512, noverlap=6)

pen_bic = 4 * sigma**2 * np.log(len(X_train[0]))
algo = rpt.Pelt(model="l1", jump=1, min_size=2)
predicted_bkps = algo.fit_predict(signal=Sxx.T, pen=pen_bic)

In [37]:
predicted_bkps
y_train[0]

['5', 'C', '3']

In [26]:
predicted_bkps

[2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35, 38]

In [None]:
def predict_from_signal(signal, nperseg= 512, noverlap = 6, min_size = 2, min_break_time = 0.2, min_energy = 0.1, sigma=2):
    """Predicts a list of symbols from a signal

    Args:
        signal (np.ndarray): signal (with noise ?)
        nperseg (int, optional): Length of each segment in the spectrogram. Defaults to 512.
        noverlap (int, optional): Number of points to overlap between segments in the spectrogram. Defaults to 6.
        min_size (int, optional): Minimum size of symbol in the spectrogram. Defaults to 2.
        min_break_time (float, optional): Minimum break time between 2 symbols, without being merged. Defaults to 0.2.
        min_energy (float, optional): Minimum energy of a symbol in the spectrogram for it to be considered. Defaults to 0.1.
        ax (Axes, optional): Axes to plot. Defaults to None.

    Returns:
        List[str]: list of symbols
    """
    # Estimate penalty
    pen_bic = 0.001 * sigma**2 * np.log(len(signal))

    # Compute spectrogram
    f, t, Sxx = signal.spectrogram(signal, FS, nperseg=nperseg, noverlap=noverlap)

    # Compute studied windows
    min_freq = 650
    max_freq = 1800
    idx_min_freq = np.argmin(np.abs(f - min_freq))
    idx_max_freq = np.argmin(np.abs(f - max_freq))

    # Compute minimum and maximum distance (in Hz) between 2 frequencies
    # Those are the min and max distances found in the DTMF
    min_freq_dist = 250
    max_freq_dist = 950

    # Initialize algorithm
    algo = rpt.Pelt(model="l1", jump=1, min_size=min_size)

    # Fit and predict on signal
    predicted_bkps = algo.fit_predict(signal=Sxx.T, pen=pen_bic)

    # Display ?
    if ax is not None:
        ax.pcolormesh(t, f[0:100], Sxx[0:100, :])
        ax.set_ylabel("Frequency [Hz]")
        ax.set_xlabel("Time [sec]")

    # We iterate every 2 breakpoints
    # Odd breakpoints show symbols, even breakponts show silences
    predictions_segments = []
    for idx_bkp in range(0, len(predicted_bkps[:-2]), 2):
        # define start and end time
        start_time = t[predicted_bkps[idx_bkp]]
        end_time = (
            t[predicted_bkps[idx_bkp + 1]]
            if predicted_bkps[idx_bkp + 1] < len(t)
            else t[-1]
        )

        # define zone in spectrogram
        Sxx_zone = Sxx[
            idx_min_freq:idx_max_freq,
            predicted_bkps[idx_bkp] : predicted_bkps[idx_bkp + 1],
        ]

        # find the 2 frequencies with maximum energy in the spectrogram
        energy = np.mean(Sxx_zone, axis=1)
        freqs = f[idx_min_freq + np.argsort(energy)[-2:]]

        # if the 2 found frequencies are too far aport, or not enough, we discard them
        if (
            np.abs(freqs[0] - freqs[1]) < min_freq_dist
            or np.abs(freqs[0] - freqs[1]) > max_freq_dist
        ):
            continue

        # compute the closest frequencies in the DTMF
        distances_f1 = np.min(
            np.abs(f1[:None, None] - freqs[None, None, :]), axis=(0, 2)
        )
        distances_f2 = np.min(
            np.abs(f2[:None, None] - freqs[None, None, :]), axis=(0, 2)
        )
        pred_f1 = np.argmin(distances_f1)
        pred_f2 = np.argmin(distances_f2)

        # save segment
        predictions_segments.append(
            {
                "freqs": freqs,
                "symbol": touches[pred_f1][pred_f2],
                "start_time": start_time,
                "end_time": end_time,
                "energy": np.sum(np.sort(energy)[-2:]) / (end_time - start_time),
            }
        )

    # merge segments if they are close and have the same symbol
    whitelisted_segments = [predictions_segments[0]]
    for idx, next_segment in enumerate(predictions_segments[1:]):
        if (
            next_segment["symbol"] == whitelisted_segments[-1]["symbol"]
            and next_segment["start_time"] - whitelisted_segments[-1]["end_time"]
            < min_break_time
        ):
            whitelisted_segments[-1]["end_time"] = next_segment["end_time"]
        else:
            whitelisted_segments.append(next_segment)

    # only keep segment with enough energy
    whitelisted_segments = [
        segment for segment in whitelisted_segments if segment["energy"] > min_energy
    ]

    # display ?
    if ax is not None:
        for segment in whitelisted_segments:
            for freq in segment["freqs"]:
                ax.axhline(
                    freq,
                    xmin=segment["start_time"] / t[-1],
                    xmax=segment["end_time"] / t[-1],
                    color="red",
                )
            ax.text(
                segment["start_time"],
                np.max(segment["freqs"]) + 200,
                segment["symbol"],
                size=30,
                color="red",
            )

    predicted_symbols = [segment["symbol"] for segment in whitelisted_segments]
    return predicted_symbols

# Question 2

In [28]:
X_test = np.load("X_test.npy", allow_pickle=True).tolist()

In [29]:
X_test

[array([-0.11986354,  1.59285965, -0.01986525, ..., -1.63190721,
         0.62423732, -1.5106623 ]),
 array([ 0.95018612, -0.71438407,  1.26761872, ..., -2.38157518,
         2.27365674, -1.30848881])]

# Question 3

# Question 4

# Question 5

# Question 6

# Question 7