In [294]:
from abc import ABC

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

from gtda.time_series import TakensEmbedding
from gtda.homology import VietorisRipsPersistence
from gtda.diagrams import Scaler, Filtering, PersistenceEntropy
from gtda.plotting import plot_diagram

from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import GridSearchCV

from sklearn.metrics import accuracy_score

from src.utils import get_data_from_directory, get_files_directory_list

import torch
import torch.nn as nn
import torch.nn.functional as F

## 1. Common routines for Topological Features extraction

In [51]:
class PersistenceDiagramsExtractor:
    def __init__(self, takens_embedding_dim, takens_embedding_delay, homology_dimensions, 
                 filtering=False, filtering_dimensions=(1, 2)):
        self.takens_embedding_dim_ = takens_embedding_dim
        self.takens_embedding_delay_ = takens_embedding_delay
        self.homology_dimensions_ = homology_dimensions
        self.filtering_ = filtering
        self.filtering_dimensions_ = filtering_dimensions
        
    def takens_embeddings_(self, X):
        X_transformed = list()
        for series in X:
            te = TakensEmbedding(parameters_type='search', 
                                 dimension=self.takens_embedding_dim_, 
                                 time_delay=self.takens_embedding_delay_)
            X_transformed.append(te.fit_transform(series))
        return X_transformed
    
    def persistence_diagrams_(self, X_embdeddings):
        X_transformed = list()
        for embedding in X_embdeddings:
            vr = VietorisRipsPersistence(metric='euclidean', 
                                         max_edge_length=100, 
                                         homology_dimensions=self.homology_dimensions_)
            diagram_scaler = Scaler()
            persistence_diagrams = diagram_scaler.fit_transform(vr.fit_transform([embedding]))
            if self.filtering_:
                diagram_filter = Filtering(epsilon=0.1, homology_dimensions=self.filtering_dimensions_)
                persistence_diagrams = diagram_filter.fit_transform(persistence_diagrams)
            X_transformed.append(persistence_diagrams[0])
        return X_transformed
        
    def fit_transform(self, X):
        X_embeddings = self.takens_embeddings_(X)
        X_persistence_diagrams = self.persistence_diagrams_(X_embeddings)
        return X_persistence_diagrams
        

In [334]:
"""
Abstract class for creating Topology Features extractors.
"""
class PersistenceDiagramFeatureExtractor(ABC):
    def extract_feature_(self, persistence_diagram):
        pass
    
    def fit_transform(self, X_pd):
        return np.array([self.extract_feature_(diagram) for diagram in X_pd])

In [153]:
class TopologicalFeaturesExtractor:
    def __init__(self, persistence_diagram_extractor, persistence_diagram_features):
        self.persistence_diagram_extractor_ = persistence_diagram_extractor
        self.persistence_diagram_features_ = persistence_diagram_features
        
    def fit_transform(self, X):
        X_transformed = None
        
        X_pd = self.persistence_diagram_extractor_.fit_transform(X)
        
        for pd_feature in self.persistence_diagram_features_:
            X_features = pd_feature.fit_transform(X_pd)
            X_transformed = np.vstack((X_transformed.T, X_features.T)).T if X_transformed is not None else X_features
            
        return X_transformed

## 2. Common routines for Topological Features extraction

### 2.1 Features from Pereira2015

### 2.1.1 Number of holes for each dimension

