**Note:** This notebook is based on a notebook authored by Chris Tralie, which can be found at: https://github.com/ctralie/TDALabs/blob/master/SlidingWindow2-PersistentHomology.ipynb

In [None]:
!pip install ripser

In [2]:
# necessary imports
import numpy as np
from ripser import ripser
from persim import plot_diagrams
import matplotlib.pyplot as plt
from matplotlib import gridspec
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
from scipy.interpolate import InterpolatedUnivariateSpline
import scipy.io.wavfile
import ipywidgets as widgets
from IPython.display import display
import warnings
warnings.filterwarnings('ignore')
from IPython.display import clear_output
import plotly.express as px

## Sliding Window Code
The code below performs a sliding window embedding on a 1D signal.  The parameters are as follows:

| | |
|:-:|---|
|$x$   | The 1-D signal (numpy array)   |
|dim|The dimension of the embedding|
|$\tau$   | The skip between samples in a given window  |
|$dT$   | The distance to slide from one window to the next  |

That is, along the signal given by the array $x$, the first three windows will will be $$\begin{bmatrix} x(\tau)\\ x(2\tau) \\ \ldots \\ x((\mbox{dim}-1)\cdot\tau)\end{bmatrix},  \begin{bmatrix} x(dT + \tau)\\ x(dT +2\tau) \\ \ldots \\ x(dT +(\mbox{dim}-1)\cdot\tau)\end{bmatrix},  \begin{bmatrix} x(2dT + \tau)\\ x(2dT +2\tau) \\ \ldots \\ x(2dT +(\mbox{dim}-1)\cdot\tau)\end{bmatrix}$$

The function *getSlidingWindow* below creates an array $X$ containing the windows as its columns.

In [3]:
def getSlidingWindow(x, dim, Tau, dT):
    """
    Return a sliding window of a time series,
    using arbitrary sampling.  Use linear interpolation
    to fill in values in windows not on the original grid
    Parameters
    ----------
    x: ndarray(N)
        The original time series
    dim: int
        Dimension of sliding window (number of lags+1)
    Tau: float
        Length between lags, in units of time series
    dT: float
        Length between windows, in units of time series
    Returns
    -------
    X: ndarray(N, dim)
        All sliding windows stacked up
    """
    N = len(x)
    NWindows = int(np.floor((N-dim*Tau)/dT))
    if NWindows <= 0:
        print("Error: Tau too large for signal extent")
        return np.zeros((3, dim))
    X = np.zeros((NWindows, dim))
    spl = InterpolatedUnivariateSpline(np.arange(N), x)
    for i in range(NWindows):
        idxx = dT*i + Tau*np.arange(dim)
        start = int(np.floor(idxx[0]))
        end = int(np.ceil(idxx[-1]))+2
        # Only take windows that are within range
        if end >= len(x):
            X = X[0:i, :]
            break
        X[i, :] = spl(idxx)
    return X

# Example 1: sliding window embedding of a signal with a single frequency


In [None]:
# the following code creates signal1, computes a representation of the sliding window embedding, and the persistent homology of this pointcloud
# use the parameters to see their effect on the output

dimsliderval = 20
tausliderval = 1
noiseval = 0
noise = np.random.randn(10000)
signal1 = None

def on_value_change(change):

    global dimsliderval
    global tausliderval
    global noiseval

    global dimslider
    global Tauslider
    global noiseampslider

    dimsliderval = dimslider.value
    tausliderval = Tauslider.value
    noiseval = noiseampslider.value

    execute_computation1()

