In [1]:
import numpy as np

from gtda.homology import VietorisRipsPersistence
from gtda.time_series import SingleTakensEmbedding
from gtda.diagrams import BettiCurve, PersistenceLandscape, PairwiseDistance, PersistenceEntropy
from gtda.plotting import plot_point_cloud, plot_diagram
from gtda.pipeline import Pipeline

from sklearn.ensemble import RandomForestClassifier

from pathlib import Path
from sklearn.datasets import make_circles
import plotly.graph_objects as go

Облако точек

In [2]:
np.random.seed(seed=42)

X = np.asarray([
    make_circles(100, factor=np.random.random())[0]
    for i in range(10)
])

In [3]:
i = 4
plot_point_cloud(X[i])

In [4]:
VR = VietorisRipsPersistence()
Xt = VR.fit_transform(X)
VR.plot(Xt, sample=i)

In [5]:
BC = BettiCurve()
X_betti_curve = BC.fit_transform_plot(Xt)

In [6]:
PL = PersistenceLandscape()
X_persistence_landscape = PL.fit_transform_plot(Xt)

 Временные ряды

In [7]:
def basic_time_series(n, b, y):
    r = np.random.rand(n)
    j = 2 * np.pi + np.arcsin(r)
    t = np.arange(n)
    x = b * np.sin(y * t + j)
    return x


def noisy_time_series(n, b, y):
    r = np.random.rand(n)
    j = 2 * np.pi * r
    t = np.arange(n)
    x = b * np.sin(y * t + j)
    return x

In [8]:
n = 500
b = 1.5
y = 1

basic_ts = basic_time_series(n, b, y)
noisy_ts = noisy_time_series(n, b, y)

In [9]:
fig = go.Figure(data=go.Scatter(y=basic_ts))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()

In [10]:
fig = go.Figure(data=go.Scatter(y=noisy_ts))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()

In [11]:
embedding_dimension_periodic = 3
embedding_time_delay_periodic = 8

embedder = SingleTakensEmbedding(
    parameters_type="fixed",
    time_delay=embedding_time_delay_periodic,
    dimension=embedding_dimension_periodic
)

basic_point_cloud = embedder.fit_transform(basic_ts)
noisy_point_cloud = embedder.fit_transform(noisy_ts)

In [12]:
plot_point_cloud(basic_point_cloud).show()

In [13]:
plot_point_cloud(noisy_point_cloud).show()

In [14]:
# 0 - connected components, 1 - loops, 2 - voids
homology_dimensions = [0, 1, 2]

persistence = VietorisRipsPersistence(
    homology_dimensions=homology_dimensions
)

In [15]:
diagrams = persistence.fit_transform([basic_point_cloud, noisy_point_cloud])

In [16]:
VR.plot(diagrams, sample=0)

In [17]:
VR.plot(diagrams, sample=1)

In [18]:
p_W = 2
PD = PairwiseDistance(metric='wasserstein',
                      metric_params={'p': p_W, 'delta': 0.1},
                      order=None)

PD.fit_transform(diagrams)

array([[[0.        , 0.        , 0.        ],
        [1.29031027, 0.9218511 , 0.2346551 ]],

       [[1.289664  , 0.92093727, 0.23481503],
        [0.        , 0.        , 0.        ]]])

In [19]:
PD = PairwiseDistance(metric='bottleneck',
                      metric_params={'delta': 0.1},
                      order=None)

PD.fit_transform(diagrams)

array([[[0.        , 0.        , 0.        ],
        [0.07992657, 0.63924557, 0.14452577]],

       [[0.07800749, 0.63128385, 0.14573882],
        [0.        , 0.        , 0.        ]]])

In [20]:
BC = BettiCurve()
betti_curves = BC.fit_transform(diagrams)

In [21]:
BC.plot(betti_curves, sample=0)

In [22]:
BC.plot(betti_curves, sample=1)

In [23]:
PD = PairwiseDistance(metric='betti',
                      metric_params={'p': 1},
                      order=None)

PD.fit_transform(diagrams)

array([[[ 0.        ,  0.        ,  0.        ],
        [27.95031904, 11.09281855,  1.68033309]],

       [[27.95031904, 11.09281855,  1.68033309],
        [ 0.        ,  0.        ,  0.        ]]])

In [24]:
PL = PersistenceLandscape()
persistence_landscapes = PL.fit_transform(diagrams)

In [25]:
PL.plot(persistence_landscapes, sample=0)

In [26]:
PL.plot(persistence_landscapes, sample=1)

In [27]:
PD = PairwiseDistance(metric='landscape',
                      metric_params={'p': 1},
                      order=None)

PD.fit_transform(diagrams)

array([[[0.        , 0.        , 0.        ],
        [0.00949094, 0.29436653, 0.02973432]],

       [[0.00949094, 0.29436653, 0.02973432],
        [0.        , 0.        , 0.        ]]])

Машинное обучение

