In [1]:
import os
import scipy.io as sio
import h5py

import numpy

In [2]:
def import_data(mat_files):
    """
    Load a collection of MATLAB ``.mat`` files into a dictionary.

    Every file is first read with ``scipy.io.loadmat``. When that raises
    ``NotImplementedError`` the file is reopened with ``h5py`` and each
    top-level HDF5 dataset is read fully into memory.

    Loading is best-effort: a file that cannot be read is reported with
    ``print`` and simply left out of the result; no exception escapes.

    Parameters
    ----------
    mat_files : iterable of str
        Paths of the files to load.

    Returns
    -------
    dict
        Maps each file's base name (directory and extension stripped) to a
        dict of its variables. Two paths sharing a base name overwrite each
        other; the later one wins.
    """
    # Dictionary to store the loaded data
    datasets = {}

    for f in mat_files:
        name = os.path.splitext(os.path.basename(f))[0]  # filename without extension
        try:
            datasets[name] = sio.loadmat(f)
            print(f"Loaded {f} using scipy.io")
        except NotImplementedError:
            # scipy refused the file format; fall back to reading it as HDF5.
            try:
                with h5py.File(f, 'r') as hf:
                    # Only datasets can be materialised as arrays. Top-level
                    # groups are skipped instead of aborting the whole file,
                    # and ``[()]`` also works for scalar datasets, where
                    # ``[:]`` raises.
                    # NOTE(review): arrays read this way may have their axes
                    # reversed compared with scipy.io.loadmat -- confirm
                    # before stacking both kinds of file together.
                    datasets[name] = {
                        key: node[()]
                        for key, node in hf.items()
                        if isinstance(node, h5py.Dataset)
                    }
                print(f"Loaded {f} using h5py")
            except Exception as e:
                print(f"Failed to read {f}: {e}")
        except Exception as e:
            print(f"Failed to read {f}: {e}")

    return datasets

In [3]:
def _first_matching_key(data, fragment, dataset_name):
    """Return the first key of ``data`` that contains ``fragment``."""
    for key in data.keys():
        if fragment in key:
            return key
    # IndexError is kept (rather than KeyError) so existing callers that
    # catch the original failure mode keep working; only the message changes.
    raise IndexError(
        f"Dataset '{dataset_name}' has no key containing '{fragment}' "
        f"(available keys: {list(data.keys())})"
    )


def combine_data(datasets):
    """
    Combine all datasets into single X (sensors) and y (positions) arrays.

    For every dataset the first key containing 'mag_sensors' supplies the
    inputs and the first key containing 'tip_position' supplies the targets
    (so 'mag_sensors2', 'tip_position2', etc. are matched as well). The
    per-dataset arrays are stacked vertically in the iteration order of
    ``datasets``. Progress is reported with ``print``.

    Parameters
    ----------
    datasets : dict
        Maps a dataset name to a dict of arrays.

    Returns
    -------
    X, y : numpy.ndarray
        The stacked sensor and position arrays.

    Raises
    ------
    ValueError
        If ``datasets`` is empty, or if numpy cannot stack the arrays.
    IndexError
        If a dataset lacks a sensor or position key; the message names the
        dataset and lists the keys it does have.
    """
    if not datasets:
        # numpy.vstack would raise ValueError here too, but with a message
        # that says nothing about the actual cause.
        raise ValueError("No datasets to combine")

    X_list, y_list = [], []

    for name, data in datasets.items():
        # find the sensor and position keys automatically
        sensor_key = _first_matching_key(data, 'mag_sensors', name)
        position_key = _first_matching_key(data, 'tip_position', name)

        X_list.append(numpy.array(data[sensor_key]))
        y_list.append(numpy.array(data[position_key]))

        print(f"✅ {name}: {sensor_key} {X_list[-1].shape}, {position_key} {y_list[-1].shape}")

    # stack all vertically
    X = numpy.vstack(X_list)
    y = numpy.vstack(y_list)

    print(f"\n📦 Combined shapes -> X: {X.shape}, y: {y.shape}")
    return X, y

