# Using giotto-tda for persistent homology

First lets write a function to create nice data ;)

In [1]:
import numpy as np
def make_point_clouds(n_samples_per_shape: int, n_points: int, noise: float):
    """Make point clouds for circles, spheres, and tori with random noise.
    """
    circle_point_clouds = [
        np.asarray(
            [
                [np.sin(t) + noise * (np.random.rand(1)[0] - 0.5), np.cos(t) + noise * (np.random.rand(1)[0] - 0.5), 0]
                for t in range((n_points ** 2))
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label circles with 0
    circle_labels = np.zeros(n_samples_per_shape)

    sphere_point_clouds = [
        np.asarray(
            [
                [
                    np.cos(s) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.cos(s) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label spheres with 1
    sphere_labels = np.ones(n_samples_per_shape)

    torus_point_clouds = [
        np.asarray(
            [
                [
                    (2 + np.cos(s)) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    (2 + np.cos(s)) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label tori with 2
    torus_labels = 2 * np.ones(n_samples_per_shape)

    point_clouds = np.concatenate((circle_point_clouds, sphere_point_clouds, torus_point_clouds))
    labels = np.concatenate((circle_labels, sphere_labels, torus_labels))

    return point_clouds, labels


# esferas y toros ;)
n_samples_per_class = 10
point_clouds, labels = make_point_clouds(n_samples_per_class, 10, 0.1)
point_clouds.shape
print(f"There are {point_clouds.shape[0]} point clouds in {point_clouds.shape[2]} dimensions, "
      f"each with {point_clouds.shape[1]} points.")

There are 30 point clouds in 3 dimensions, each with 100 points.


# Calculate persistent homology

In [2]:
from gtda.homology import VietorisRipsPersistence
from gtda.plotting import plot_diagram

VR = VietorisRipsPersistence(homology_dimensions=[0, 1, 2])  # Parameter explained in the text
diagrams = VR.fit_transform(point_clouds)
# diagrams.shape

i = 0
plot_diagram(diagrams[0])
# print(diagrams[0])

# Extract features
Instantiate a ``PersistenceEntropy`` transformer and extract scalar features from the persistence diagrams.

In [3]:
from gtda.diagrams import PersistenceEntropy

PE = PersistenceEntropy()

# ejercicio cambiar Entropia por Amplitude
features = PE.fit_transform(diagrams)
print(features.shape, labels.shape)

(30, 3) (30,)


# Use the new features in a standard classifier
Leverage the compatibility with ``scikit-learn`` to perform a train-test split and score the features.

In [4]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

print(labels)

X_train, X_valid, y_train, y_valid = train_test_split(features, labels)
model = RandomForestClassifier()
model.fit(X_train, y_train)
model.score(X_valid, y_valid)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 2. 2. 2. 2.
 2. 2. 2. 2. 2. 2.]


1.0

# Encapsulate the steps above in a pipeline
- Define an end-to-end pipeline by chaining transformers from giotto-tda with scikit-learn ones

- Train-test split the input point cloud data and labels.

- Fit the pipeline on the training data.

- Score the fitted pipeline on the test data.

In [5]:
from sklearn.pipeline import make_pipeline

steps = [VietorisRipsPersistence(homology_dimensions=[0, 1, 2]),
         PersistenceEntropy(),
         RandomForestClassifier()]

pipeline = make_pipeline(*steps)

pcs_train, pcs_valid, labels_train, labels_valid = train_test_split(point_clouds, labels)
pipeline.fit(pcs_train, labels_train)
pipeline.score(pcs_valid, labels_valid)

1.0

In [None]:
from gtda.time_series import TakensEmbedding
from gtda.homology import VietorisRipsPersistence
from gtda.diagrams import Amplitude
from gtda.plotting import plot_diagram
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np

# Generate dummy 1D time series data: shape (n_samples, n_timestamps)
n_samples = 100
n_timestamps = 500 #Si hacemos un cambio a 50 se hace más rápido
time_series = np.random.rand(n_samples, n_timestamps)
labels = np.random.randint(0, 2, size=n_samples)  # Binary classification

# Step 1: Takens Embedding for each time series
embedding = TakensEmbedding(time_delay=2, dimension=3)
embedded_point_clouds = embedding.fit_transform(time_series)

# Step 2: Vietoris–Rips persistence computation
VR = VietorisRipsPersistence(homology_dimensions=[0, 1, 2])
diagrams = VR.fit_transform(embedded_point_clouds)

# Optional: plot one diagram
plot_diagram(diagrams[0])

# Step 3: Use Amplitude for feature extraction
amplitude = Amplitude(metric='bottleneck')  # or 'wasserstein'
features = amplitude.fit_transform(diagrams)

# Step 4: Classification pipeline
X_train, X_valid, y_train, y_valid = train_test_split(features, labels, test_size=0.2, random_state=42)
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Evaluate model
score = model.score(X_valid, y_valid)
print("Validation accuracy:", score)


Validation accuracy: 0.55


In [8]:
# Generate dummy 1D time series data: shape (n_samples, n_timestamps)
n_samples = 100
n_timestamps = 50 #Si hacemos un cambio a 50 se hace más rápido
time_series = np.random.rand(n_samples, n_timestamps)
labels = np.random.randint(0, 2, size=n_samples)  # Binary classification

# Step 1: Takens Embedding for each time series
embedding = TakensEmbedding(time_delay=2, dimension=3)
embedded_point_clouds = embedding.fit_transform(time_series)

# Step 2: Vietoris–Rips persistence computation
VR = VietorisRipsPersistence(homology_dimensions=[0, 1, 2])
diagrams = VR.fit_transform(embedded_point_clouds)

# Optional: plot one diagram
plot_diagram(diagrams[0])

# Step 3: Use Amplitude for feature extraction
amplitude = Amplitude(metric='wasserstein')
features = amplitude.fit_transform(diagrams)

# Step 4: Classification pipeline
X_train, X_valid, y_train, y_valid = train_test_split(features, labels, test_size=0.2, random_state=42)
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Evaluate model
score = model.score(X_valid, y_valid)
print("Validation accuracy:", score)

Validation accuracy: 0.45


# Periodic or not periodic

In [7]:
import numpy as np
import plotly.graph_objects as go
from gtda.time_series import SingleTakensEmbedding
from gtda.plotting import plot_point_cloud
from gtda.homology import VietorisRipsPersistence


x_periodic = np.linspace(0, 10, 1000)
y_periodic = np.cos(5 * x_periodic)

fig = go.Figure(data=go.Scatter(x=x_periodic, y=y_periodic))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()

embedding_dimension_periodic = 3
embedding_time_delay_periodic = 8
stride = 10

embedder_periodic = SingleTakensEmbedding(
    parameters_type="fixed",
    n_jobs=2,
    time_delay=embedding_time_delay_periodic,
    dimension=embedding_dimension_periodic,
    stride=stride,
)

y_periodic_embedded = embedder_periodic.fit_transform(y_periodic)
print(f"Shape of embedded time series: {y_periodic_embedded.shape}")
plot_point_cloud(y_periodic_embedded)


# reshape to keep the vietoris format
y_periodic_embedded = y_periodic_embedded[None, :, :]

# 0 - connected components, 1 - loops, 2 - voids
homology_dimensions = [0, 1, 2]

periodic_persistence = VietorisRipsPersistence(
    homology_dimensions=homology_dimensions, n_jobs=6
)
print("Persistence diagram for periodic signal")
periodic_persistence.fit_transform_plot(y_periodic_embedded)

Shape of embedded time series: (99, 3)
Persistence diagram for periodic signal


array([[[0.        , 0.02968527, 0.        ],
        [0.        , 0.02979543, 0.        ],
        [0.        , 0.03002368, 0.        ],
        [0.        , 0.03050811, 0.        ],
        [0.        , 0.0310097 , 0.        ],
        [0.        , 0.03119781, 0.        ],
        [0.        , 0.03184138, 0.        ],
        [0.        , 0.03283644, 0.        ],
        [0.        , 0.03369502, 0.        ],
        [0.        , 0.0344742 , 0.        ],
        [0.        , 0.03494403, 0.        ],
        [0.        , 0.03597357, 0.        ],
        [0.        , 0.03632426, 0.        ],
        [0.        , 0.03743265, 0.        ],
        [0.        , 0.03896736, 0.        ],
        [0.        , 0.04018315, 0.        ],
        [0.        , 0.04183634, 0.        ],
        [0.        , 0.04312626, 0.        ],
        [0.        , 0.04355394, 0.        ],
        [0.        , 0.04487357, 0.        ],
        [0.        , 0.04663508, 0.        ],
        [0.        , 0.04798774, 0

In [10]:
x_periodic1 = np.linspace(0, 10, 1000)
y_periodic1 = x_periodic1**2

fig = go.Figure(data=go.Scatter(x=x_periodic1, y=y_periodic1))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()

embedding_dimension_periodic = 3
embedding_time_delay_periodic = 8
stride = 10

embedder_periodic = SingleTakensEmbedding(
    parameters_type="fixed",
    n_jobs=2,
    time_delay=embedding_time_delay_periodic,
    dimension=embedding_dimension_periodic,
    stride=stride,
)

y_periodic_embedded1 = embedder_periodic.fit_transform(y_periodic1)
print(f"Shape of embedded time series: {y_periodic_embedded1.shape}")
plot_point_cloud(y_periodic_embedded1)


# reshape to keep the vietoris format
y_periodic_embedded1 = y_periodic_embedded1[None, :, :]

# 0 - connected components, 1 - loops, 2 - voids
homology_dimensions = [0, 1, 2]

periodic_persistence = VietorisRipsPersistence(
    homology_dimensions=homology_dimensions, n_jobs=6
)
print("Persistence diagram for periodic signal")
periodic_persistence.fit_transform_plot(y_periodic_embedded1)

Shape of embedded time series: (99, 3)
Persistence diagram for periodic signal


array([[[0.        , 0.05998643, 0.        ],
        [0.        , 0.0930515 , 0.        ],
        [0.        , 0.12699771, 0.        ],
        [0.        , 0.16126958, 0.        ],
        [0.        , 0.19569609, 0.        ],
        [0.        , 0.23020788, 0.        ],
        [0.        , 0.26477158, 0.        ],
        [0.        , 0.29936925, 0.        ],
        [0.        , 0.33399031, 0.        ],
        [0.        , 0.36862817, 0.        ],
        [0.        , 0.4032785 , 0.        ],
        [0.        , 0.43793836, 0.        ],
        [0.        , 0.47260565, 0.        ],
        [0.        , 0.5072788 , 0.        ],
        [0.        , 0.54195672, 0.        ],
        [0.        , 0.57663858, 0.        ],
        [0.        , 0.61132365, 0.        ],
        [0.        , 0.64601147, 0.        ],
        [0.        , 0.68070155, 0.        ],
        [0.        , 0.71539366, 0.        ],
        [0.        , 0.75008744, 0.        ],
        [0.        , 0.78478265, 0