In [122]:
class HolesNumberFeature(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(HolesPerDimFeature).__init__()
        
    def extract_feature_(self, persistence_diagram):
        # Persistence diagrams has dimensionality (N, 3).
        # The second coordinate has following notation: (birth, death, homology dimension).
        # See https://github.com/giotto-ai/giotto-tda/blob/master/gtda/plotting/persistence_diagrams.py
        feature = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        for hole in persistence_diagram:
            if hole[1] - hole[0] > 0:
                # Some holes still can have zero-lifetime
                # See https://giotto-ai.github.io/gtda-docs/latest/modules/generated/diagrams/preprocessing/gtda.diagrams.Filtering.html#gtda.diagrams.Filtering.fit_transform
                feature[int(hole[2])] += 1.0
        return feature

### 2.1.2 Maximum hole lifetime in each dimension

In [113]:
class MaxHoleLifeTimeFeature(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(MaxHoleLifeTimeFeature).__init__()
        
    def extract_feature_(self, persistence_diagram):
        feature = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        for hole in persistence_diagram:
            lifetime = hole[1] - hole[0]
            if lifetime > feature[int(hole[2])]:
                feature[int(hole[2])] = lifetime
        return feature

### 2.1.3 Fraction of holes that are at a similar distance as the farthest point from the diagonal (maximum lifetime) with some ration

In [114]:
class RelevantHolesNumber(PersistenceDiagramFeatureExtractor):
    def __init__(self, ratio=1.0):
        super(RelevantHolesNumber).__init__()
        self.ratio_ = ratio
        
    def extract_feature_(self, persistence_diagram):
        feature = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        max_lifetimes = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        
        for hole in persistence_diagram:
            lifetime = hole[1] - hole[0]
            if lifetime > max_lifetimes[int(hole[2])]:
                max_lifetimes[int(hole[2])] = lifetime
                
        for hole in persistence_diagram:
            index = int(hole[2])
            lifetime = hole[1] - hole[0]
            if np.equal(lifetime, self.ratio_ * max_lifetimes[index]):
                feature[index] += 1.0
                
        return feature

### 2.1.4 Average lifetime in each dimension

In [119]:
class AverageHoleLifetimeFeature(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(AverageHoleLifetimeFeature).__init__()
        
    def extract_feature_(self, persistence_diagram):
        feature = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        n_holes = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        
        for hole in persistence_diagram:
            lifetime = hole[1] - hole[0]
            index = int(hole[2])
            if lifetime > 0:
                feature[index] += lifetime
                n_holes[index] += 1
        
        for i in range(feature.shape[0]):
            feature[i] = feature[i] / n_holes[i] if n_holes[i] != 0 else 0.0
            
        return feature

### 2.1.5 Sum of all holes lifetimes in each dimension

In [123]:
class SumHoleLifetimeFeature(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(SumHoleLifetimeFeature).__init__()
        
    def extract_feature_(self, persistence_diagram):
        feature = np.zeros(int(np.max(persistence_diagram[:, 2])) + 1)
        for hole in persistence_diagram:
            feature[int(hole[2])] += hole[1] - hole[0]
        return feature

## 2.2 Custom Topological Features

In [176]:
"""
Wrap over Giotto's feature
"""
class PersistenceEntropyFeature(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(PersistenceEntropyFeature).__init__()
        
    def extract_feature_(self, persistence_diagram):
        persistence_entropy = PersistenceEntropy(n_jobs=-1)
        return persistence_entropy.fit_transform([persistence_diagram])[0]

In [361]:
class SimultaneousAliveHolesFeatue(PersistenceDiagramFeatureExtractor):
    def __init__(self):
        super(SimultaneousAliveHolesFeatue).__init__()
        
    def get_average_intersection_number_(self, segments):
        intersections = list()
        n_segments = segments.shape[0]
        i = 0

        for i in range(0, n_segments):
            count = 1
            start = segments[i, 0]
            end = segments[i, 1]

            for j in range(i + 1, n_segments):
                if segments[j, 0] >= start and segments[j, 0] <= end:
                    count += 1
                else:
                    break
            intersections.append(count)

        return np.sum(intersections) / len(intersections)

    def get_average_simultaneous_holes_(self, holes):
        starts = holes[:, 0]
        ends = holes[:, 1]
        ind = np.lexsort((starts, ends))
        segments = np.array([[starts[i], ends[i]] for i in ind])
        return self.get_average_intersection_number_(segments)

    def extract_feature_(self, persistence_diagram):
        n_dims = int(np.max(persistence_diagram[:, 2])) + 1
        feature = np.zeros(n_dims)

        for dim in range(n_dims):
            holes = list()
            for hole in persistence_diagram:
                if hole[1] - hole[0] != 0.0 and int(hole[2]) == dim:
                    holes.append(hole)
            if len(holes) != 0:      
                feature[dim] = self.get_average_simultaneous_holes_(np.array(holes))

        return feature

## 3. Example with Time Series

In [4]:
# !wget -nc "http://www.timeseriesclassification.com/Downloads/Archives/Univariate2018_arff.zip"
#!unzip -q -n "Univariate2018_arff.zip"

In [132]:
directory_list = get_files_directory_list()
directory_list = sorted(directory_list)

random_index = 77
random_path = directory_list[random_index]

X_train, X_test, y_train, y_test = get_data_from_directory(random_path)
X_train = X_train.squeeze()
y_train = y_train.squeeze()
X_test = X_test.squeeze()
y_test = y_test.squeeze()

print('Dataset: ', random_path)
print('X_train shape: ', X_train.shape)
print('y_train shape: ', y_train.shape)
print('X_test shape:  ', X_test.shape)
print('y_test shape:  ', y_test.shape)

Dataset:  MoteStrain
X_train shape:  (20, 84)
y_train shape:  (20,)
X_test shape:   (1252, 84)
y_test shape:   (1252,)


In [369]:
feature_extractor = TopologicalFeaturesExtractor(
    persistence_diagram_extractor=PersistenceDiagramsExtractor(takens_embedding_dim=3, 
                                                               takens_embedding_delay=10,
                                                               homology_dimensions=(0, 1, 2)),
    persistence_diagram_features=[HolesNumberFeature(),
                                  MaxHoleLifeTimeFeature(),
                                  RelevantHolesNumber(),
                                  AverageHoleLifetimeFeature(),
                                  SumHoleLifetimeFeature(),
                                  PersistenceEntropyFeature(),
                                  SimultaneousAliveHolesFeatue()])
X_train_transformed = feature_extractor.fit_transform(X_train)
X_test_transformed = feature_extractor.fit_transform(X_test)

In [370]:
parameters = {"C": [10**i for i in range(-2, 5)],
              "kernel": ["linear", "rbf", "sigmoid", "poly"]}
svc_cv = GridSearchCV(SVC(random_state=42), 
                      param_grid=parameters,
                      cv=5,
                      scoring='accuracy', 
                      n_jobs=-1)
svc_cv.fit(X_train_transformed, y_train)

print("Train accuracy: ", accuracy_score(y_train, svc_cv.best_estimator_.predict(X_train_transformed)))
print("Test accuracy: ", accuracy_score(y_test, svc_cv.best_estimator_.predict(X_test_transformed)))

Train accuracy:  1.0
Test accuracy:  0.4968051118210863


In [371]:
parameters = {"max_depth": [2, 10, 15, 20, 25, 30, 35, 40, 45, 50, 70, 100, 120, 150],
              "n_estimators": [20, 50, 100, 150, 200, 250]}
svc_cv = GridSearchCV(XGBClassifier(n_jobs=-1, random_state=42), 
                      param_grid=parameters,
                      cv=2,
                      scoring='accuracy', 
                      n_jobs=-1)
svc_cv.fit(X_train_transformed, y_train)

print("Train accuracy: ", accuracy_score(y_train, svc_cv.best_estimator_.predict(X_train_transformed)))
print("Test accuracy: ", accuracy_score(y_test, svc_cv.best_estimator_.predict(X_test_transformed)))

Train accuracy:  0.8
Test accuracy:  0.6110223642172524
