In [None]:

import sys
sys.path.append('../src')

import time
import numpy as np
from scipy.integrate import solve_ivp

import matplotlib.pyplot as plt
import pandas as pd

from nfoursid.kalman import Kalman
from nfoursid.kalman import Kalman
from nfoursid.nfoursid import NFourSID
from nfoursid.state_space import StateSpace

pd.set_option('display.max_columns', None)
np.random.seed(42)

SAMPLING_FREQUENCY = 10000  # Hz
TIME_STEP = 1 / SAMPLING_FREQUENCY # seconds

# Testing with the target dimensions for IBR identifications
INPUT_DIM = 4
OUTPUT_DIM = 2
INTERNAL_STATE_DIM = 18

# Noise parameters
NOISE_AMPLITUDE = .1  # add noise to the training- and test-set

# Plotting parameters
FIGSIZE = 8
figsize = (1.3 * FIGSIZE, FIGSIZE)

# Parameter for our timing tests
# NUM_TRAINING_DATAPOINTS_ARRAY = [100, 1000, 10000, 20000]
# NUM_BLOCK_ROWS_ARRAY = [15, 25, 35, 45, 55]

NUM_TRAINING_DATAPOINTS_ARRAY = [30000]
NUM_BLOCK_ROWS_ARRAY = [25, 45]

# # Generate a random stable state matrix A
# np.random.seed(0)  # For reproducibility
# A = np.random.randn(INTERNAL_STATE_DIM, INTERNAL_STATE_DIM)
# A = A - 3.5 * np.eye(INTERNAL_STATE_DIM)  # Ensure eigenvalues have negative real parts

# print("\nMatrix A:")
# print(pd.DataFrame(A))

A = np.array([
    [1,  .01,    0,   0,   0,   0,  .1,   0, -.01,   0,   0,   0,   0,   0,   0,   0,   0,   0],
    [0,    1,  .01,   0,   0,   0,   0,  .1,   0, -.01,   0,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,    1, .02,   0,   0,   0,   0,  .1,   0,    0,   0,   0,   0,   0,   0,   0,   0],
    [0, -.01,    0,   1,   0,   0,   0,   0,   0,  .1,    0,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,   .1,   0,   1, .01,   0,   0,   0,   0,   .1,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   1,   0,   0,   0,   0,    0,  .1,   0,   0,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   1, .01,   0,   0,    0,   0,  .1,   0,   0,   0,   0,   0],
    [0,    0,    0,   0, .01,   0,   0,   1,   0,   0,    0,   0,   0,  .1,   0,   0,   0,   0],
    [0,    0,    0, -.01,   0,   0,   0,   0,   1, .01,    0,   0,   0,   0,  .1,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   1,    0,   0,   0,   0,   0,  .1,   0,   0],
    [0,  .01,    0,   0,   0,   0,   0,   0,   0,   0,    1, .01,   0,   0,   0,   0,  .1,   0],
    [0,    0,    0,   0,   0,   0, -.01,   0,   0,   0,    0,   1, .02,   0,   0,   0,   0,  .1],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   1,  .1,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   1, .01,   0,   0,   0],
    [0,    0,    0, .01,   0,   0,   0,   0,   0, -.01,    0,   0,   0,   0,   1,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   0,   0,   1,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   0,   0,   0,   1, .01],
    [0,    0,    0,   0,   0,   0, .01,   0,   0,   0,    0,   0,  .1,   0,   0,   0,   0,   1],
]) / 10.01

eigenvalues_system = np.linalg.eigvals(A)
fig,ax = plt.subplots(figsize=figsize)
ax.scatter(eigenvalues_system.real,eigenvalues_system.imag)

# Generate random B, C, D matrices with appropriate dimensions
B = np.random.randn(INTERNAL_STATE_DIM, INPUT_DIM)
C = np.random.randn(OUTPUT_DIM, INTERNAL_STATE_DIM)
D = np.random.randn(OUTPUT_DIM, INPUT_DIM)


In [None]:
import os
import tracemalloc
import linecache
from collections import Counter

def display_top(snapshot, key_type='lineno', limit=3):
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    top_stats = snapshot.statistics(key_type)

    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        # replace "/path/to/module/file.py" with "module/file.py"
        filename = os.sep.join(frame.filename.split(os.sep)[-2:])
        print("#%s: %s:%s: %.1f KiB"
              % (index, filename, frame.lineno, stat.size / 1024))
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            print('    %s' % line)

    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))


timing_data = np.zeros((len(NUM_BLOCK_ROWS_ARRAY), len(NUM_TRAINING_DATAPOINTS_ARRAY)))

for i in range(len(NUM_BLOCK_ROWS_ARRAY)):
    num_block_rows = NUM_BLOCK_ROWS_ARRAY[i]

    for j in range(len(NUM_TRAINING_DATAPOINTS_ARRAY)):
        num_training_datapoints = NUM_TRAINING_DATAPOINTS_ARRAY[j]
        
        state_space = StateSpace(A, B, C, D)

        for _ in range(num_training_datapoints):
            input_state = np.random.standard_normal((INPUT_DIM, 1))
            noise = np.random.standard_normal((OUTPUT_DIM, 1)) * NOISE_AMPLITUDE

            state_space.step(input_state, noise)
        
        figsize = (1.3 * FIGSIZE, FIGSIZE)
        fig = plt.figure(figsize=figsize)
        state_space.plot_input_output(fig)  # the state-space model can plot its inputs and outputs
        # fig.tight_layout()

        start_time = time.time()

        tracemalloc.start()
        counts = Counter()

        nfoursid = NFourSID(
            state_space.to_dataframe(),  # the state-space model can summarize inputs and outputs as a dataframe
            output_columns=state_space.y_column_names,
            input_columns=state_space.u_column_names,
            num_block_rows=num_block_rows,
            performance_logging=False
        )
        
        nfoursid.subspace_identification()

        ORDER_OF_MODEL_TO_FIT = INTERNAL_STATE_DIM

        state_space_identified, covariance_matrix = nfoursid.system_identification(
            rank=ORDER_OF_MODEL_TO_FIT
        )

        end_time = time.time()
        timing_data[i, j] = end_time - start_time

        snapshot = tracemalloc.take_snapshot()
        display_top(snapshot)
        print("\n\n")

