In [1]:
########################################################################################################################
# This script simulates data for longitudinal-prediction modeling.
########################################################################################################################

In [4]:
########################################################################################################################
# Import packages
########################################################################################################################
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tslearn.generators import random_walk_blobs
from typing import Literal, Optional

In [5]:
########################################################################################################################
# Define the core function to simulate the data
########################################################################################################################
def simulate_longitudinal(n_samples: int = 1000,
                          n_timestamps: int = 3,
                          n_features: int = 50,
                          n_informative: int = 15,
                          n_redundant: int = 5,
                          n_binary_features: int = 5,
                          prevalence: float = 0.2,
                          missing_rate: float = 0.2,
                          test_rate: float = 0.3,
                          impute: Literal['Zero', 'Mean', 'Median'] = 'Zero',
                          random_state: Optional[int] = 42):
    """
    Simulate a binary-outcome longitudinal dataset with missing values, impute it, append missingness
    indicators and return a stratified train/test partition.

    The informative features of each class come from a separate call to ``random_walk_blobs``; redundant
    features are noisy rescaled copies of informative ones; the remaining features are standard-normal noise.
    Imputation statistics ('Mean' / 'Median') are estimated on the training rows only, so no information
    from the held-out test rows leaks into the imputed values.

    :param n_samples: Number of samples (at least 2, so that both classes can be represented)
    :param n_timestamps: Number of timestamps (at least 1)
    :param n_features: Number of features (before the missingness indicators are appended)
    :param n_informative: Number of informative features
    :param n_redundant: Number of redundant features (requires n_informative >= 1 when positive)
    :param n_binary_features: Number of features converted to binary by thresholding at their median
    :param prevalence: Prevalence rate of the positive class in (0, 1)
    :param missing_rate: Missing rate in [0, 1); 0 disables the simulated missingness
    :param test_rate: Proportion of the held-out test set in (0, 1)
    :param impute: A string in ['Zero', 'Mean', 'Median'] representing the imputation method
    :param random_state: Random state
    :return:
    (a) X_train_: np.ndarray (float32). Feature dataset in the training partition
    with shape (n_train, n_timestamps, n_features * 2)
    (b) X_test_: np.ndarray (float32). Feature dataset in the test partition
    with shape (n_test, n_timestamps, n_features * 2)
    (c) y_train_: np.ndarray (int64). Target dataset in the training partition
    with shape (n_train, )
    (d) y_test_: np.ndarray (int64). Target dataset in the test partition
    with shape (n_test, )
    (e) feat_names_: list of 2 * n_features strings representing the names of the features.
    Names containing 'C' represent continuous variables, and 'B' for binary variables.
    Names containing '!NA' represent the binary missingness indicator variables.

    n_train and n_test are decided by ``train_test_split`` (roughly n_samples * (1 - test_rate) and
    n_samples * test_rate). The last axis holds the n_features imputed features followed by their
    n_features missingness indicators, in the same order as feat_names_.

    NOTE: with impute='Mean', missing entries of binary ('B') features are filled with the training mean,
    which is generally not 0 or 1.
    """

    # Validation of inputs
    assert n_samples >= 2, 'n_samples must be at least 2 so that both classes are represented'
    assert n_timestamps >= 1
    assert 0 < prevalence < 1
    assert 0 <= missing_rate < 1
    assert n_informative >= 0 and n_redundant >= 0 and n_binary_features >= 0, 'feature counts must be >= 0'
    assert n_informative + n_redundant <= n_features
    assert n_redundant == 0 or n_informative >= 1, 'redundant features need at least one informative source'
    assert n_binary_features <= n_features
    assert 0 < test_rate < 1
    assert impute in ['Zero', 'Mean', 'Median']

    # Setting random state
    rng = np.random.default_rng(random_state)

    # Determine class counts from prevalence (clamped so that each class has at least one sample)
    n_pos: int = int(np.round(n_samples * prevalence))
    n_pos = max(1, min(n_samples - 1, n_pos))
    n_neg = n_samples - n_pos

    # Simulate the dataset for the negative class
    X0, y0 = random_walk_blobs(n_ts_per_blob=n_neg,
                               sz=n_timestamps,
                               d=n_informative,
                               n_blobs=1,
                               random_state=random_state)
    y0[:] = 0

    # Simulate the dataset for the positive class (a different seed, so the two classes are drawn separately)
    X1, y1 = random_walk_blobs(n_ts_per_blob=n_pos,
                               sz=n_timestamps,
                               d=n_informative,
                               n_blobs=1,
                               random_state=None if random_state is None else random_state+1)
    y1[:] = 1

    # Concatenate the datasets (negatives first, then positives)
    X_concat = np.concatenate([X0, X1], axis=0).astype(np.float32)
    y_ = np.concatenate([y0, y1], axis=0).astype(np.int64)

    # Define the informative part of X_; every remaining column is filled below
    X_ = np.empty((n_samples, n_timestamps, n_features), dtype=np.float32)
    X_[:, :, :n_informative] = X_concat

    # Create redundant features: a randomly chosen informative feature, rescaled, plus small noise
    if n_redundant > 0:
        src_idx = rng.integers(0, n_informative, size=n_redundant)
        scales = rng.normal(1.0, 0.2, size=n_redundant)
        noise = rng.normal(0, 0.01, size=(n_samples, n_timestamps, n_redundant))
        X_[:, :, n_informative:n_informative+n_redundant] = X_[:, :, src_idx] * scales[None, None, :] + noise

    # Create uninformative features
    n_uninformative = n_features - n_informative - n_redundant
    if n_uninformative > 0:
        X_[:, :, n_informative + n_redundant:] = (rng.normal(0.0, 1.0,
                                                  size=(n_samples, n_timestamps, n_uninformative))
                                                  .astype(np.float32))

    # Creating feature names
    feat_names_ = [f'X_{i+1}C' for i in range(n_features)]

    # Converting some continuous features to binary features (1 where the value exceeds the feature median)
    if n_binary_features > 0:
        binary_idx = rng.choice(n_features, size=n_binary_features, replace=False)
        for j in binary_idx:
            feat_names_[j] = f'X_{j+1}B'
            thr = np.nanmedian(X_[:, :, j])
            X_[:, :, j] = (X_[:, :, j] > thr).astype(np.float32)

    # Simulating missingness (each entry is dropped independently with probability missing_rate)
    if missing_rate > 0:
        mask = rng.random(X_.shape) < missing_rate
        X_[mask] = np.nan
    else:
        mask = np.isnan(X_)

    # Creating missingness indicator
    M = mask.astype(np.float32)

    # Stratified partitioning of the sample indices. The split is drawn BEFORE imputation so that the
    # fill values can be estimated on the training rows only.
    idx_train, idx_test = train_test_split(np.arange(n_samples), test_size=test_rate,
                                           random_state=random_state, stratify=y_,
                                           shuffle=True)

    # Estimating one fill value per feature from the training partition
    if impute == 'Zero':
        fill_values = np.zeros(n_features, dtype=np.float32)
    elif impute == 'Mean':
        fill_values = np.nanmean(X_[idx_train], axis=(0, 1)).astype(np.float32)
    else:
        fill_values = np.nanmedian(X_[idx_train], axis=(0, 1)).astype(np.float32)
    # A feature with no observed training value yields NaN above; fall back to zero for it
    fill_values = np.where(np.isnan(fill_values), 0.0, fill_values).astype(np.float32)

    # Imputing the data (train and test rows alike) with the training-derived fill values
    nan_idxs = np.where(np.isnan(X_))
    X_[nan_idxs] = fill_values[nan_idxs[2]]

    # Creating extra variable names for the binary missingness indicators
    feat_names_ = feat_names_ + [f'{c}!NA' for c in feat_names_]

    # Concatenating the main data with the binary missingness indicators
    X_ = np.concatenate([X_, M], axis=2).astype(np.float32)

    # Materialising the partitions
    X_train_, X_test_ = X_[idx_train], X_[idx_test]
    y_train_, y_test_ = y_[idx_train], y_[idx_test]
    return X_train_, X_test_, y_train_, y_test_, feat_names_

