In [93]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy.fft import fft, fftfreq
import pickle

from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler

plt.style.use('dark_background')

In [3]:
# https://www.youtube.com/watch?v=spUNpyF58BY&t=35s

In [4]:
file_path = r"C:\Users\peterdb1\Documents\Masters in ACM\(i-j) 625.801-802 - ACM Master's Research\Technical Work\koopman-category-discovery\data\3-dimensional-systems\dataset_7_class_500_samples.pkl"
with open(file_path, "rb") as f:
    dataset = pickle.load(f)

In [43]:
# Step 2: FFT per dimension
def extract_fourier_features(signal, T, normalize=False, plot=False):

    if normalize:
        signal = (signal - signal.mean(axis=0)) / signal.std(axis=0)
        
    N = len(signal)
    yf = fft(signal)
    xf = fftfreq(N, T)[:N//2]
    
    amplitudes = 2.0/N * np.abs(yf[:N//2])
    dominant_freq = xf[np.argmax(amplitudes)]
    centroid = np.sum(xf * amplitudes) / np.sum(amplitudes)
    bandwidth = np.sqrt(np.sum(((xf - centroid)**2) * amplitudes) / np.sum(amplitudes))
    energy = np.sum(amplitudes**2)
    
    features_dict = {
        "dominant_freq": dominant_freq,
        "centroid": centroid,
        "bandwidth": bandwidth,
        "energy": energy,
        "frequencies": xf,
        "amplitudes": amplitudes
    }

    features = np.array([dominant_freq, centroid, bandwidth, energy])

    return features_dict, features
        
    if plot:
        
        # Make Plot of Signal
        plt.plot(t, y)
        plt.title("Time Series Signal")
        plt.xlabel("Time [s]")
        plt.ylabel("Amplitude")
        plt.grid(True)
        plt.show()
    
        # Make Plots of Frequency Spectrum
        plt.plot(xf, amplitudes)
        plt.title("FFT - Frequency Spectrum")
        plt.xlabel("Frequency [Hz]")
        plt.ylabel("Amplitude")
        plt.grid(True)
        plt.show()


In [46]:
normalize = False
plot = False

system = 'rossler'
index = 1
signal = dataset[system][index]

t = signal['t']
x, y, z = signal['y']

T = np.mean(np.diff(t))

extract_fourier_features(x,T,normalize,plot)

_, features_x = extract_fourier_features(x, T, normalize, plot)
_, features_y = extract_fourier_features(y, T, normalize, plot)
_, features_z = extract_fourier_features(z, T, normalize, plot)

# Step 3: FFT on vector magnitude
magnitude = np.sqrt(x**2 + y**2 + z**2)
_, features_mag = extract_fourier_features(magnitude, T, normalize, plot)

In [72]:
normalize = False
plot = False

num_indices = 500

x_feats = []
y_feats = []
z_feats = []
mag_feats = []

for system in dataset.keys():
    for index in range(num_indices):
        
        signal = dataset[system][index]
        
        t = signal['t']
        x, y, z = signal['y']
        
        T = np.mean(np.diff(t))
        
        extract_fourier_features(x,T,normalize,plot)
        
        _, features_x = extract_fourier_features(x, T, normalize, plot)
        _, features_y = extract_fourier_features(y, T, normalize, plot)
        _, features_z = extract_fourier_features(z, T, normalize, plot)
        
        # Step 3: FFT on vector magnitude
        magnitude = np.sqrt(x**2 + y**2 + z**2)
        _, features_mag = extract_fourier_features(magnitude, T, normalize, plot)

        x_feats.extend(features_x)
        y_feats.extend(features_y)
        z_feats.extend(features_z)
        mag_feats.extend(features_mag)


x_feats = np.array(x_feats).reshape(-1,4)
y_feats = np.array(y_feats).reshape(-1,4)
z_feats = np.array(z_feats).reshape(-1,4)
mag_feats = np.array(mag_feats).reshape(-1,4)

features = np.hstack([x_feats,y_feats,z_feats,mag_feats])

In [75]:
col_names = ['dominant_freq', 'centroid', 'bandwidth', 'energy']
columns = [col + '_x' for col in col_names] + [col + '_y' for col in col_names] + [col + '_z' for col in col_names] + [col + '_mag' for col in col_names]

df = pd.DataFrame(data=features,columns=columns)
df['target'] = np.arange(len(df)) // num_indices

X = df.drop(columns='target')
y = df['target']

In [89]:
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.3)


scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

model = SVC()

# model = LogisticRegression()
model.fit(X_train_scaled,y_train)
print(model.score(X_test_scaled,y_test))

confusion_matrix(y_test,model.predict(X_test_scaled))

0.8428571428571429


array([[140,   0,   3,   7,   0,   0,   0],
       [  1, 167,   2,   0,   0,   0,   0],
       [  5,   4,  83,   3,   0,  60,   0],
       [ 28,   2,   0, 112,   1,   0,   0],
       [  0,   0,   0,   0, 149,   0,   0],
       [  3,   0,  45,   0,   0, 105,   0],
       [  0,   0,   0,   0,   1,   0, 129]])