def execute_computation1():
    global dimsliderval
    global tausliderval
    global noiseval

    global dimslider
    global Tauslider
    global noiseampslider

    global signal1

    clear_output(wait=True)

    dimslider = widgets.IntSlider(min=1,max=30,value=dimsliderval,description='Dimension:',continuous_update=False)
    dimslider.observe(on_value_change, names='value')

    Tauslider = widgets.FloatSlider(min=0.1,max=5,step=0.1,value=tausliderval,description=r'tau :' ,continuous_update=False)
    Tauslider.observe(on_value_change, names='value')

    noiseampslider = widgets.FloatSlider(min=0,max=2,step=0.1,value=noiseval,description='Noise Amplitude',continuous_update=False)
    noiseampslider.observe(on_value_change, names='value')

    fig = plt.figure(figsize=(9.5, 4))

    display(widgets.HBox(( dimslider,Tauslider, noiseampslider)))

    plt.clf()
    # Step 1: Setup the signal
    T = 40 # The period in number of samples
    NPeriods = 4 # How many periods to go through
    N = T*NPeriods # The total number of samples
    t = np.linspace(0, 2*np.pi*NPeriods, N+1)[0:N] # Sampling indices in time
    signal1 = np.cos(t) # The final signal
    signal1 += noiseampslider.value * noise[:len(signal1)]

    # Step 2: Do a sliding window embedding
    dim = dimslider.value
    Tau = Tauslider.value
    dT = 0.5
    X = getSlidingWindow(signal1, dim, Tau, dT)
    extent = Tau*dim

    # Step 3: Do Rips Filtration
    PDs = ripser(X, maxdim=1)['dgms']
    I = PDs[1]

    # Step 4: Perform PCA down to 2D for visualization
    pca = PCA(n_components = 2)
    Y = pca.fit_transform(X)
    eigs = pca.explained_variance_

    # Step 5: Plot original signal, 2-D projection, and the persistence diagram
    gs = gridspec.GridSpec(2, 2)
    ax = plt.subplot(gs[0,0])
    ax.plot(signal1)
    ax.set_ylim((2*min(signal1), 2*max(signal1)))
    ax.set_title("Original Signal")
    ax.set_xlabel("Sample Number")
    yr = np.max(signal1)-np.min(signal1)
    yr = [np.min(signal1)-0.1*yr, np.max(signal1)+0.1*yr]
    ax.plot([extent, extent], yr, 'r')
    ax.plot([0, 0], yr, 'r')     
    ax.plot([0, extent], [yr[0]]*2, 'r')
    ax.plot([0, extent], [yr[1]]*2, 'r')

    ax2 = plt.subplot(gs[1,0])
    plot_diagrams(PDs)
    plt.title("Max Persistence = %.3g"%np.max(I[:, 1] - I[:, 0]))

    ax3 = plt.subplot(gs[:,1])
    ax3.scatter(Y[:, 0], Y[:, 1])
    plt.axis('equal')
    plt.title("2-D PCA, Eigenvalues: %.3g, %.3g "%(eigs[0],eigs[1]))

    plt.tight_layout()
  
execute_computation1()

# Example 2: sliding window embedding of a signal which is a sum of two frequencies


In [None]:
secondfreqval2 = 3
dimsliderval2 = 20
tausliderval2 = 1
noiseval2 = 0
signal2 = None

def on_value_change2(change):
    global dimsliderval2
    global tausliderval2
    global noiseval2
    global secondfreqval2

    global dimslider2
    global Tauslider2
    global noiseampslider2
    global secondfreq2

    dimsliderval2 = dimslider2.value
    tausliderval2 = Tauslider2.value
    noiseval2 = noiseampslider2.value
    secondfreqval2 = secondfreq2.value

    execute_computation3()


noise = np.random.randn(10000)


def execute_computation3():
    global dimsliderval2
    global tausliderval2
    global noiseval2
    global secondfreqval2

    global dimslider2
    global Tauslider2
    global noiseampslider2
    global secondfreq2
    global signal2

    clear_output(wait=True)

    fig = plt.figure(figsize=(9.5, 5))

    secondfreq2 = widgets.Dropdown(options=[ 2, 3, np.pi],value=secondfreqval2,description='Second Frequency:',disabled=False)
    secondfreq2.observe(on_value_change2,names='value')

    noiseampslider2 = widgets.FloatSlider(min=0,max=2,step=0.1,value=noiseval2,description='Noise Amplitude',continuous_update=False)
    noiseampslider2.observe(on_value_change2, names='value')

    dimslider2 = widgets.IntSlider(min=1,max=30,value=dimsliderval2,description='Dimension:',continuous_update=False)
    dimslider2.observe(on_value_change2, names='value')

    Tauslider2 = widgets.FloatSlider(min=0.1,max=4.5,step=0.1,value=tausliderval2,description=r'tau :' ,continuous_update=False)
    Tauslider2.observe(on_value_change2, names='value')

    display(widgets.HBox(( dimslider2,Tauslider2)))
    display(widgets.HBox(( secondfreq2,noiseampslider2)))

    # Step 1: Setup the signal
    T1 = 10 # The period of the first sine in number of samples
    T2 = T1*secondfreq2.value # The period of the second sine in number of samples
    NPeriods = 5 # How many periods to go through, relative to the second sinusoid
    N = T2*NPeriods # The total number of samples
    t = np.arange(N) # Time indices
    signal2 = np.cos(2*np.pi*(1.0/T1)*t) # The first sinusoid
    signal2 += np.cos(2*np.pi*(1.0/T2)*t) # The second sinusoid
    signal2 += noiseampslider2.value * noise[:len(signal2)]

    #Step 2: Do a sliding window embedding
    dim = dimslider2.value
    Tau = Tauslider2.value
    dT = 0.35
    X = getSlidingWindow(signal2, dim, Tau, dT)
    extent = Tau*dim

    #Step 3: Do Rips Filtration
    if X.shape[0] > 200 :
      PDs = ripser(X, maxdim=2, n_perm=200)['dgms']
    else :
      PDs = ripser(X, maxdim=2)['dgms']

    #Step 4: Perform PCA down to 2D for visualization
    pca = PCA()
    Y = pca.fit_transform(X)
    eigs = pca.explained_variance_

    #Step 5: Plot original signal and the persistence diagram
    gs = gridspec.GridSpec(3, 2,width_ratios=[1, 2],height_ratios=[2,2,1])
    
    ax = plt.subplot(gs[0,1])
    ax.plot(signal2)
    ax.set_ylim((1.25*min(signal2), 1.25*max(signal2)))
    ax.set_title("Original Signal")
    yr = np.max(signal2)-np.min(signal2)
    yr = [np.min(signal2)-0.1*yr, np.max(signal2)+0.1*yr]
    ax.plot([extent, extent], yr, 'r')
    ax.plot([0, 0], yr, 'r')     
    ax.plot([0, extent], [yr[0]]*2, 'r')
    ax.plot([0, extent], [yr[1]]*2, 'r')

    ax2 = plt.subplot(gs[0:2,0])
    plot_diagrams(PDs)


    c = plt.get_cmap('jet')
    C = c(np.array(np.round(np.linspace(0, 255, Y.shape[0])), dtype=np.int32))
    C = C[:, 0:3]
    ax4 = fig.add_subplot(gs[1:,1], projection = '3d')
    ax4.set_title("PCA of Sliding Window Embedding")
    ax4.scatter(Y[:, 0], Y[:, 1], Y[:, 2], c=C)
    plt.tight_layout()

