In [None]:
pip install plotly

In [None]:
pip install giotto-tda

As a warmup, let's generate some noisy signals with a constant SNR of 17.98. As shown in Table 1 of the article, this corresponds to an $R$ value of 0.65. By picking the upper end of the interval, we place ourselves in a favorable scenario and, thus, can gain a sense for what the best possible performance is for our time series classifier. We pick a small number of samples to make the computations run fast, but in practice would scale this by 1-2 orders of magnitude as done in the original article.

In [None]:
import numpy as np
from os import getcwd

mypath = getcwd()
dataset1_imag = np.load(mypath+'/Data/7-16-24/OneSubcarrierHighDataRate.npy').imag
dataset1_real = np.load(mypath+'/Data/7-16-24/OneSubcarrierHighDataRate.npy').real

dataset2_imag = np.load(mypath+'/Data/7-16-24/OneSubcarrierLowDataRate.npy').imag
dataset2_real = np.load(mypath+'/Data/7-16-24/OneSubcarrierLowDataRate.npy').real

dataset3_imag = np.load(mypath+'/Data/7-16-24/TwoSubcarriersHighDataRate.npy').imag
dataset3_real = np.load(mypath+'/Data/7-16-24/TwoSubcarriersHighDataRate.npy').real

dataset4_imag = np.load(mypath+'/Data/7-16-24/TwoSubcarriersLowDataRate.npy').imag
dataset4_real = np.load(mypath+'/Data/7-16-24/TwoSubcarriersLowDataRate.npy').real

print(dataset1_imag.shape)
print(dataset2_imag.shape)
print(dataset3_imag.shape)
print(dataset4_imag.shape)

def find_norm(data_real,data_imag,target_length):
    
    sample_num,data_length,category_num = data_real.shape
    
    dataset = np.zeros([sample_num,target_length,category_num])
    
    for i in range(category_num):
        for j in range(sample_num):
            dataset[j,:,i] = [np.sqrt(data_real[j,indx,i]**2+data_imag[j,indx,i]**2) for indx in range(target_length)]
    
    return dataset

len1 = 900
len2 = 3500

dataset1_norm = find_norm(dataset1_real,dataset1_imag,len1)

dataset2_norm = find_norm(dataset2_real,dataset2_imag,len2)

dataset3_norm = find_norm(dataset3_real,dataset3_imag,len1)

dataset4_norm = find_norm(dataset4_real,dataset4_imag,len2)


# dataset = np.zeros([dataset_real.shape[0]*dataset_real.shape[2],dataset_real.shape[1]-30])
# label = np.zeros([dataset_real.shape[0]*dataset_real.shape[2]])
# for i in range(dataset_real.shape[2]):
#     for j in range(dataset_real.shape[0]-30):
#         dataset[j+dataset_real.shape[0]*i,:] = [np.sqrt(dataset_real[j,indx,i]**2+dataset_imag[j,indx,i]**2) for indx in range(dataset_real.shape[1]-30)]
#         label[j+dataset_real.shape[0]*i] = i

# print(f"Number of noisy signals: {len(dataset)}")
# print(dataset.shape)


In [None]:
import matplotlib.pyplot as plt

plt.figure(1)
plt.plot(dataset3_norm[200,:,3])
# plt.figure(2)
# plt.plot(abs(rx_data))
plt.show()

In [None]:
sample_num,_,category_num = dataset1_norm.shape
dataset = np.zeros([sample_num*category_num,2*(len1+len2)])
label = np.zeros([sample_num*category_num])
for i in range(category_num):
    for j in range(sample_num):
        dataset[j+sample_num*i,:] =np.concatenate((np.concatenate((dataset1_norm[j,:,i],dataset2_norm[j,:,i])),np.concatenate((dataset3_norm[j,:,i],dataset4_norm[j,:,i]))))
        label[j+sample_num*i] = i

