In [None]:
import json
import h5py
import os
import shutil
import pickle
import numpy as np
import matplotlib.pyplot as plt

In [None]:
filename='frequency_step_frequency_'
path='results'

In [None]:
# Store ML data
ml_data = {}

# Loop over all files in directory
for i in np.arange(10.0, 24.1, 0.2):
    # Open new file
    file = f"{path}/{filename}{i:.03f}.h5"

    # -------------------------- READ CONTINUATION FILES
    # NOTE: COL -> Number of Periodic Solutions, ROW -> Number of Solution Points
    data = h5py.File(str(file), "r")
    pose = data["/Config_Time/POSE"][:].squeeze()
    vel = data["/Config_Time/VELOCITY"][:].squeeze()
    acc = data["/Config_Time/ACCELERATION"][:].squeeze()
    time = data["/Config_Time/Time"][:].T
    F = data["/Force_Amp"][:]
    T = data["/T"][:]
    # Close file
    data.close()

    # ------------------------------- COLLECT DATA
    ml_data[f"{i:.03f}"] = {
        "pose": pose,
        "vel": vel,
        "acc": acc,
        "time": time,
        "F": F,
        "T": T
    }

# SAVE DATA
with open(f"{path}/data.pkl", "wb") as f:
    pickle.dump(ml_data, f)

In [None]:
pose.shape, vel.shape, acc.shape, time.shape, F.shape, T.shape

In [None]:
SPLIT=0.2
seed=42 # for reproducibility

In [None]:
with open(f"{path}/data.pkl", "rb") as f:
    ml_data = pickle.load(f)
    
    # Collect data shapes
    n_curves = len(ml_data.keys())
    x_train, dx_train, ddx_train, t_train, F_train, T_train = [], [], [], [], [], []
    x_test, dx_test, ddx_test, t_test, F_test, T_test = [], [], [], [], [], []
    
    # Create datasets
    for k, v in ml_data.items():
        assert v["pose"].shape == v["vel"].shape == v["acc"].shape == v["time"].shape, f"Data shape mismatch for key {k}"
        assert v["F"].shape == v["T"].shape, f"Data shape mismatch for key {k}"

        n_time_steps_per_point = v["time"].shape[0]
        n_points_per_curve = v["pose"].shape[1]

        # Split data into training and testing sets
        rng = np.random.default_rng(seed=seed)
        shuffle = rng.permutation(n_points_per_curve)
        seed += 1 # Increment seed for next shuffle
        
        train_indices = shuffle[:int(n_points_per_curve * (1 - SPLIT))]
        test_indices = shuffle[int(n_points_per_curve * (1 - SPLIT)):]
        
        _x_train = v["pose"][:, train_indices].flatten()
        _dx_train = v["vel"][:, train_indices].flatten()
        _ddx_train = v["acc"][:, train_indices].flatten()
        _t_train = v["time"][:, train_indices].flatten()
        _F_train = v["F"][train_indices]
        _T_train = v["T"][train_indices]
        _x_test = v["pose"][:, test_indices].flatten()
        _dx_test = v["vel"][:, test_indices].flatten()
        _ddx_test = v["acc"][:, test_indices].flatten()
        _t_test = v["time"][:, test_indices].flatten()
        _F_test = v["F"][test_indices]
        _T_test = v["T"][test_indices]
        
        # Append to training data
        reps_train = _x_train.shape[0] // _F_train.shape[0]
        x_train.append(_x_train)
        dx_train.append(_dx_train)
        ddx_train.append(_ddx_train)
        t_train.append(_t_train)
        F_train.append(np.tile(_F_train, reps=(1, reps_train)).flatten())
        T_train.append(np.tile(_T_train, reps=(1, reps_train)).flatten())

        # Append to testing data
        reps_test = _x_test.shape[0] // _F_test.shape[0]
        x_test.append(_x_test)
        dx_test.append(_dx_test)
        ddx_test.append(_ddx_test)
        t_test.append(_t_test)
        F_test.append(np.tile(_F_test, reps=(1, reps_test)).flatten())
        T_test.append(np.tile(_T_test, reps=(1, reps_test)).flatten())

    # Convert lists to numpy arrays
    x_train = np.concatenate(x_train)
    dx_train = np.concatenate(dx_train)
    ddx_train = np.concatenate(ddx_train)
    t_train = np.concatenate(t_train)
    F_train = np.concatenate(F_train)
    T_train = np.concatenate(T_train)
    x_test = np.concatenate(x_test)
    dx_test = np.concatenate(dx_test)
    ddx_test = np.concatenate(ddx_test)
    t_test = np.concatenate(t_test)
    F_test = np.concatenate(F_test)
    T_test = np.concatenate(T_test)
    
    # Collect data
    train_data = np.concatenate(
        (x_train[:, np.newaxis], dx_train[:, np.newaxis], ddx_train[:, np.newaxis], t_train[:, np.newaxis], F_train[:, np.newaxis], T_train[:, np.newaxis]), axis=1
    )
    test_data = np.concatenate(
        (x_test[:, np.newaxis], dx_test[:, np.newaxis], ddx_test[:, np.newaxis], t_test[:, np.newaxis], F_test[:, np.newaxis], T_test[:, np.newaxis]), axis=1
    )
    
    info = {
        "train_data_shape": train_data.shape,
        "test_data_shape": test_data.shape,
        "n_curves": n_curves,
        "Shapes" : "[Samples, Features: x, dx, ddx, t, F, T]",
        "qmax": x_train.max(),
        "qmin": x_train.min(),
        "dxmax": dx_train.max(),
        "dxmin": dx_train.min(),
        "ddxmax": ddx_train.max(),
        "ddxmin": ddx_train.min(),
    }

In [None]:
x_train.shape, dx_train.shape, ddx_train.shape, t_train.shape, F_train.shape, T_train.shape

In [None]:
train_data.shape, test_data.shape