In [1]:
import json
import os
import random
import re
import sys
sys.path.append('../')

import numpy as np
import pandas as pd

from sklearn.linear_model import RidgeClassifier
from sklearn.neighbors import KNeighborsClassifier
from rocket_rag.transform import *
from rocket_rag.node_indexing import *

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'


In [2]:
# Project configuration; handed to TimeSeriesTransform in the next cell.
# Opened through a context manager so the file handle is closed
# deterministically instead of lingering for the lifetime of the kernel.
with open("../config/configs.json") as cfg_file:
    cfg = json.load(cfg_file)

# Data locations, relative to the notebook directory.
INSTANCES_DIR = '../data/instances/'
INFERENCE_DIR = '../data/inference/'

# Condition labels. They double as the per-state sub-directory names under
# INFERENCE_DIR/<load>/ and as the prefix of every inference file name.
STATES = [
    'normal',
    'backlash1', 'backlash2',
    'lackLubrication1', 'lackLubrication2',
    'spalling1', 'spalling2', 'spalling3', 'spalling4',
    'spalling5', 'spalling6', 'spalling7', 'spalling8',
]

# Available load settings; one of them is selected as `load` further down.
LOADS = ['20kg', '40kg', '-40kg']
DOC_DIR = '../docs'

In [3]:
# Transformer configured from the JSON config; the cells below call its
# smoothing(), get_rocket() and get_fft() methods.
ts_transform = TimeSeriesTransform(cfg)
# Node indexer; used below only through load_nodes() to read stored nodes.
ts_node_indexer = TimeSeriesNodeIndexer()

In [4]:
# Load setting under study. It selects the node stores read here and the
# inference sub-directory scanned later (presumably always one of LOADS).
load = '20kg'
# Stored nodes for the two signals. Later cells read `.rocket`, `.fft` and
# `.id_` from every node.
# NOTE(review): the stores are .pkl files, presumably unpickled by
# load_nodes(); only load stores from a trusted source.
curs = ts_node_indexer.load_nodes(f'../store/ts_indexing/current_nodes_{load}.pkl')
poss = ts_node_indexer.load_nodes(f'../store/ts_indexing/position_error_nodes_{load}.pkl')