In [None]:
print(dataset.shape)
plt.figure(1)
plt.plot(dataset[1000,0:8500])
# plt.figure(2)
# plt.plot(abs(rx_data))
plt.show()

In [None]:
# Initialize an empty list to store moving averages
window_size = 5
dataset_avg= np.zeros([dataset.shape[0],dataset.shape[1]])
# Loop through the array to consider
# every window of size 3
for j in range(dataset.shape[0]):
    i=0
    while i < dataset.shape[1] - window_size +1:

        # Store elements from i to i+window_size
        # in list to get the current window
        window = dataset[j][i : i + window_size]

        # Calculate the average of current window
        window_average = sum(window) / window_size

        # Store the average of current
        # window in moving average list
        dataset_avg[j][i] = dataset[j][i]-window_average
        if abs(dataset_avg[j][i])>0.015:
                dataset_avg[j][i]=np.sign(dataset_avg[j][i])*0.015

        # Shift window to right by one position
        i += 1
#print(dataset_avg[4,:]) 

In [None]:
plt.figure(1)
plt.plot(dataset_avg[1000,0:8500])
# plt.figure(2)
# plt.plot(abs(rx_data))
plt.show()

Next let's visualise the two different types of time series that we wish to classify: one that is pure noise vs. one that is composed of noise plus an embedded gravitational wave signal:

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=1, cols=2)

fig.add_trace(
    go.Scatter(x=list(range(len(dataset_avg[0,:]))), y=dataset_avg[0,:], mode="lines", name="noise"),
    row=1,
    col=1,
)

fig.add_trace(
    go.Scatter(x=list(range(len(dataset_avg[84,:]))), y=dataset_avg[84,:], mode="lines", name="noise"),
    row=1,
    col=2,
)

fig.show()

We make two observations:
1. It is hard to distinguish the signal by eye,
2. The signal features some regularity or periodicity.

Both observations lead us to examining the _**Takens embedding**_ of the signal $s(t)$, in order to pick up the recurrent structure. Indeed, if $f$ is sampled from a dynamical system with a non-trivial recurrent structure, then, for appropriate parameters, the image by the embedding will have non-trivial topology.

More formally,, we extract a sequence of vectors in $\mathbb{R}^{d}$ of the form

$$
TD_{d,\tau} s : \mathbb{R} \to \mathbb{R}^{d}\,, \qquad t \to \begin{bmatrix}
           s(t) \\
           s(t + \tau) \\
           s(t + 2\tau) \\
           \vdots \\
           s(t + (d-1)\tau)
         \end{bmatrix},
$$
where $d$ is the embedding dimension and $\tau$ is the time delay. The quantity $(d-1)\tau$ is known as the "window size" and the difference between $t_{i+1}$ and $t_i$ is called the stride.

Let's examine what the time delay embedding of a pure gravitational wave signal looks like:

In [None]:
from gtda.time_series import SingleTakensEmbedding
embedding_dimension = 5
embedding_time_delay = 5
stride = 2

embedder = SingleTakensEmbedding(
    parameters_type="search", n_jobs=-1, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)

y_gw_embedded = embedder.fit_transform(dataset_avg[0,:])

We can use PCA to project our high-dimensional space to 3-dimensions for visualisation:

In [None]:
from sklearn.decomposition import PCA
from gtda.plotting import plot_point_cloud

pca = PCA(n_components=2)
y_gw_embedded_pca = pca.fit_transform(y_gw_embedded)

plot_point_cloud(y_gw_embedded_pca)

From the plot we can see that the decaying periodic signal generated by a black hole merger emerges as a _spiral_ in the time delay embedding space! For contrast, let's compare this to one of the pure noise time series in our sample:

In [None]:
embedding_dimension = 5
embedding_time_delay = 5
stride = 2

embedder = SingleTakensEmbedding(
    parameters_type="search", n_jobs=5, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)