In [6]:
########################################################################################################################
# USER_SPECIFIC SETTING
# N_SAMPLES: Number of samples
# N_TIMESTAMPS: Number of timestamps
# N_FEATURES: Number of features
# N_INFORMATIVE: Number of informative features
# N_REDUNDANT: Number of redundant features
# N_BINARY_FEATURES: Number of binary_features
# PREVALENCE: Prevalence rate in (0, 1)
# MISSING_RATE: Missing rate in [0, 1); 0 disables the simulated missingness
# TEST_RATE: Proportion of the held-out test set
# IMPUTE: A string in ['Zero', 'Mean', 'Median'] representing the imputation method
# RANDOM_STATE: Random state
# OUT_DIR_PATH: Path of the output directory storing the organized datasets for modeling
########################################################################################################################
# --- Cohort size and panel length ---
N_SAMPLES: int = 1_000
N_TIMESTAMPS: int = 3

# --- Feature composition (informative + redundant must not exceed the total) ---
N_FEATURES: int = 50
N_INFORMATIVE: int = 15
N_REDUNDANT: int = 5
N_BINARY_FEATURES: int = 5

# --- Outcome, missingness and partitioning ---
PREVALENCE: float = 0.2
MISSING_RATE: float = 0.2
TEST_RATE: float = 0.3
IMPUTE: Literal['Zero', 'Mean', 'Median'] = 'Zero'

