# Data

> Necessary scripts to read orbits from different formats

In [1]:
#| default_exp data

In [2]:
#| hide
#| export
import os
from typing import Any, Optional, Tuple, Union

import h5py
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from scipy.io import loadmat

In [3]:
#| hide
#| export
from unittest.mock import patch, MagicMock
from fastcore.test import test_eq

## Loading Data

In [4]:
#| export
def load_orbit_data(file_path: str,  # The path to the .mat, .h5/.hdf5, or .npy file.
                    variable_name: Optional[str] = None,  # Name of the variable in the .mat file, optional.
                    dataset_path: Optional[str] = None  # Path to the dataset in the .h5/.hdf5 file, optional.
                   ) -> Any:  # The loaded orbit data.
    """
    Load orbit data from MATLAB .mat files, HDF5 .h5/.hdf5 files, or NumPy .npy files.

    The loader is selected from the file extension, which is compared
    case-insensitively. `variable_name` is required for .mat files and
    `dataset_path` is required for HDF5 files; both are ignored otherwise.

    Raises:
        ValueError: If the extension is not supported, if the argument required
            for the detected format is missing, or if the requested variable /
            dataset does not exist in the file.
    """
    # Case-insensitive match so that e.g. 'ORBITS.MAT' is recognised too.
    lowered_path = file_path.lower()

    if lowered_path.endswith('.mat'):
        if variable_name is None:
            raise ValueError("variable_name must be provided for .mat files")
        mat = loadmat(file_path)
        if variable_name not in mat:
            raise ValueError(f"{variable_name} not found in {file_path}")
        return mat[variable_name]

    # '.hdf5' is accepted alongside '.h5' so files written by `save_data`
    # (which uses the '.hdf5' extension) can be read back.
    if lowered_path.endswith(('.h5', '.hdf5')):
        # Validate the arguments before opening the file.
        if dataset_path is None:
            raise ValueError("dataset_path must be provided for .h5 files")
        with h5py.File(file_path, 'r') as file:
            if dataset_path not in file:
                raise ValueError(f"{dataset_path} not found in {file_path}")
            # Copy into an in-memory array before the file is closed.
            return np.array(file[dataset_path])

    if lowered_path.endswith('.npy'):
        return np.load(file_path)

    raise ValueError("Unsupported file format. Please provide a .mat, .h5, .hdf5, or .npy file.")

In [5]:
#| test load_orbit_data
#| hide
# Fixture arrays handed back by the mocked loaders below; no file is read from disk.
mock_mat_data = {'Xarray': np.array([1, 2, 3])}
mock_h5_data = np.array([4, 5, 6])
mock_npy_data = np.array([7, 8, 9])

# Test for load_orbit_data with .mat file
# `loadmat` is patched under `__main__` — assumes this cell runs with the notebook
# namespace as `__main__`, which is where load_orbit_data looks the name up.
with patch('__main__.loadmat', return_value=mock_mat_data) as mock_loadmat:
    result = load_orbit_data('test_data.mat', variable_name='Xarray')
    assert (result == mock_mat_data['Xarray']).all(), "MAT file loading failed or data mismatch"
    mock_loadmat.assert_called_once_with('test_data.mat')

# Test for load_orbit_data with .h5 file
# The context manager returned by the patched `h5py.File` yields a plain dict, which
# supports the `in` check and item lookup that load_orbit_data performs on the file.
with patch('__main__.h5py.File') as mock_h5py:
    mock_file = MagicMock()
    mock_file.__enter__.return_value = {'/files/PERIODIC ORBITS': mock_h5_data}
    mock_h5py.return_value = mock_file
    result = load_orbit_data('test_data.h5', dataset_path='/files/PERIODIC ORBITS')
    assert (result == mock_h5_data).all(), "H5 file loading failed or data mismatch"

# Test for load_orbit_data with .npy file
# `numpy.load` is patched on the numpy module itself, so the `np.load` call is intercepted.
with patch('numpy.load', return_value=mock_npy_data) as mock_load:
    result = load_orbit_data('test_data.npy')
    assert (result == mock_npy_data).all(), "NPY file loading failed or data mismatch"
    mock_load.assert_called_once_with('test_data.npy')