y_noise_embedded = embedder.fit_transform(dataset_avg[11,:])
print(y_noise_embedded.shape)
pca = PCA(n_components=3)
y_noise_embedded_pca = pca.fit_transform(y_noise_embedded)

plot_point_cloud(y_noise_embedded_pca)

Evidently, pure noise resembles a high-dimensional ball in the time delay embedding space. Let's see if we can use persistent homology to tease apart which time series contain a gravitational wave signal versus those that don't. To do so we will adapt the strategy from the original article:

1. Generate 200-dimensional time delay embeddings of each time series
2. Use PCA to reduce the time delay embeddings to 3-dimensions
3. Use the Vietoris-Rips construction to calculate persistence diagrams of $H_0$ and $H_1$ generators
4. Extract feature vectors using persistence entropy
5. Train a binary classifier on the topological features

### Define the topological feature generation pipeline

We can do steps 1 and 2 by using the following ``giotto-tda`` tools:

- The ``TakensEmbedding`` transformer – instead of ``SingleTakensEmbedding`` – which will transform each time series in ``noisy_signals`` separately and return a collection of point clouds;
- ``CollectionTransformer``, which is a convenience "meta-estimator" for applying the same PCA to each point cloud resulting from step 1.

Using the ``Pipeline`` class from ``giotto-tda``, we can chain all operations up to and including step 4 as follows:

In [None]:

import sklearn


print('The scikit-learn version is {}.'.format(sklearn.__version__))

In [None]:
from gtda.diagrams import PersistenceEntropy, Scaler
from gtda.homology import VietorisRipsPersistence
from gtda.metaestimators import CollectionTransformer
from gtda.pipeline import Pipeline
from gtda.time_series import TakensEmbedding

embedding_dimension = 5
embedding_time_delay = 5
stride = 2

embedder = TakensEmbedding(time_delay=embedding_time_delay,
                           dimension=embedding_dimension,
                           stride=stride)

batch_pca = CollectionTransformer(PCA(n_components=5), n_jobs=-1)

persistence = VietorisRipsPersistence(homology_dimensions=[0, 1,2], n_jobs=-1)

scaling = Scaler()

entropy = PersistenceEntropy(normalize=True, nan_fill_value=-10)


steps = [("embedder", embedder),
         ("pca", batch_pca),
         ("persistence", persistence),
         ("scaling", scaling)]
         #("entropy", entropy)]
topological_transfomer = Pipeline(steps)

In [None]:
print(dataset_avg.shape)

In [None]:
features = topological_transfomer.fit_transform(dataset_avg)
print(features.shape)

In [None]:
from gtda.time_series import SingleTakensEmbedding

embedder = SingleTakensEmbedding(time_delay=5, dimension=5)
xt= embedder.fit_transform(dataset_avg[100])
xt=xt[None,:,:]
print(xt.shape)
xt = persistence.fit_transform(xt)
xt = scaling.fit_transform(xt)
persistence.plot(xt)

In [None]:
xt= embedder.fit_transform(dataset_avg[301])
xt=xt[None,:,:]
print(xt.shape)
xt = persistence.fit_transform(xt)
xt = scaling.fit_transform(xt)
persistence.plot(xt)

### Train and evaluate a model

For the final step, let's train a simple classifier on our topological features. As usual we create training and validation sets

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_valid, y_train, y_valid = train_test_split(
    dataset_avg, label, test_size=0.3, random_state=42
)

and then fit and evaluate our model:

In [None]:
from sklearn.metrics import accuracy_score, roc_auc_score


def print_scores(fitted_model):
    res = {
        "Accuracy on train:": accuracy_score(fitted_model.predict(X_train), y_train),
        # "ROC AUC on train:": roc_auc_score(
        #     y_train, fitted_model.predict_proba(X_train)[:, 1]
        # ),
        "Accuracy on valid:": accuracy_score(fitted_model.predict(X_valid), y_valid),
        # "ROC AUC on valid:": roc_auc_score(
        #     y_valid, fitted_model.predict_proba(X_valid)[:, 1]
        # ),
    }

    for k, v in res.items():
        print(k, round(v, 3))