# --- Reproducibility ---
RANDOM_STATE: Optional[int] = 42

# --- Output location ---
# NOTE(review): '60_days' is hard-coded in the folder name and is not derived from any setting in this
# notebook - confirm it matches the intended cohort definition.
OUT_DIR_PATH: str = f'Longitudinal_Model_Data/{N_TIMESTAMPS}_encounters_60_days/'

In [None]:
########################################################################################################################
# Simulate the data
########################################################################################################################
# Draw one simulated cohort using the settings from the configuration cell
X_train, X_test, y_train, y_test, feat_names = simulate_longitudinal(
    n_samples=N_SAMPLES,
    n_timestamps=N_TIMESTAMPS,
    n_features=N_FEATURES,
    n_informative=N_INFORMATIVE,
    n_redundant=N_REDUNDANT,
    n_binary_features=N_BINARY_FEATURES,
    prevalence=PREVALENCE,
    missing_rate=MISSING_RATE,
    test_rate=TEST_RATE,
    impute=IMPUTE,
    random_state=RANDOM_STATE,
)

In [None]:
########################################################################################################################
# Define the output paths and export the data
########################################################################################################################
# Each imputation method gets its own sub-folder; the trailing '/' is kept in the folder path
out_dir_sub: str = os.path.join(OUT_DIR_PATH, f'{IMPUTE}/')
os.makedirs(out_dir_sub, exist_ok=True)

# Destination of every exported artefact
X_train_path: str = os.path.join(out_dir_sub, 'X_train.npy')
X_test_path: str = os.path.join(out_dir_sub, 'X_test.npy')
y_train_path: str = os.path.join(out_dir_sub, 'y_train.npy')
y_test_path: str = os.path.join(out_dir_sub, 'y_test.npy')
feat_name_path: str = os.path.join(out_dir_sub, 'Feature_Names.csv')

# Export the feature and target arrays of both partitions in NumPy's .npy format
np.save(X_train_path, X_train)
np.save(X_test_path, X_test)
np.save(y_train_path, y_train)
np.save(y_test_path, y_test)

# Export feat_names as a single-column CSV (column 'Features', no index column)
pd.DataFrame({'Features': feat_names}).to_csv(feat_name_path, index=False)