In [6]:
#| export
def load_memmap_array(file_path: str,  # The path to the .npy file as a string.
                      mode: str = 'c'  # Mode for memory-mapping ('r', 'r+', 'w+', 'c').
                     ) -> np.memmap:   # Returns a memory-mapped array.
    """
    Open an existing .npy file as a memory-mapped array.

    The file is opened through `np.load(file_path, mmap_mode=mode)`, so `mode`
    is forwarded unchanged as NumPy's `mmap_mode`. With the default 'c'
    (copy-on-write) the array can be modified in memory without the changes
    being written back to the file.

    Raises:
        FileNotFoundError: If nothing exists at `file_path`. The check runs
            before the file is opened, whatever `mode` is.
    """
    if os.path.exists(file_path):
        # Hand the path and mode straight to NumPy's loader.
        return np.load(file_path, mmap_mode=mode)

    raise FileNotFoundError(f"No file found at the specified path: {file_path}")

In [7]:
#| export
def get_orbit_features(file_path: str,  # The path to the file (can be .mat, .h5, or .npy).
                       variable_name: Optional[str] = None,  # Name of the variable in the .mat file, optional.
                       dataset_path: Optional[str] = None  # Path to the dataset in the .h5 file, optional.
                      ) -> pd.DataFrame:  # DataFrame with detailed orbit features.
    """
    Read an orbit feature table with `load_orbit_data` and wrap it in a labelled DataFrame.

    The loaded array is given ten fixed column names, so it must provide
    exactly ten columns.
    """
    # Fixed column names, grouped by meaning.
    headers = [
        'Orbit Family',
        'Initial Position X', 'Initial Position Y', 'Initial Position Z',
        'Initial Velocity X', 'Initial Velocity Y', 'Initial Velocity Z',
        'Jacobi Constant',
        'Period',
        'Stability Index',
    ]

    # Format-specific arguments are forwarded untouched to the generic loader.
    raw_table = load_orbit_data(file_path, variable_name=variable_name, dataset_path=dataset_path)

    return pd.DataFrame(raw_table, columns=headers)

In [8]:
#| test get_orbit_features
#| hide
def test_get_orbit_features():
    # Two fake orbits with ten feature values each, standing in for load_orbit_data's output
    fake_rows = np.array([
        [1, 0, 0, 0, 1, 0, 0, 3.0, 2.0, 1.0],
        [2, 1, 1, 1, 0, 1, 0, 2.5, 1.5, 0.5]
    ])

    # The DataFrame every scenario below is expected to produce
    column_names = [
        'Orbit Family', 'Initial Position X', 'Initial Position Y', 'Initial Position Z',
        'Initial Velocity X', 'Initial Velocity Y', 'Initial Velocity Z',
        'Jacobi Constant', 'Period', 'Stability Index'
    ]
    reference = pd.DataFrame(fake_rows, columns=column_names)

    # (file path, keyword arguments passed in, keyword arguments expected on the loader)
    scenarios = [
        ('dummy_path.mat',
         {'variable_name': 'dummy_var'},
         {'variable_name': 'dummy_var', 'dataset_path': None}),
        ('dummy_path.h5',
         {'dataset_path': 'dummy_dataset'},
         {'variable_name': None, 'dataset_path': 'dummy_dataset'}),
        ('dummy_path.npy',
         {},
         {'variable_name': None, 'dataset_path': None}),
    ]

    # Replace load_orbit_data so that no file is touched
    with patch('__main__.load_orbit_data', return_value=fake_rows) as loader:
        for path, given, forwarded in scenarios:
            # Start each scenario with a clean call history
            loader.reset_mock()

            frame = get_orbit_features(path, **given)
            test_eq(frame.equals(reference), True)

            # The loader must be called exactly once with the forwarded arguments
            loader.assert_called_once_with(path, **forwarded)

# Call the test function to execute tests
test_get_orbit_features()

## Save Data