In [4]:
def split_data(X, y, train_pct=0.7, val_pct=0.15, test_pct=0.15, random_state=42):
    """
    Split dataset (X, y) into train, validation, and test sets.
    Works for multi-output regression (e.g., 3D positions).

    Rows are shuffled with a generator private to this call, so the
    process-wide ``numpy.random`` state is left untouched. For a given
    ``random_state`` the shuffle is the same one that seeding the global
    generator would have produced.

    Parameters
    ----------
    X, y : numpy.ndarray
        Inputs and targets, indexed along their first axis. The number of
        samples is taken from ``X.shape[0]``.
    train_pct, val_pct, test_pct : float
        Fractions of the data for each subset; they must sum to 1. The train
        and validation sizes are truncated to whole rows and the test set
        receives every remaining row, so it can be slightly larger than
        ``test_pct`` suggests.
    random_state : int or None
        Seed for the shuffle. ``None`` gives a different split on each call.

    Returns
    -------
    X_train, y_train, X_val, y_val, X_test, y_test : numpy.ndarray

    Raises
    ------
    AssertionError
        If the three fractions do not sum to 1 (within 1e-6).
    """
    assert abs(train_pct + val_pct + test_pct - 1.0) < 1e-6, "Percents must sum to 1"

    # A local RandomState avoids reseeding the global generator, which would
    # silently change the random numbers drawn by unrelated code afterwards.
    rng = numpy.random.RandomState(random_state)
    N = X.shape[0]
    indices = rng.permutation(N)

    train_end = int(train_pct * N)
    val_end = train_end + int(val_pct * N)

    train_idx = indices[:train_end]
    val_idx = indices[train_end:val_end]
    test_idx = indices[val_end:]  # remainder, absorbs the truncation above

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]

    print(f"📊 Split -> Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
    return X_train, y_train, X_val, y_val, X_test, y_test


In [None]:
from sklearn.preprocessing import StandardScaler

def standardize(X_train, X_val, X_test, y_train, y_val, y_test):
    """
    Standardize inputs and targets using statistics of the training split.

    One ``StandardScaler`` is fitted on ``X_train`` and another on
    ``y_train``; the validation and test splits are only transformed with
    the already-fitted scalers, never used for fitting.

    Returns
    -------
    tuple
        ``(X_train, X_val, X_test, y_train, y_val, y_test, x_scaler,
        y_scaler)`` -- the six transformed arrays followed by the two fitted
        scalers.
    """
    # Inputs: fit on the training split only, then reuse for the others.
    x_scaler = StandardScaler()
    X_train_std = x_scaler.fit_transform(X_train)
    X_val_std, X_test_std = [x_scaler.transform(part) for part in (X_val, X_test)]

    # Targets: same procedure with an independent scaler.
    y_scaler = StandardScaler()
    y_train_std = y_scaler.fit_transform(y_train)
    y_val_std, y_test_std = [y_scaler.transform(part) for part in (y_val, y_test)]

    return (
        X_train_std, X_val_std, X_test_std,
        y_train_std, y_val_std, y_test_std,
        x_scaler, y_scaler,
    )

In [None]:
def compute_avg_delta_dataset(mag_sensors):
    """
    Replace each pair of magnetic sensors with their average and delta,
    keeping the same overall dataset shape and order.

    The input is converted to float before any arithmetic, so integer
    inputs cannot overflow when two readings are added, and unsigned inputs
    cannot wrap around when they are subtracted.

    Parameters
    ----------
    mag_sensors : numpy.ndarray
        Shape (N, 12), dtype numeric.
        Sensors are assumed to be paired as (1,2), (3,4), ..., (11,12).

    Returns
    -------
    new_dataset : numpy.ndarray
        Float array of shape (N, 12), where columns are ordered as:
        [avg_1, delta_1, avg_2, delta_2, ..., avg_6, delta_6]
        with avg = (a + b) / 2 and delta = a - b for each pair (a, b).

    Raises
    ------
    ValueError
        If the input does not have exactly 12 columns.
    """
    sensors = numpy.asarray(mag_sensors, dtype=float)
    if sensors.shape[1] != 12:
        raise ValueError("Expected 12 sensor columns (got {})".format(sensors.shape[1]))

    first = sensors[:, 0::2]   # sensors 1, 3, ..., 11
    second = sensors[:, 1::2]  # sensors 2, 4, ..., 12

    new_dataset = numpy.empty_like(sensors)
    new_dataset[:, 0::2] = 0.5 * (first + second)  # averages in even columns
    new_dataset[:, 1::2] = first - second          # deltas in odd columns

    return new_dataset

In [None]:
from sklearn.feature_selection import mutual_info_regression
from sklearn.preprocessing import StandardScaler
import numpy as np

def select_features(X, y, top_k=None, threshold=None, scale=True, random_state=None):
    """
    Select the most relevant input features for regression tasks using mutual information.

    For a multi-output target the mutual information is estimated against
    each output column separately and the per-column scores are averaged,
    so a feature that is informative for any single output is credited for
    it.

    Args:
        X (ndarray): Input features, shape (n_samples, n_features)
        y (ndarray): Targets, shape (n_samples,) or (n_samples, n_outputs)
        top_k (int): Keep the top_k highest-score features (optional).
            Takes precedence over ``threshold`` when both are given.
        threshold (float): Keep features with score >= threshold (optional)
        scale (bool): Whether to standardize inputs before scoring
        random_state (int | None): Forwarded to ``mutual_info_regression``;
            pass an int to make the scores repeatable between calls.

    Returns:
        X_selected (ndarray): Reduced feature matrix. Its columns come from
            the standardized inputs when ``scale`` is True.
        selected_indices (ndarray): Indices of selected features. Ordered by
            decreasing score for ``top_k`` and for the default; in ascending
            index order for ``threshold``.
        scores (ndarray): Importance scores for all features
    """
    X = np.asarray(X)
    y = np.asarray(y)

    # Optional standardization (important for continuous variables)
    if scale:
        X = StandardScaler().fit_transform(X)

    # Mutual information measures nonlinear dependency
    if y.ndim == 1:
        scores = mutual_info_regression(X, y, random_state=random_state)
    else:
        # Average the *scores* over the outputs. Averaging the target columns
        # first would collapse them into one synthetic target and let
        # opposite effects on different outputs cancel out.
        per_output = [
            mutual_info_regression(X, y[:, col], random_state=random_state)
            for col in range(y.shape[1])
        ]
        scores = np.mean(per_output, axis=0)
    scores = np.nan_to_num(scores)

    # Rank features, best first
    ranked_indices = np.argsort(scores)[::-1]

    # Select top_k or threshold
    if top_k is not None:
        selected_indices = ranked_indices[:top_k]
    elif threshold is not None:
        selected_indices = np.where(scores >= threshold)[0]
    else:
        # Default: keep all features sorted by importance
        selected_indices = ranked_indices

    X_selected = X[:, selected_indices]
    return X_selected, selected_indices, scores