In [1]:
import pandas as pd
import numpy as np

from gtda.homology import CubicalPersistence

from gtda.diagrams import Amplitude
from gtda.diagrams import PersistenceEntropy
from gtda.diagrams import PersistenceLandscape
from gtda.diagrams import NumberOfPoints

from gtda.images import Binarizer
from gtda.images import RadialFiltration
from gtda.images import HeightFiltration

from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

from datetime import datetime

In [2]:
X1, y1 = fetch_openml("mnist_784", version=1, return_X_y=True)
print(f"X shape: {X1.shape}, y shape: {y1.shape}")

X shape: (70000, 784), y shape: (70000,)


In [3]:
X = pd.DataFrame.to_numpy(X1)

# Reshape to (n_samples, n_pixels_x, n_pixels_y)
X = X.reshape((-1, 28, 28))  # -1 means that first dimension as an unknown and we want numpy to figure it out
print(X.shape, X.ndim)

y = pd.Series.to_numpy(y1)


(70000, 28, 28) 3


In [4]:
# Select train and test data sets
train_size, test_size = 10000, 300
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=train_size, test_size=test_size, stratify=y, random_state=666)

print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")


X_train shape: (10000, 28, 28), y_train shape: (10000,)
X_test shape: (300, 28, 28), y_test shape: (300,)


In [5]:
# create array with features using persistence diagram:
# Entropy, Amplitude, Number of points, multiple radial and height filtrations

def process_data(im_data, center_lst, direction_lst):

    im_features = np.array([])
    
    binarizer = Binarizer(threshold=0.4)
    for filtr_center in center_lst:
        radial_filtration = RadialFiltration(center=filtr_center)
        cubical_persistence = CubicalPersistence(n_jobs=-1)
        im_binarized = binarizer.fit_transform(im_data)
        im_filtration = radial_filtration.fit_transform(im_binarized)
        im_cubical = cubical_persistence.fit_transform(im_filtration)
        im_pe = PersistenceEntropy().fit_transform(im_cubical)
        im_num_of_points = NumberOfPoints().fit_transform(im_cubical)
        im_ampl = Amplitude(metric='landscape').fit_transform(im_cubical)
        
        im_features = np.append(im_features , im_pe)
        im_features = np.append(im_features , im_num_of_points)
        im_features = np.append(im_features , im_ampl)

    for filtr_dir in direction_lst:
        height_filtration = HeightFiltration(direction=filtr_dir)
        cubical_persistence = CubicalPersistence(n_jobs=-1)
        im_binarized = binarizer.fit_transform(im_data)
        im_filtration = height_filtration.fit_transform(im_binarized)
        im_cubical = cubical_persistence.fit_transform(im_filtration)
        im_pe = PersistenceEntropy().fit_transform(im_cubical)
        im_num_of_points = NumberOfPoints().fit_transform(im_cubical)
        im_ampl = Amplitude(metric='landscape').fit_transform(im_cubical)
        
        im_features = np.append(im_features , im_pe)
        im_features = np.append(im_features , im_num_of_points)
        im_features = np.append(im_features , im_ampl)
    
    return im_features


In [6]:
direction_list = [np.array([1, 0]), np.array([1, 1]), np.array([0, 1]),
                  np.array([-1, 1]), np.array([-1, 0]), np.array([-1, -1]), 
                  np.array([0, -1]), np.array([1, -1])]

center_list = [np.array([6, 20]), np.array([20, 6]), np.array([13, 6]),
               np.array([6, 13]), np.array([13, 13]), np.array([20, 13]),
               np.array([13, 20]), np.array([6, 6]),  np.array([20, 20])]

num_of_objects_train = X_train.shape[0]
num_of_objects_test = X_test.shape[0]

num_of_features = 6 * len(center_list) + 6 * len(direction_list)
X_features_train = np.zeros((num_of_objects_train, num_of_features))
X_features_test = np.zeros((num_of_objects_test, num_of_features))

print(X_features_train.shape, X_features_test.shape)
print(y_train.shape, y_test.shape)

(10000, 102) (300, 102)
(10000,) (300,)


In [7]:
# create array with features for train data set

print('start ', datetime.now())

for i in range(num_of_objects_train):
    im = X_train[i][None, :, :]
    features = process_data(im, center_list, direction_list)
        
    X_features_train[i,:] = features
    
print('end   ', datetime.now())


start  2024-10-14 14:38:36.410812
end    2024-10-14 15:33:03.972859


In [8]:
# create array with features for test data set

print('start ', datetime.now())

for i in range(num_of_objects_test):
    im = X_test[i][None, :, :]
    features = process_data(im, center_list, direction_list)
        
    X_features_test[i,:] = features
    
print('end   ', datetime.now())


start  2024-10-14 15:39:44.001303
end    2024-10-14 15:40:54.007663


In [9]:
rf = RandomForestClassifier()
rf.fit(X_features_train, y_train)


RandomForestClassifier()

In [10]:
y_pred = rf.predict(X_features_test)
rf.score(X_features_test, y_test)


0.96

In [11]:
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

Accuracy: 0.96


In [12]:
j=0
print(len(y_pred), len(y_test))
for i in range(len(y_pred)):
   if y_pred[i] == y_test[i]:
    j +=1
   else:
    print(i, y_pred[i], y_test[i])

print(j, j*100/len(y_pred))


300 300
52 5 9
58 2 1
107 7 6
115 8 2
157 6 8
195 9 8
211 4 0
232 3 0
264 6 0
277 9 8
287 3 5
294 4 6
288 96.0