In [9]:
#| export
def save_data(data: np.ndarray,  # The numpy array data to save.
              file_name: str  # The name of the file to save the data in, including the extension.
             ) -> None:
    """
    Write a numpy array to disk, choosing the format from the extension of `file_name`.

    '.npy' files are written with `np.save`. '.hdf5' files are written with
    h5py as a single dataset named 'data', gzip-compressed at level 9.
    Any other extension raises a ValueError.
    """
    suffix = os.path.splitext(file_name)[1]

    if suffix == '.npy':
        # Plain NumPy binary file
        np.save(file_name, data)
        return

    if suffix != '.hdf5':
        # Anything that is neither '.npy' nor '.hdf5' is rejected
        raise ValueError("Unsupported file extension. Supported extensions are '.hdf5' or '.npy'.")

    # New HDF5 file holding one compressed dataset
    with h5py.File(file_name, 'w') as h5_file:
        h5_file.create_dataset('data', data=data, compression='gzip', compression_opts=9)

In [10]:
#| test save_data
#| hide
# Test for NPY saving functionality
# `numpy.save` is replaced by an autospec'd mock, so nothing is written to disk;
# the test only checks that save_data forwards (file_name, data) to it.
def test_save_data_npy():
    data = np.random.rand(5, 5)
    file_name = 'test_data.npy'
    
    with patch('numpy.save', autospec=True) as mock_save:
        save_data(data, file_name)
        mock_save.assert_called_once_with(file_name, data)

# Test for HDF5 saving functionality
# `h5py.File` is replaced by an autospec'd mock; the test only checks that the
# file is opened for writing with the given name.
def test_save_data_hdf5():
    data = np.random.rand(5, 5)
    file_name = 'test_data.hdf5'
    
    with patch('h5py.File', autospec=True) as mock_file:
        save_data(data, file_name)
        mock_file.assert_called_once_with(file_name, 'w')

# Test for handling invalid file type
# An unknown extension must raise ValueError with the exact message below.
def test_save_data_invalid_type():
    data = np.random.rand(5, 5)
    file_name = 'test_data.unknown'
    
    try:
        save_data(data, file_name)
        assert False, "ValueError expected but not raised"
    except ValueError as e:
        assert str(e) == "Unsupported file extension. Supported extensions are '.hdf5' or '.npy'.", "Incorrect error message"

# NOTE(review): only the invalid-type test is invoked here; test_save_data_npy and
# test_save_data_hdf5 are defined above but never called in this cell — confirm whether
# that is intentional.
test_save_data_invalid_type()


## Get Example Data

In [11]:
#| export
def get_example_orbit_data():
    """
    Load the example orbits from a hard-coded MATLAB file.

    Reads the 'Xarray' variable from
    'example_orbits_1_L2_S_200_EM_CR3BP.mat' under '../data/example_data/'
    using `load_orbit_data`. The path is relative to the current working
    directory, so the caller is expected to run from a directory that sits
    next to the `data` folder. Intended for demonstration and testing.

    :return: A numpy.ndarray holding the loaded data with its three axes reversed.
    """
    filename = "example_orbits_1_L2_S_200_EM_CR3BP.mat"

    # Build the relative path piece by piece: parent directory, data folder, file name
    matlab_file_path = "".join(('..', "/data/example_data/", filename))

    raw = load_orbit_data(matlab_file_path, variable_name='Xarray')

    # Reverse the axis order: (a, b, c) -> (c, b, a)
    return np.transpose(raw, axes=(2, 1, 0))

In [12]:
# | test
# Smoke check: load the example dataset and display the shape of the returned array.
data = get_example_orbit_data()
data.shape

(200, 6, 300)

## Random Sampler