In [29]:
def make_point_clouds(n_samples_per_shape: int, n_points: int, noise: float):
    """Make point clouds for circles, spheres, and tori with random noise.
    """
    circle_point_clouds = [
        np.asarray(
            [
                [np.sin(t) + noise * (np.random.rand(1)[0] - 0.5), np.cos(t) + noise * (np.random.rand(1)[0] - 0.5), 0]
                for t in range((n_points ** 2))
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label circles with 0
    circle_labels = np.zeros(n_samples_per_shape)

    sphere_point_clouds = [
        np.asarray(
            [
                [
                    np.cos(s) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.cos(s) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label spheres with 1
    sphere_labels = np.ones(n_samples_per_shape)

    torus_point_clouds = [
        np.asarray(
            [
                [
                    (2 + np.cos(s)) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    (2 + np.cos(s)) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label tori with 2
    torus_labels = 2 * np.ones(n_samples_per_shape)

    point_clouds = np.concatenate((circle_point_clouds, sphere_point_clouds, torus_point_clouds))
    labels = np.concatenate((circle_labels, sphere_labels, torus_labels))

    return point_clouds, labels


In [30]:
point_clouds_basic, labels_basic = make_point_clouds(n_samples_per_shape=10, n_points=20, noise=0.5)
point_clouds_basic.shape, labels_basic.shape

((30, 400, 3), (30,))

In [31]:
plot_point_cloud(point_clouds_basic[0])

In [32]:
plot_point_cloud(point_clouds_basic[10])

In [33]:
plot_point_cloud(point_clouds_basic[-1])

In [34]:
# Track connected components, loops, and voids
homology_dimensions = [0, 1, 2]

# Collapse edges to speed up H2 persistence calculation!
persistence = VietorisRipsPersistence(
    metric="euclidean",
    homology_dimensions=homology_dimensions,
    collapse_edges=True,
)

diagrams_basic = persistence.fit_transform(point_clouds_basic)

In [36]:
# Circle
plot_diagram(diagrams_basic[0])

In [37]:
#spheres
plot_diagram(diagrams_basic[10])

In [38]:
#tori
plot_diagram(diagrams_basic[-1])

In [39]:
persistence_entropy = PersistenceEntropy()

# calculate topological feature matrix
X_basic = persistence_entropy.fit_transform(diagrams_basic)

In [45]:
X_basic.shape

(30, 3)

In [46]:
point_clouds_basic.shape


(30, 400, 3)

In [40]:
plot_point_cloud(X_basic)

In [41]:
rf = RandomForestClassifier(oob_score=True)
rf.fit(X_basic, labels_basic)

print(f"OOB score: {rf.oob_score_:.3f}")

OOB score: 1.000


In [42]:
steps = [
    ("persistence", VietorisRipsPersistence(metric="euclidean", homology_dimensions=homology_dimensions, n_jobs=6)),
    ("entropy", PersistenceEntropy()),
    ("model", RandomForestClassifier(oob_score=True)),
]

pipeline = Pipeline(steps)

pipeline.fit(point_clouds_basic, labels_basic)

Pipeline(steps=[('persistence',
                 VietorisRipsPersistence(homology_dimensions=[0, 1, 2])),
                ('entropy', PersistenceEntropy()),
                ('model', RandomForestClassifier(oob_score=True))])

pipeline["model"].oob_score_

1.0

Julia + Eirene

In [86]:
pip install julia

Collecting julia
  Downloading julia-0.6.1-py2.py3-none-any.whl (68 kB)
     ---------------------------------------- 68.7/68.7 kB 1.2 MB/s eta 0:00:00
Installing collected packages: julia
Successfully installed julia-0.6.1
Note: you may need to restart the kernel to use updated packages.


In [40]:
from julia.api import Julia
jl = Julia(compiled_modules=False)
from julia import Main
jl.using("Eirene")

In [41]:
x = Main.rand(3,50)
plot_point_cloud(x.T)

In [42]:
C = Main.eirene(x, model = "pc")

In [43]:
A0 = Main.barcode(C, dim=0)
A0 = np.append(np.array(A0), np.zeros((len(A0), 1)), axis= 1 )
A0 = np.delete(A0, -1, 0)

In [44]:
A1 = Main.barcode(C, dim=1)
A1 = np.append(np.array(A1), np.ones((len(A1), 1)), axis= 1 )

In [45]:
A = np.concatenate((A0, A1), axis=0)
plot_diagram(A)

In [46]:
Main.wasserstein_distance(dgm1, dgm2, q=q, p=p)

NameError: name 'dgm1' is not defined

In [47]:
Main.betticurve(C, dim=0)

array([[ 0.        , 50.        ],
       [ 0.05305266, 49.        ],
       [ 0.09086173, 48.        ],
       ...,
       [ 0.86298091,  1.        ],
       [ 0.86308996,  1.        ],
       [ 0.86403298,  1.        ]])

In [48]:
Main.betticurve(C, dim=1)

array([[0.        , 0.        ],
       [0.05305266, 0.        ],
       [0.09086173, 0.        ],
       ...,
       [0.86298091, 0.        ],
       [0.86308996, 0.        ],
       [0.86403298, 0.        ]])

In [49]:
BC = BettiCurve()
betti_curves = BC.fit_transform([A])
BC.plot(betti_curves, sample=0)

In [50]:
Main.classrep(C, dim=1)

array([[50, 17, 43, 17],
       [11, 43, 50, 11]], dtype=int64)

In [51]:
Main.classrep(C, dim=0)

array([[37, 44]], dtype=int64)

In [52]:
Main.classrep(C, dim=0, class=1)

SyntaxError: invalid syntax (635837562.py, line 1)