[32m2024-09-27 21:11:22.649[0m | [34m[1mDEBUG   [0m | [36mrocket_rag.node_indexing[0m:[36mload_nodes[0m:[36m59[0m - [34m[1mLoading all nodes...[0m
[32m2024-09-27 21:11:24.707[0m | [34m[1mDEBUG   [0m | [36mrocket_rag.node_indexing[0m:[36mload_nodes[0m:[36m63[0m - [34m[1mAll nodes are loaded.[0m
[32m2024-09-27 21:11:24.709[0m | [34m[1mDEBUG   [0m | [36mrocket_rag.node_indexing[0m:[36mload_nodes[0m:[36m59[0m - [34m[1mLoading all nodes...[0m
[32m2024-09-27 21:11:26.647[0m | [34m[1mDEBUG   [0m | [36mrocket_rag.node_indexing[0m:[36mload_nodes[0m:[36m63[0m - [34m[1mAll nodes are loaded.[0m


In [5]:
# Inference files of every state for the selected load, flattened into one
# list. Only CSV files are kept: a raw directory listing also returns stray
# entries (e.g. `.ipynb_checkpoints`, `.DS_Store`) that would break the
# file-name parsing and pd.read_csv calls further down. Each listing is
# sorted because os.listdir() returns entries in arbitrary order, which would
# make the file order differ between machines and runs.
filenames = [
    filename
    for state in STATES
    for filename in sorted(os.listdir(os.path.join(INFERENCE_DIR, load, state)))
    if filename.lower().endswith('.csv')
]

# Training data: ROCKET feature vector and label (`id_`) of every stored node.
curs_rocket = [c.rocket for c in curs]
poss_rocket = [p.rocket for p in poss]
curs_labels = [c.id_ for c in curs]
poss_labels = [p.id_ for p in poss]

# One 1-nearest-neighbour and one ridge classifier per signal.
cur_knn = KNeighborsClassifier(n_neighbors=1)
pos_knn = KNeighborsClassifier(n_neighbors=1)
cur_ridge = RidgeClassifier()
pos_ridge = RidgeClassifier()

cur_knn.fit(curs_rocket, curs_labels)
pos_knn.fit(poss_rocket, poss_labels)
cur_ridge.fit(curs_rocket, curs_labels)
pos_ridge.fit(poss_rocket, poss_labels)

In [6]:
def complex_to_mag_phase(X):
    """Split complex values into real-valued magnitude and phase features.

    Args:
        X: Array-like of complex numbers.

    Returns:
        ``np.column_stack((|X|, angle(X)))``: for a 1-D input of length n this
        is an (n, 2) array with the magnitude in column 0 and the phase (in
        radians) in column 1.
    """
    return np.column_stack((np.abs(X), np.angle(X)))

# FFT training matrices: one row per node holding the flattened
# (magnitude, phase) pairs of the node's first FFT component. Despite the
# `_magn` suffix, the rows contain the phase values as well.
#
# Each row is flattened with ravel() so the result is (n_samples, n_features)
# by construction. The earlier reshape(1, -1) + blanket squeeze() produced the
# same matrix in the usual case, but squeeze() drops *every* singleton axis,
# so a store with a single node (or a single feature) collapsed to 1-D and
# made fit() fail.
curs_fft_magn = np.array([complex_to_mag_phase(c.fft[0]).ravel() for c in curs])
poss_fft_magn = np.array([complex_to_mag_phase(p.fft[0]).ravel() for p in poss])

# Same classifier line-up as for the ROCKET features, fitted on FFT features.
cur_fft_knn = KNeighborsClassifier(n_neighbors=1)
pos_fft_knn = KNeighborsClassifier(n_neighbors=1)
cur_fft_ridge = RidgeClassifier()
pos_fft_ridge = RidgeClassifier()

cur_fft_knn.fit(curs_fft_magn, curs_labels)
pos_fft_knn.fit(poss_fft_magn, poss_labels)
cur_fft_ridge.fit(curs_fft_magn, curs_labels)
pos_fft_ridge.fit(poss_fft_magn, poss_labels)

### ROCKET and FFT Features Prediction with Ridge Classifier for a random sample

In [7]:
rand_filename = random.choice(filenames)
print(rand_filename)

# File names look like '<state>_<load number>_...csv'; the state is needed to
# locate the file inside its per-state directory.
# - The load number is the load without its 'kg' unit. Slicing the first two
#   characters truncated '-40kg' to '-4'.
# - The number is escaped and must be followed by '_' or '.', and the state is
#   matched non-greedily, so a later field that happens to equal the load
#   number (e.g. 'x_20_2_20.csv') can no longer be absorbed into the state.
load_num = load[:-2] if load.endswith('kg') else load
state_match = re.match(fr'(.*?)_{re.escape(load_num)}(?=[_.])', rand_filename)
if state_match is None:
    raise ValueError(f'Cannot parse the state from {rand_filename!r} for load {load!r}')
state = state_match.group(1)
rand_ts_df = pd.read_csv(os.path.join(INFERENCE_DIR, load, state, rand_filename))

# Smoothed signals of the sampled run.
rand_cur = ts_transform.smoothing(ts_df=rand_ts_df, field='current')
rand_pos = ts_transform.smoothing(ts_df=rand_ts_df, field='position_error')

spalling3_20_2_4.csv


In [8]:
# ROCKET features of the sampled run: current first, then position error.
rand_cur_rocket, rand_pos_rocket = (
    ts_transform.get_rocket(signal) for signal in (rand_cur, rand_pos)
)

# FFT features of the sampled run: (magnitude, phase) pairs of the first FFT
# component, flattened into a single row so they can be passed to predict().
rand_cur_fft, rand_pos_fft = (
    np.array(complex_to_mag_phase(ts_transform.get_fft(signal)[0]).reshape(1, -1))
    for signal in (rand_cur, rand_pos)
)

In [9]:
# Ridge predictions for the sampled run, one line per (signal, feature type).
# The ROCKET vector is wrapped in a list to form a one-sample batch; the FFT
# features are already a single row.
ridge_checks = [
    ('Ridge for current signal with Rocket features', cur_ridge, [rand_cur_rocket]),
    ('Ridge for position error with Rocket features', pos_ridge, [rand_pos_rocket]),
    ('Ridge for current signal with FFT features', cur_fft_ridge, rand_cur_fft),
    ('Ridge for position error with FFT features', pos_fft_ridge, rand_pos_fft),
]
for label, clf, feats in ridge_checks:
    print(f'{label}: {clf.predict(feats)}')

# KNN counterparts, currently disabled:
# print(f'KNN for current signal: {cur_knn.predict([rand_cur_rocket])}')
# print(f'KNN for position error: {pos_knn.predict([rand_pos_rocket])}')
# print(f'KNN for current signal: {cur_fft_knn.predict(rand_cur_fft)}')
# print(f'KNN for position error: {pos_fft_knn.predict(rand_pos_fft)}')


Ridge for current signal with Rocket features: ['spalling2']
Ridge for position error with Rocket features: ['spalling4']
Ridge for current signal with FFT features: ['spalling2']
Ridge for position error with FFT features: ['spalling4']


### ROCKET and FFT Features Prediction with Ridge Classifier for all samples

In [10]:
# Validation features for every inference file, plus the ground-truth state
# parsed from each file name (kept index-aligned with the feature lists).
valid_curs_rocket, valid_poss_rocket = [], []
valid_curs_fft, valid_poss_fft = [], []
states = []

# File names look like '<state>_<load number>_...csv'. The pattern is built
# once, outside the loop, because it does not depend on the file.
# - The load number is the load without its 'kg' unit. Slicing the first two
#   characters truncated '-40kg' to '-4'.
# - The number is escaped and must be followed by '_' or '.', and the state is
#   matched non-greedily, so a later field equal to the load number cannot be
#   absorbed into the state.
load_num = load[:-2] if load.endswith('kg') else load
state_pattern = re.compile(fr'(.*?)_{re.escape(load_num)}(?=[_.])')

for filename in tqdm(filenames):
    state_match = state_pattern.match(filename)
    if state_match is None:
        raise ValueError(f'Cannot parse the state from {filename!r} for load {load!r}')
    state = state_match.group(1)
    states.append(state)
    ts_df = pd.read_csv(os.path.join(INFERENCE_DIR, load, state, filename))

    cur = ts_transform.smoothing(ts_df=ts_df, field='current')
    pos = ts_transform.smoothing(ts_df=ts_df, field='position_error')

    cur_rocket = ts_transform.get_rocket(cur)
    pos_rocket = ts_transform.get_rocket(pos)

    # (magnitude, phase) pairs of the first FFT component as one (1, n) row.
    cur_fft = np.array(complex_to_mag_phase(ts_transform.get_fft(cur)[0]).reshape(1, -1))
    pos_fft = np.array(complex_to_mag_phase(ts_transform.get_fft(pos)[0]).reshape(1, -1))

    valid_curs_rocket.append(cur_rocket)
    valid_poss_rocket.append(pos_rocket)
    valid_curs_fft.append(cur_fft)
    valid_poss_fft.append(pos_fft)

  0%|          | 0/195 [00:00<?, ?it/s]

In [11]:
# ROCKET features: the lists are passed to predict() as a batch (presumably
# one 1-D feature vector per file — confirm against get_rocket()).
pred_cur_rocket = cur_ridge.predict(valid_curs_rocket)
pred_pos_rocket = pos_ridge.predict(valid_poss_rocket)

# FFT features: every entry is a (1, n_features) row, so the rows are stacked
# into one (n_samples, n_features) matrix and classified in a single call
# instead of one predict() call per file. The leading axis is added back so
# the result keeps its (1, n_samples) shape, which is indexed with [0] when
# the accuracies are computed.
pred_cur_fft = cur_fft_ridge.predict(np.vstack(valid_curs_fft))[np.newaxis, :]
pred_pos_fft = pos_fft_ridge.predict(np.vstack(valid_poss_fft))[np.newaxis, :]

In [12]:
# Ground-truth state per inference file. The states were recorded in the same
# loop pass that built the validation features, so they are index-aligned with
# the predictions by construction. Reusing them avoids re-parsing the file
# names with a `load_num` that only exists because it leaked out of an earlier
# cell.
valid_labels = np.array(states)
assert len(valid_labels) == len(filenames), 'feature loop did not cover every inference file'

In [13]:
# Accuracy in percent: share of inference files whose predicted state equals
# the ground-truth state. The FFT predictions carry a leading axis of length
# one, hence the [0].
accuracy_inputs = {
    'Accuracy for current signal with Rocket features': pred_cur_rocket,
    'Accuracy for position error with Rocket features': pred_pos_rocket,
    'Accuracy for current signal with FFT features': pred_cur_fft[0],
    'Accuracy for position error with FFT features': pred_pos_fft[0],
}
for label, preds in accuracy_inputs.items():
    print(f'{label}: {sum(valid_labels == preds) / len(valid_labels) * 100}')

Accuracy for current signal with Rocket features: 72.3076923076923
Accuracy for position error with Rocket features: 43.58974358974359
Accuracy for current signal with FFT features: 63.589743589743584
Accuracy for position error with FFT features: 30.256410256410255