In [13]:
#| export
def sample_orbits(orbit_data: np.ndarray,  # Orbit data array
                  sample_spec: Union[dict, int], # Number of samples per class (dict) or total number of samples (int)
                  labels: Optional[np.ndarray] = None # Optional: Array of labels corresponding to each orbit
                 ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Randomly sample orbits (without replacement) from the provided dataset.

    Parameters:
        orbit_data (np.ndarray): Array of orbit data with shape (num_orbits, 6, num_time_points).
        sample_spec (dict or int): If int, it is the total number of orbits to sample.
                                   If dict, it maps each class label to the number of
                                   orbits to sample from that class; `labels` is then required.
        labels (array-like, optional): Labels for each orbit, aligned with the first
                                       axis of `orbit_data`.

    Returns:
        tuple: The sampled orbit data and the corresponding labels
               (None when `labels` was not provided).

    Raises:
        ValueError: If `sample_spec` is a dict but `labels` is missing, or if a class
                    has fewer orbits than requested.
    """
    if labels is not None:
        # Accept any array-like (e.g. a list) so comparison and fancy indexing work.
        labels = np.asarray(labels)

    if isinstance(sample_spec, dict):
        if labels is None:
            # Without labels, the per-class counts cannot be honoured.
            raise ValueError("labels must be provided when sample_spec is a dict")

        # Sampling specified number of orbits for each class
        picked = []
        for label, count in sample_spec.items():
            class_indices = np.where(labels == label)[0]
            if len(class_indices) < count:
                raise ValueError(f"Not enough samples for class {label}. Requested {count}, available {len(class_indices)}.")
            picked.extend(np.random.choice(class_indices, size=count, replace=False))

        # Force an integer dtype: an empty spec would otherwise give a float array,
        # which cannot be used as an index.
        indices = np.asarray(picked, dtype=np.intp)
    else:
        # Random sampling without considering classes
        indices = np.random.choice(orbit_data.shape[0], size=sample_spec, replace=False)

    # Select the sampled data and labels
    sampled_data = orbit_data[indices]
    sampled_labels = labels[indices] if labels is not None else None

    return sampled_data, sampled_labels

In [14]:
#| hide
#| test sample_orbits_random_sampling_without_labels
def test_sample_orbits_random_sampling_without_labels():
    # 100 synthetic orbits with 6 values and 10 time points each
    orbits = np.random.rand(100, 6, 10)

    # An integer spec with no labels samples that many orbits overall
    picked, picked_labels = sample_orbits(orbits, 10)

    assert picked.shape == (10, 6, 10), "Shape of sampled data should match the requested sample size"
    assert picked_labels is None, "Labels should be None when not provided"

#| test sample_orbits_class_specific_sampling
def test_sample_orbits_class_specific_sampling():
    # 100 synthetic orbits labelled at random with one of three classes
    orbits = np.random.rand(100, 6, 10)
    classes = np.random.randint(0, 3, size=100)
    per_class = {0: 5, 1: 5}

    # A dict spec draws the requested count from each listed class
    picked, picked_labels = sample_orbits(orbits, per_class, classes)

    assert picked.shape == (10, 6, 10), "Shape of sampled data should match the total requested sample size"
    assert len(picked_labels) == 10, "Number of labels should match the total requested sample size"
    assert all(label in per_class for label in picked_labels), "All labels should be from requested classes"

#| test sample_orbits_insufficient_class_samples
def test_sample_orbits_insufficient_class_samples():
    # randint(0, 1) only ever yields 0, so class 1 has no members
    orbits = np.random.rand(100, 6, 10)
    classes = np.random.randint(0, 1, size=100)
    per_class = {0: 50, 1: 50}  # class 1 cannot be satisfied

    try:
        sample_orbits(orbits, per_class, classes)
    except ValueError as err:
        assert str(err) == "Not enough samples for class 1. Requested 50, available 0.", "Error message should indicate insufficient samples for class 1"
    else:
        assert False, "Expected ValueError due to insufficient samples for class 1"

# Execute tests to verify the behavior
test_sample_orbits_random_sampling_without_labels()
test_sample_orbits_class_specific_sampling()
test_sample_orbits_insufficient_class_samples()

## Scaler

In [15]:
#| export
# Added to the (max - min) term used by the scalers defined in this module, so that
# input whose max equals its min does not lead to a division by zero.
EPS = 1e-18  # A small epsilon to prevent division by zero

In [16]:
#| export
class TSFeatureWiseScaler():
    """
    Scales time series data feature-wise using PyTorch tensors.

    Each feature (last axis of an (N, T, D) tensor) is min-max scaled
    independently into `feature_range`.

    Parameters:
    -----------
    feature_range : tuple(float, float), optional
        Tuple representing the minimum and maximum feature values (default is (0, 1)).

    Attributes:
    -----------
    _min_v : float
        Minimum feature value.
    _max_v : float
        Maximum feature value.
    mins : torch.Tensor
        Per-feature minimum of the fitted data, shape (D,). Set by `fit`.
    maxs : torch.Tensor
        Per-feature maximum of the fitted data, shape (D,). Set by `fit`.
    """
    def __init__(self, feature_range: tuple = (0, 1)) -> None:
        assert len(feature_range) == 2
        self._min_v, self._max_v = feature_range

    # X: N x T x D
    def fit(self, X: torch.Tensor) -> "TSFeatureWiseScaler":
        """
        Fits the scaler to the data.
        
        :param X: Input data. Shape: (N, T, D)
        :type X: torch.Tensor
        
        :returns: The fitted scaler object.
        :rtype: TSFeatureWiseScaler
        """
        # Keep the statistics at least as precise as the input: float64 data keeps
        # float64 min/max instead of being truncated to the default dtype, while
        # integer and lower-precision inputs still get default-dtype statistics.
        stats_dtype = torch.promote_types(X.dtype, torch.get_default_dtype())

        # Reduce over the sample and time axes in one call; the result has shape (D,)
        # and lives on the same device as X.
        self.mins = X.amin(dim=(0, 1)).to(stats_dtype)
        self.maxs = X.amax(dim=(0, 1)).to(stats_dtype)

        return self

    def transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Transforms the data.
        
        :param X: Input data. Shape: (N, T, D)
        :type X: torch.Tensor
        
        :returns: Scaled data.
        :rtype: torch.Tensor
        """
        # EPS keeps the denominator non-zero for features whose max equals their min.
        unit = (X - self.mins) / (self.maxs - self.mins + EPS)
        return unit * (self._max_v - self._min_v) + self._min_v

    def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Inverse-transforms the data.
        
        :param X: Scaled data. Shape: (N, T, D)
        :type X: torch.Tensor
        
        :returns: Original data.
        :rtype: torch.Tensor
        """
        # Undo the steps of `transform` in reverse order.
        X = X - self._min_v
        X = X / (self._max_v - self._min_v)
        X = X * (self.maxs - self.mins + EPS)
        X = X + self.mins
        return X

    def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Fits the scaler to the data and transforms it.
        
        :param X: Input data. Shape: (N, T, D)
        :type X: torch.Tensor
        
        :returns: Scaled data.
        :rtype: torch.Tensor
        """
        self.fit(X)
        return self.transform(X)

In [17]:
#| export
class TSGlobalScaler():
    """
    Min-max scaler that uses a single minimum and maximum taken over the whole tensor.

    Attributes:
    -----------
    min : torch.Tensor
        Smallest value found in the fitted data (0-dim tensor).
    max : torch.Tensor
        Largest value found in the fitted data (0-dim tensor).
    """
    def fit(self, X: torch.Tensor) -> "TSGlobalScaler":
        """
        Records the global minimum and maximum of the data.
        
        :param X: Input data.
        :type X: torch.Tensor
        
        :returns: The fitted scaler object.
        :rtype: TSGlobalScaler
        """
        self.min = X.min()
        self.max = X.max()
        return self

    def transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Scales the data with the fitted minimum and maximum.
        
        :param X: Input data.
        :type X: torch.Tensor
        
        :returns: Scaled X.
        :rtype: torch.Tensor
        """
        # EPS keeps the denominator non-zero when max equals min.
        span = self.max - self.min + EPS
        shifted = X - self.min
        return shifted / span

    def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Maps scaled data back to the original value range.
        
        :param X: Scaled data.
        :type X: torch.Tensor
        
        :returns: Original data.
        :rtype: torch.Tensor
        """
        span = self.max - self.min + EPS
        return X * span + self.min

    def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
        """
        Fits the scaler to the data, then scales that same data.
        
        :param X: Input data.
        :type X: torch.Tensor
        
        :returns: Scaled input data X.
        :rtype: torch.Tensor
        """
        return self.fit(X).transform(X)

In [18]:
#| hide
# Run nbdev's export step; presumably this writes the `#| export` cells to the
# library module named by `default_exp` — confirm against the project's nbdev settings.
import nbdev; nbdev.nbdev_export()