print('Top prefixes:', counts.most_common(3))

df = pd.DataFrame(timing_data, columns=NUM_TRAINING_DATAPOINTS_ARRAY, index=NUM_BLOCK_ROWS_ARRAY)

df.index.name = 'Number of block rows'
df.columns.name = 'Number of training datapoints'

print("\nTiming data:")
print(df)

In [None]:

import sys
sys.path.append('../src')

import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from nfoursid.nfoursid import NFourSID
from nfoursid.state_space import StateSpace

pd.set_option('display.max_columns', None)
np.random.seed(0)

# Testing with the target dimensions for IBR identifications
INPUT_DIM = 4
OUTPUT_DIM = 2
INTERNAL_STATE_DIM = 18

# Noise parameters
NOISE_AMPLITUDE = .1  # add noise to the training- and test-set

# Plotting parameters
FIGSIZE = 8

NUM_TRAINING_DATAPOINTS = [100, 500, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000]
NUM_BLOCK_ROWS = 25

A = np.array([
    [1,  .01,    0,   0,   0,   0,  .1,   0, -.01,   0,   0,   0,   0,   0,   0,   0,   0,   0],
    [0,    1,  .01,   0,   0,   0,   0,  .1,   0, -.01,   0,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,    1, .02,   0,   0,   0,   0,  .1,   0,    0,   0,   0,   0,   0,   0,   0,   0],
    [0, -.01,    0,   1,   0,   0,   0,   0,   0,  .1,    0,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,   .1,   0,   1, .01,   0,   0,   0,   0,   .1,   0,   0,   0,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   1,   0,   0,   0,   0,    0,  .1,   0,   0,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   1, .01,   0,   0,    0,   0,  .1,   0,   0,   0,   0,   0],
    [0,    0,    0,   0, .01,   0,   0,   1,   0,   0,    0,   0,   0,  .1,   0,   0,   0,   0],
    [0,    0,    0, -.01,   0,   0,   0,   0,   1, .01,    0,   0,   0,   0,  .1,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   1,    0,   0,   0,   0,   0,  .1,   0,   0],
    [0,  .01,    0,   0,   0,   0,   0,   0,   0,   0,    1, .01,   0,   0,   0,   0,  .1,   0],
    [0,    0,    0,   0,   0,   0, -.01,   0,   0,   0,    0,   1, .02,   0,   0,   0,   0,  .1],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   1,  .1,   0,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   1, .01,   0,   0,   0],
    [0,    0,    0, .01,   0,   0,   0,   0,   0, -.01,    0,   0,   0,   0,   1,   0,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   0,   0,   1,   0,   0],
    [0,    0,    0,   0,   0,   0,   0,   0,   0,   0,    0,   0,   0,   0,   0,   0,   1, .01],
    [0,    0,    0,   0,   0,   0, .01,   0,   0,   0,    0,   0,  .1,   0,   0,   0,   0,   1],
]) / 10.01


# Generate random B, C, D matrices with appropriate dimensions
B = np.random.randn(INTERNAL_STATE_DIM, INPUT_DIM)
C = np.random.randn(OUTPUT_DIM, INTERNAL_STATE_DIM)
D = np.random.randn(OUTPUT_DIM, INPUT_DIM)

state_space = StateSpace(A, B, C, D)


timing_tests = np.zeros((2, len(NUM_TRAINING_DATAPOINTS)))

for i, num_datapoints in enumerate(NUM_TRAINING_DATAPOINTS):

    start_time = time.time()

    for _ in range(num_datapoints):
        input_state = np.random.standard_normal((INPUT_DIM, 1))
        noise = np.random.standard_normal((OUTPUT_DIM, 1)) * NOISE_AMPLITUDE

        state_space.step(input_state, noise)

    nfoursid = NFourSID(
        state_space.to_dataframe(),  # the state-space model can summarize inputs and outputs as a dataframe
        output_columns=state_space.y_column_names,
        input_columns=state_space.u_column_names,
        num_block_rows=NUM_BLOCK_ROWS,
        performance_logging=False
    )

    nfoursid.subspace_identification()

    ORDER_OF_MODEL_TO_FIT = INTERNAL_STATE_DIM

    state_space_identified, covariance_matrix = nfoursid.system_identification(
        rank=ORDER_OF_MODEL_TO_FIT
    )

    print(f"finihed {num_datapoints} datapoints")

    stop_time = time.time()

    timing_tests[0, i] = num_datapoints
    timing_tests[1, i] = stop_time - start_time

print(timing_tests)

plt.figure(figsize=(FIGSIZE, FIGSIZE))

fig, ax = plt.subplots()

ax.plot(timing_tests[0], timing_tests[1])

ax.grid

ax.set_xlabel('Number of training datapoints')
ax.set_ylabel('Time [s]')
ax.set_title('Time to identify a model with N4SID')