execute_computation3()

# **Optional**: An example of dissonance detection
We use persistence homology to detect dissonance in music.
We dowload a recording of a tritone, which consists of an interval (two notes played at the same time), in which the second frequency is $\sqrt{2}$ times the first frequency. In particular, the two frequencies are inconmensurable over the rational numbers, and thus one should be able to find parameters for the sliding window embedding to get a 2D torus.

To solve this exercise, you may want to take a look at [Sliding Window Persistence of Quasiperiodic Functions](https://arxiv.org/pdf/2103.04540.pdf). Gakhar, Perea, 2021

In [11]:
! wget https://github.com/LuisScoccola/emalca-applied-topology-minicourse/blob/main/data/tritone.wav?raw=true -O tritone.wav

--2021-11-17 13:25:10--  https://github.com/LuisScoccola/emalca-applied-topology-minicourse/blob/main/data/tritone.wav?raw=true
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/LuisScoccola/emalca-applied-topology-minicourse/raw/main/data/tritone.wav [following]
--2021-11-17 13:25:10--  https://github.com/LuisScoccola/emalca-applied-topology-minicourse/raw/main/data/tritone.wav
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/LuisScoccola/emalca-applied-topology-minicourse/main/data/tritone.wav [following]
--2021-11-17 13:25:10--  https://raw.githubusercontent.com/LuisScoccola/emalca-applied-topology-minicourse/main/data/tritone.wav
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133,

In [None]:
sample_rate, X = scipy.io.wavfile.read("tritone.wav")
signal3 = X[10000:40000,0]/np.max(X[10000:40000,0])

plt.figure(figsize=(10,2))
plt.plot(np.arange(len(signal3))/float(sample_rate), signal3)
plt.xlabel("Time (Seconds)")
plt.title("Tritone Waveform")
plt.show()

from IPython.display import Audio
# load a remote WAV file
Audio('tritone.wav')

In [None]:
# we can get the frequencies of each signal above by computing the Fourier transform.
# each peak corresponds to a sinusoid, and its position determines its frequency.

# interpret this plot!

def plotFourierTransform(signal, sample_rate, max_frequency):
  x = signal
  plt.figure(figsize=(10,2))
  P1 = np.fft.fftshift(np.abs(np.fft.fft(x)))/len(x)
  freq = np.fft.fftshift(np.fft.fftfreq(len(x), d = 1/sample_rate))
  plt.plot(freq, P1)
  plt.xlim((0,max_frequency))
  plt.xlabel("Frequency in Hz")
  plt.locator_params(axis='x', nbins=25)
  plt.show()

plotFourierTransform(signal3, sample_rate, 1000)

In [None]:
# Find the torus!

# Change the following parameters:
tau = 0.028
dim = 8


# sliding window
Y = getSlidingWindow(signal3, dim, tau * sample_rate, 1)
# persistence diagram
PDs = ripser(Y, maxdim=2, thresh = 1.2, n_perm = 2200)
PD = PDs['dgms']
plt.figure()
plot_diagrams(PD)