In [None]:
from sklearn.linear_model import LogisticRegression

model = LogisticRegression()
model.fit(X_train, y_train)
print_scores(model)

In [None]:
from sklearn import svm
model = svm.SVC(kernel='rbf')
model.fit(X_train, y_train)
print_scores(model)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

model = RandomForestClassifier(max_depth=2, random_state=1)
model.fit(X_train, y_train)
print_scores(model)

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_valid, y_train, y_valid = train_test_split(
    features, label, test_size=0.2, random_state=42
)

As a simple baseline, this model is not too bad - it outperforms the deep learning baseline in the article which typically fares little better than random on the raw data. However, the combination of deep learning and persistent homology is where significant performance gains are seen - we leave this as an exercise to the intrepid reader!

In [None]:
from sklearn.pipeline import make_pipeline, make_union
from gtda.diagrams import PersistenceEntropy
from sklearn.pipeline import Pipeline
from gtda.diagrams import Amplitude
# Listing all metrics we want to use to extract diagram amplitudes
metric_list = [
    {"metric": "bottleneck", "metric_params": {}},
    {"metric": "wasserstein", "metric_params": {"p": 1}},
    {"metric": "wasserstein", "metric_params": {"p": 2}},
    {"metric": "landscape", "metric_params": {"p": 1, "n_layers": 1, "n_bins": 100}},
    {"metric": "landscape", "metric_params": {"p": 1, "n_layers": 2, "n_bins": 100}},
    {"metric": "landscape", "metric_params": {"p": 2, "n_layers": 1, "n_bins": 100}},
    {"metric": "landscape", "metric_params": {"p": 2, "n_layers": 2, "n_bins": 100}},
    {"metric": "betti", "metric_params": {"p": 1, "n_bins": 100}},
    {"metric": "betti", "metric_params": {"p": 2, "n_bins": 100}},
    {"metric": "heat", "metric_params": {"p": 1, "sigma": 1.6, "n_bins": 100}},
    {"metric": "heat", "metric_params": {"p": 1, "sigma": 3.2, "n_bins": 100}},
    {"metric": "heat", "metric_params": {"p": 2, "sigma": 1.6, "n_bins": 100}},
    {"metric": "heat", "metric_params": {"p": 2, "sigma": 3.2, "n_bins": 100}},
]

#
feature_union = make_union(
    *[PersistenceEntropy(nan_fill_value=-1)]
    + [Amplitude(**metric, n_jobs=-1) for metric in metric_list]
)

In [None]:
X_train_tda = feature_union.fit_transform(features)
X_train_tda.shape

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_tda, label, test_size=0.2, random_state=42
)
from sklearn.metrics import accuracy_score, roc_auc_score


def print_scores(fitted_model):
    res = {
        "Accuracy on train:": accuracy_score(fitted_model.predict(X_train), y_train),
        # "ROC AUC on train:": roc_auc_score(
        #     y_train, fitted_model.predict_proba(X_train)[:, 1]
        # ),
        "Accuracy on valid:": accuracy_score(fitted_model.predict(X_valid), y_valid),
        # "ROC AUC on valid:": roc_auc_score(
        #     y_valid, fitted_model.predict_proba(X_valid)[:, 1]
        # ),
    }

    for k, v in res.items():
        print(k, round(v, 3))

In [None]:
from sklearn import svm
model = svm.SVC(kernel='rbf')
model.fit(X_train, y_train)
print_scores(model)

In [None]:
from sklearn.linear_model import LogisticRegression

model = LogisticRegression()
model.fit(X_train, y_train)
print_scores(model)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

model = RandomForestClassifier(max_depth=3, random_state=10)
model.fit(X_train, y_train)
print_scores(model)