# Data Aggregation

This dataset loads all `.csv` files and transforms them into one big dataframe.
Then, the correct datatypes are induced and the names are set to something more appropriate and easy to work with later.
These results are saved in case more fields are wanted later.

Next, all non-relevant feature fields are discarded and outliers are filtered, where they are first set to `np.nan` for the resampling and interpolation, but are set to 0.0, eventually, to mimic a natural drop-out.
In a final step, the data is saved in a feature numpy array, which can be used as the input data for training and validation.

In [None]:
# imports
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed, cpu_count

from src.physics.eclipse import calculate_eclipse_details
from src.physics.msis_density import get_nrlm_densities
from src.model.scaler import ZTransform

In [None]:
# constants
raw_data_dir = Path("../data/raw")
preprocessed_data_dir = Path("../data/preprocessed")
aggregated_data_dir = preprocessed_data_dir / "aggregated"

higher_order_interpolation_limit = 4
interpolation_limit = 6
fill_limit=8

# ensure no error occurs later
aggregated_data_dir.mkdir(parents=True, exist_ok=True)

## OMNI2

The first dataset to be prepared is the OMNI2 dataset.

In [None]:
# omni constants
raw_omni_dir = raw_data_dir / "OMNI2"
preprocessed_omni_path = aggregated_data_dir / "omni.parquet"

# discard unnecessary fields
filtered_omni_path = aggregated_data_dir / "omni-filtered.parquet"


In [None]:
# aggregate into one file
omni_dtypes = {
    'YEAR': np.uint16, 'DOY': np.uint16, 'Hour': np.uint8, 'Bartels_rotation_number': np.uint16,
    'ID_for_IMF_spacecraft': np.uint16, 'ID_for_SW_Plasma_spacecraft': np.uint16,
    'num_points_IMF_averages': np.int16, 'num_points_Plasma_averages': np.int16,
    'Scalar_B_nT': np.float32, 'Vector_B_Magnitude_nT':np.float32,
    'Lat_Angle_of_B_GSE': np.float32, 'Long_Angle_of_B_GSE': np.float32,
    'BX_nT_GSE_GSM': np.float32, 'BY_nT_GSE': np.float32, 'BZ_nT_GSE': np.float32, 'BY_nT_GSM': np.float32, 'BZ_nT_GSM': np.float32,
    'RMS_magnitude_nT': np.float32, 'RMS_field_vector_nT': np.float32, 'RMS_BX_GSE_nT': np.float32, 'RMS_BY_GSE_nT': np.float32, 'RMS_BZ_GSE_nT': np.float32,
    'SW_Plasma_Temperature_K': np.float64, 'SW_Proton_Density_N_cm3': np.float32, 'SW_Plasma_Speed_km_s': np.float32, 'SW_Plasma_flow_long_angle': np.float32, 'SW_Plasma_flow_lat_angle': np.float32,
    'Alpha_Prot_ratio': np.float32,
    'sigma_T_K': np.float64, 'sigma_n_N_cm3': np.float32, 'sigma_V_km_s': np.float32, 'sigma_phi_V_degrees': np.float32, 'sigma_theta_V_degrees': np.float32, 'sigma_ratio': np.float32,
    'Flow_pressure': np.float32, 'E_electric_field': np.float32, 'Plasma_Beta': np.float32, 'Alfen_mach_number': np.float32, 'Magnetosonic_Mach_number': np.float32, 'Quasy_Invariant': np.float32,
    'Kp_index': np.float32, 'R_Sunspot_No': np.float32,
    'Dst_index_nT': np.float32, 'ap_index_nT': np.float32, 'f10.7_index': np.float32, 'AE_index_nT': np.float32, 'AL_index_nT': np.float32, 'AU_index_nT': np.float32, 'pc_index': np.float32, 'Lyman_alpha': np.float32,
    'Proton_flux_>1_Mev': np.float32, 'Proton_flux_>2_Mev': np.float32, 'Proton_flux_>4_Mev': np.float32,
    'Proton_flux_>10_Mev': np.float32, 'Proton_flux_>30_Mev': np.float32, 'Proton_flux_>60_Mev': np.float32,
    'Flux_FLAG': np.int8
}

read_dfs = {}
for file in tqdm(raw_omni_dir.iterdir(), total=len(tuple(raw_omni_dir.iterdir())), desc="Reading raw OMNI2 files"):
    if file.is_file():
        file_id = int(file.name[6:11])
        df = pd.read_csv(file, parse_dates=["Timestamp"], sep=',', index_col="Timestamp", engine='pyarrow').astype(omni_dtypes)
        read_dfs[file_id] = df

omni_df = pd.concat(read_dfs)
omni_df.index.rename(['File ID', 'Timestamp'], inplace=True)
omni_df.to_parquet(preprocessed_omni_path)

In [None]:
omni_df = pd.read_parquet(preprocessed_omni_path)
omni_df.info()

In [None]:
# look at docs/used_fields_revised.md for more details
fields_to_keep = ['Hour', 'Bartels_rotation_number', 'DOY', 'YEAR', 'f10.7_index', 'Kp_index', 'Dst_index_nT', 'Lyman_alpha', 'BY_nT_GSM', 'BZ_nT_GSM', 'Proton_flux_>1_Mev', 'Proton_flux_>2_Mev', 'Proton_flux_>4_Mev', 'Proton_flux_>10_Mev', 'Proton_flux_>30_Mev', 'Proton_flux_>60_Mev', 'SW_Proton_Density_N_cm3', 'SW_Plasma_Speed_km_s', 'AL_index_nT']

omni_df = omni_df[fields_to_keep]

In [None]:
omni_df.describe()

In [None]:
# filter outliers and save as preprocessing target
fields_to_filter = ['f10.7_index', 'BY_nT_GSM', 'BZ_nT_GSM', 'Proton_flux_>1_Mev', 'Proton_flux_>2_Mev', 'Proton_flux_>4_Mev', 'Proton_flux_>10_Mev', 'Proton_flux_>30_Mev', 'Proton_flux_>60_Mev', 'SW_Proton_Density_N_cm3', 'SW_Plasma_Speed_km_s', 'AL_index_nT']

for field in fields_to_filter:
    max_val = omni_df[field].max()
    omni_df.loc[omni_df[field] >= max_val - 1e-2 * max_val, field] = np.nan

omni_df.to_parquet(filtered_omni_path)

## GOES

Next is GOES.
Due to the amount of data being preset, it is directly resampled from 1 minute to 15 minute intervals.
This also somewhat helps to somewhat alleviate the massive gaps in the data.

In [None]:
# goes constants
raw_goes_dir = raw_data_dir / "GOES"
preprocessed_goes_path = aggregated_data_dir / "goes.parquet"

In [None]:
# aggregate into one file after choosing relevant fields and downsampling to 15 min
goes_dtypes = {
    'xrsa_flux': np.float32,
    'xrsb_flux': np.float32,
    'xrsa_flag': np.float32,    # some flags are null unfortunately
    'xrsb_flag': np.float32
}

fields_to_keep = ['xrsa_flux', 'xrsb_flux', 'xrsa_flag', 'xrsb_flag']

read_dfs = {}
goes_files = [fp for intermediate_fp in raw_goes_dir.iterdir() for fp in intermediate_fp.iterdir()]

for file in tqdm(goes_files, total=len(goes_files), desc="Reading raw GOES files"):
    if file.is_file():
        file_id = int(file.name[5:10])
        df = pd.read_csv(file, parse_dates=["Timestamp"], sep=',', index_col="Timestamp", engine='pyarrow').astype(goes_dtypes)
        df = df[fields_to_keep]

        for field in ['xrsa_', 'xrsb_']:
            field_name = field + 'flux'
            field_flag = field + 'flag'

            df.loc[df[field_flag] != 0, field_name] = np.nan

        df = df.resample('15min')
        df = (df.mean() + df.max()) / 2   # linear combination to capture better extreme events
        read_dfs[file_id] = df

goes_df = pd.concat(read_dfs)
goes_df.index.rename(['File ID', 'Timestamp'], inplace=True)
goes_df.to_parquet(preprocessed_goes_path)

In [None]:
goes_df = pd.read_parquet(preprocessed_goes_path)
goes_df.describe()

In [None]:
# only keep flux info
goes_df = goes_df[['xrsa_flux', 'xrsb_flux']]
goes_df.info()

## Satellite Initial States

Next in the line are the satellite initial states.

In [None]:
# initial states constants
raw_is_dir = raw_data_dir / "INITIAL_STATES"
preprocessed_is_path = aggregated_data_dir / "initial-states.parquet"

In [None]:
# load csvs and combine into one dataset
is_dtypes = {
    'File ID': np.int16,
    'Semi-major Axis (km)': np.float64,
    'Eccentricity': np.float32,
    'Inclination (deg)': np.float32,
    'RAAN (deg)': np.float32,
    'Argument of Perigee (deg)': np.float32,
    'True Anomaly (deg)': np.float32,
    'Latitude (deg)': np.float32,
    'Longitude (deg)': np.float32,
    'Altitude (km)': np.float64
}

fields_to_keep = ['Altitude (km)', 'Inclination (deg)', 'RAAN (deg)', 'Argument of Perigee (deg)', 'True Anomaly (deg)', 'Eccentricity', 'Semi-major Axis (km)']

read_dfs = []
is_files = [fp for fp in raw_is_dir.iterdir()]

for file in tqdm(is_files, total=len(is_files), desc="Reading raw Initial States files"):
    if file.is_file():
        df = pd.read_csv(file, parse_dates=["Timestamp"], sep=',', index_col=["File ID", "Timestamp"], dtype=goes_dtypes)
        df = df[fields_to_keep]
        read_dfs.append(df)

is_df = pd.concat(read_dfs)
is_df.to_parquet(preprocessed_is_path)

In [None]:
is_df = pd.read_parquet(preprocessed_is_path)
is_df.info()

## Sat Density

Now for the final pre-downloaded datasets, the ground truth.

In [None]:
# sat density constants
raw_sd_dir = raw_data_dir / "SAT_DENSITY"
preprocessed_sd_path = aggregated_data_dir / "sat-density.parquet"
interpolated_sd_path = aggregated_data_dir / "sat-density-interp2.parquet"

In [None]:
# load csvs and combine into one dataset
sd_dtypes = {
    'Orbit Mean Density (kg/m^3)': np.float32
}

fields_to_keep = ['Orbit Mean Density (kg/m^3)']

read_dfs = {}
sd_files = [fp for fp in raw_sd_dir.iterdir()]

for file in tqdm(sd_files, total=len(sd_files), desc="Reading raw Sat Density files"):
    if file.is_file():
        idx = file.name.index('-', 3) + 1
        file_id = int(file.name[idx:idx+5])
        df = pd.read_csv(file, parse_dates=["Timestamp"], sep=',', index_col="Timestamp", engine='pyarrow').astype(sd_dtypes)
        max_val = df['Orbit Mean Density (kg/m^3)'].max()
        df.loc[df['Orbit Mean Density (kg/m^3)'] >= max_val - 1e-2 * max_val, 'Orbit Mean Density (kg/m^3)'] = np.nan
        read_dfs[file_id] = df

sd_df = pd.concat(read_dfs)
sd_df.index.rename(['File ID', 'Timestamp'], inplace=True)
sd_df.to_parquet(preprocessed_sd_path)

In [None]:
sd_df = pd.read_parquet(preprocessed_sd_path)
sd_df.describe()

In [None]:
def _interpolate_group(
        group: pd.DataFrame
) -> pd.DataFrame:
    """Interpolate and refine a single group."""
    # if group.isna().sum().item() >= len(group) / (1 + interpolation_limit):
    #     return group

    group = group.sort_index()
    orig_idx = group.index
    group.index = orig_idx.droplevel(0)

    group = group.interpolate(method="pchip", limit_direction="both", limit=higher_order_interpolation_limit)
    group = group.interpolate(method="linear", limit_direction="both", limit=interpolation_limit)

    group.bfill(inplace=True, limit=fill_limit)
    group.ffill(inplace=True, limit=fill_limit)

    group.index = orig_idx

    return group

freq = pd.Timedelta(minutes=10)
def _pad_group(group: pd.DataFrame,
               target_length: int) -> pd.DataFrame:
    """Pad a single group to target_length."""
    cur_len = len(group)
    pad_len = target_length - cur_len

    if pad_len == 0:
        return group
    elif pad_len < 0:
        return group.tail(target_length)

    group = group.sort_index()

    # last_timestamp = group.index.get_level_values("Timestamp").max()

    # Calculate the start of the required full window
    # The new timestamps should go *before* the existing ones to fill the gap
    # new_window_start_time = last_timestamp - (target_length - 1) * freq

    # Generate the full set of timestamps for the desired window
    # This will be the timestamps for the NaNs + existing data
    # full_window_timestamps = pd.date_range(start=new_window_start_time,
    #                                         periods=target_length,
    #                                         freq=freq)

    # Create an empty DataFrame for the full window with NaN values
    # padded_df = pd.DataFrame(
    #     np.nan,
    #     index=full_window_timestamps,
    #     columns=group.columns,
    #     dtype=group.dtypes.iloc[0] # Use the dtype of the first column
    # )

    # Overlay the existing data onto this padded DataFrame
    # This will correctly align by Timestamp and fill in non-NaN values
    # padded_df.update(group.reset_index())
    ts = group.index.get_level_values("Timestamp").to_series()
    file_id = group.index.get_level_values("File ID")[0]


    extra_ts = pd.date_range(
        start=ts.iloc[-1] + freq, periods=pad_len, freq=freq
    )

    extra_idx = pd.MultiIndex.from_product(
        [[file_id], extra_ts], names=["File ID", "Timestamp"]
    )

    full_idx = group.index.append(extra_idx)

    padded_df = group.reindex(full_idx)
    return padded_df

def _process_group_batch(
        batch_groups: list[tuple[any, pd.DataFrame]],
        target_length: int,
) -> pd.DataFrame:
    """Process a batch of groups."""
    processed_results = []
    for name, group in batch_groups:
        padded_group = _pad_group(group, target_length)
        interpolated_group = _interpolate_group(
            padded_group
        )
        processed_results.append(interpolated_group)

    return pd.concat(processed_results)

def pad_and_interpolate_parallel(_df: pd.DataFrame, target_length: int) -> pd.DataFrame:
    groups = list(_df.groupby(level='File ID', sort=False))
    num_groups = len(groups)
    actual_n_jobs = cpu_count()
    actual_n_jobs = max(1, actual_n_jobs)
    batch_size = max(1, (num_groups // actual_n_jobs) + 1)

    group_batches = [groups[i:i + batch_size] for i in range(0, num_groups, batch_size)]

    print("CPU-count:", actual_n_jobs)
    print("Batch size:", batch_size)
    print("Number of groups:", num_groups)
    print("Number of group batches:", len(group_batches))
    print("Groups:", list((i, i + batch_size) for i in range(0, num_groups, batch_size)) + [(num_groups - (num_groups % batch_size), num_groups)])

    processed_batches = Parallel(
        n_jobs=-1, backend='loky', prefer='processes', verbose=1, batch_size=1, pre_dispatch=actual_n_jobs
    )(
        delayed(_process_group_batch)(
            batch,
            target_length
        )
        for batch in group_batches #  tqdm(group_batches, desc=f"Interpolating and Padding for {desc}")
    )

    return pd.concat(processed_batches).sort_index()

In [None]:
sd_df_interp = pad_and_interpolate_parallel(sd_df, 432).clip(lower=1e-14, upper=5e-11)
sd_df_interp.to_parquet(interpolated_sd_path)
sd_df_interp.describe(), sd_df_interp.isnull().sum().item()

In [None]:
sd_df_interp = pd.read_parquet(interpolated_sd_path)
sd_df_interp.describe()

## Eclipse Simulation

We supply our model with an additional Eclipse simulation for umbra and penumbra information.

In [None]:
# eclipse constants
eclipse_preprocessed_p = aggregated_data_dir / 'eclipse.parquet'

In [None]:
# put the necessary fields of initial states df into the format I used during the challenge
orig_is_df = pd.read_parquet(preprocessed_is_path)

orig_is_df.rename(columns={
    "Altitude (km)": "altitude",
    "Semi-major Axis (km)": "a",
    "Eccentricity": "e",
    "Inclination (deg)": "i",
    "RAAN (deg)": "RAAN",
    "Argument of Perigee (deg)": "omega",
    "True Anomaly (deg)": "nu"
}, inplace=True)

_conversion_factor = np.pi / 180.0
orig_is_df.a /= 1_000
orig_is_df.i = orig_is_df.i * _conversion_factor
orig_is_df.RAAN = orig_is_df.RAAN * _conversion_factor
orig_is_df.omega = orig_is_df.omega * _conversion_factor
orig_is_df.nu = orig_is_df.nu * _conversion_factor

In [None]:
# prepare eclipse
eclipse_details_df = calculate_eclipse_details(orig_is_df, n_steps=10_000, threshold=1e-9, n_workers=None)
eclipse_details_df.to_parquet(eclipse_preprocessed_p)

In [None]:
# load
eclipse_details_df = pd.read_parquet(eclipse_preprocessed_p)
eclipse_details_df.describe()

## NRLMSISE-2.1

We also need the baseline.

In [None]:
# constants
nrlm_path = aggregated_data_dir / 'nrlm.parquet'

In [None]:
# put the necessary fields of omni df into the format I used during the challenge
orig_omni_df = pd.read_parquet(preprocessed_omni_path)

orig_omni_df.rename(columns={
    "f10.7_index": "f10_7_index",
    "ap_index_nT": "ap_index",
}, inplace=True)

orig_omni_df = orig_omni_df[['f10_7_index', 'ap_index']]

max_val = orig_omni_df.f10_7_index.max()
orig_omni_df.loc[orig_omni_df.f10_7_index >= max_val - 1e-2 * max_val, 'f10_7_index'] = np.nan

omni_df_interp = orig_omni_df.copy()
omni_df_interp['f10_7_index'] = pad_and_interpolate_parallel(orig_omni_df.f10_7_index, 1440)
omni_df_interp.describe(), omni_df_interp.isnull().sum(), orig_omni_df.isnull().sum()

orig_omni_df = omni_df_interp

nrlm_df = get_nrlm_densities(orig_is_df, orig_omni_df, n_steps=1, n_workers=None, mode='relaxed')
nrlm_df.to_parquet(nrlm_path)

In [None]:
nrlm_df = pd.read_parquet(nrlm_path)
nrlm_df.describe()

## Constructing the feature dataset

In a final step all previous steps are combined into one big dataset ready for training or evaluation.

### Sat Density

Prepare the final sat density part for the dataset.

In [None]:
# sat density
is_df = pd.read_parquet(preprocessed_is_path)
is_df = is_df.rename(columns={
    'Semi-major Axis (km)': 'a',
    'Altitude (km)': 'h',
    'Inclination (deg)': 'i',   # optional value (we will probably keep it for the time being
    'RAAN (deg)': 'RAAN',
    'Argument of Perigee (deg)': 'omega',
    'True Anomaly (deg)': 'nu',
})

# we assume almost perfect circular orbits (e ~= 0) and a round earth for nan values
threshold = 1e5
is_df.loc[is_df.h > threshold, 'h'] = is_df.loc[is_df.h > threshold, 'a'] - 6378

_conversion_factor = np.pi / 180.0
is_df['orbit_angle'] = (is_df['omega'] + is_df['nu']) * _conversion_factor
is_df.RAAN *= _conversion_factor
is_df.i *= _conversion_factor

is_npy = is_df[['h', 'i', 'RAAN', 'orbit_angle']].sort_index(level=0).to_numpy()
print(is_npy.shape)

### OMNI2

Prepare the final OMNI2 part for the dataset.

In [None]:
# load filtered OMNI
omni_df = pd.read_parquet(filtered_omni_path)
omni_df.describe()

In [None]:
fields_to_keep = [
    'year', 'doy', 'bartels_no', 'f10_7_index', 'kp_index', 'dst_index',
    'lyman_alpha', 'by', 'bz', 'proton_density', 'plasma_speed',
    # optional fields?
    'proton_flux_1', 'proton_flux_2', 'proton_flux_4', 'proton_flux_10', 'proton_flux_10', 'proton_flux_30', 'proton_flux_60', 'al_index', 'hour'
]

omni_df = omni_df.rename(columns={
    'YEAR': 'year',
    'DOY': 'doy',
    'Hour': 'hour',
    'Bartels_rotation_number': 'bartels_no',
    'f10.7_index': 'f10_7_index',
    'Kp_index': 'kp_index',
    'Dst_index_nT': 'dst_index',
    'Lyman_alpha': 'lyman_alpha',
    'BY_nT_GSM': 'by',
    'BZ_nT_GSM': 'bz',
    'SW_Proton_Density_N_cm3': 'proton_density',
    'SW_Plasma_Speed_km_s': 'plasma_speed',
    'Proton_flux_>1_Mev': 'proton_flux_1',
    'Proton_flux_>2_Mev': 'proton_flux_2',
    'Proton_flux_>4_Mev': 'proton_flux_4',
    'Proton_flux_>10_Mev': 'proton_flux_10',
    'Proton_flux_>30_Mev': 'proton_flux_30',
    'Proton_flux_>60_Mev': 'proton_flux_60',
    'AL_index_nT': 'al_index'
})[fields_to_keep]

norm_factor = 2 * np.pi
omni_df.bartels_no = np.sin(omni_df.bartels_no * norm_factor)
omni_df.doy = np.sin(omni_df.doy * norm_factor / 365.25)
omni_df.hour = np.sin(omni_df.hour * norm_factor / 24)
omni_df['solar_cycle'] = np.sin(omni_df.year * norm_factor / 11)

In [None]:
# Get unique File IDs in the desired order
unique_file_ids = omni_df.index.get_level_values("File ID").unique().sort_values()

# List to store 1D NumPy arrays for each feature/aggregation type
all_feature_arrays = []

# 1. Get first element of each Subseries for: ["bartels_no", "doy", "solar_cycle"]
first_elements_df = omni_df.groupby(level="File ID")[["bartels_no", "doy", "solar_cycle"]].last().reindex(unique_file_ids)
all_feature_arrays.append(first_elements_df.values) # Convert DataFrame to NumPy array

def aggregate_time_window_numpy(dataframe, columns, window_str, file_ids_order):
    last_timestamps = dataframe.groupby(level='File ID').apply(lambda x: x.index.get_level_values('Timestamp').max()).reindex(file_ids_order)

    # Pre-allocate array for results for this aggregation type
    # Each column will have 3 or now 4 aggregations (max, mean, std, (min))
    num_output_cols = len(columns) * 3
    # num_output_cols = len(columns) * 4
    results_array = np.full((len(file_ids_order), num_output_cols), np.nan, dtype=np.float64)

    for i, file_id in tqdm(enumerate(file_ids_order), desc="Aggregating OMNI"):
        last_ts = last_timestamps.loc[file_id]
        window_start = last_ts - pd.to_timedelta(window_str)
        file_df = dataframe.loc[file_id]
        windowed_data = file_df.loc[window_start:last_ts, columns]

        if not windowed_data.empty:
            agg_result = windowed_data.agg(['max', 'mean', 'std'])
            # agg_result = windowed_data.agg(['max', 'mean', 'std', 'min'])
            # agg_result is a DataFrame, convert its values to a 1D array row-wise
            # This order (max, mean, std for col1, then max, mean, std for col2, etc.)
            # matches the desired flattening
            results_array[i, :] = agg_result.values.flatten()
        # If windowed_data is empty, the pre-allocated np.nan values remain

    return results_array

# 2. Get max, mean, std for each Subseries for last 14d, 3d, last 3h for:
#    ["f10_7_index", "kp_index", "dst_index", "lyman_alpha", "proton_density", "plasma_speed"]
fields_long_window = ["f10_7_index", "kp_index", "dst_index", "lyman_alpha", "proton_density", "plasma_speed"]
time_windows_long = ["14d", "3d", "3h"]
for window in time_windows_long:
    agg_array = aggregate_time_window_numpy(omni_df, fields_long_window, window, unique_file_ids)
    all_feature_arrays.append(agg_array)

# 3. Get max, mean, std for each Subseries for last 24h, 3h for: ["by", "bz"]
fields_short_window = ["by", "bz"]
time_windows_short = ["24h", "3h"]
for window in time_windows_short:
    agg_array = aggregate_time_window_numpy(omni_df, fields_short_window, window, unique_file_ids)
    all_feature_arrays.append(agg_array)

def resample_f10_7_numpy(dataframe, window_str="60d", file_ids_order=unique_file_ids):
    last_timestamps = dataframe.groupby(level='File ID').apply(lambda x: x.index.get_level_values('Timestamp').max()).reindex(file_ids_order)

    results_array = np.full((len(file_ids_order), 3), np.nan, dtype=np.float64) # 3 for mean, std, max

    for i, file_id in enumerate(file_ids_order):
        last_ts = last_timestamps.loc[file_id]
        window_start = last_ts - pd.to_timedelta(window_str)
        file_df = dataframe.loc[file_id]
        windowed_data = file_df.loc[window_start:last_ts, "f10_7_index"]

        if not windowed_data.empty:
            mean_val = windowed_data.mean()
            std_val = windowed_data.std()
            max_val = windowed_data.max()
            results_array[i, :] = [mean_val, std_val, max_val]
        # If windowed_data is empty, the pre-allocated np.nan values remain

    return results_array

# 4. Resample f10_7_index to full 60d window with mean, std, and max aggregations
f10_7_resampled_array = resample_f10_7_numpy(omni_df, "60d", unique_file_ids)
all_feature_arrays.append(f10_7_resampled_array)

# Combine all feature arrays horizontally
omni_npy = np.hstack(all_feature_arrays)
print(omni_npy.shape)

### GOES

Prepare features for GOES dataset.

In [None]:
# load
goes_df = pd.read_parquet(preprocessed_goes_path)
goes_df.describe()

In [None]:
# prepare
goes_df = goes_df[['xrsa_flux', 'xrsb_flux']]
all_feature_arrays = []

for window_str in ['14d', '3d', '3h']:
    agg_array = aggregate_time_window_numpy(goes_df, ['xrsa_flux', 'xrsb_flux'], window_str, unique_file_ids)
    all_feature_arrays.append(agg_array)

goes_npy = np.hstack(all_feature_arrays)

In [None]:
goes_npy.shape

### Eclipse

Prepare the eclipse simulation.

In [None]:
# load
eclipse_df = pd.read_parquet(eclipse_preprocessed_p)
eclipse_df.describe()

In [None]:
# prepare
eclipse_npy = eclipse_df[['penumbra_entry_nu', 'penumbra_exit_nu', 'umbra_entry_nu', 'umbra_exit_nu']].to_numpy()

eclipse_npy += is_df.omega.to_numpy().reshape(-1, 1)
print(eclipse_npy.shape)

### Residual targets

For the residual targets we need both the given ground truth and the NRLMSISE-2.1 predictions.

In [None]:
# load
nrlm_df = pd.read_parquet(nrlm_path)
sd_df = pd.read_parquet(interpolated_sd_path)

# sd_df.sort_index().to_numpy().reshape(len(unique_file_ids), -1)
# fixed this already in the interpolation step
# sd_df = sd_df.groupby(level='File ID').tail(432)    # some are malformatted... -.-
sd_df.describe()

In [None]:
# combine residuals
nrlm_npy = nrlm_df.to_numpy()
unique_file_ids = sd_df.index.get_level_values("File ID").unique().sort_values()
y_npy = sd_df.to_numpy().reshape(len(unique_file_ids), -1) - nrlm_npy

In [None]:
y_npy

In [None]:
bad_ys = np.unique(np.argwhere(np.isnan(y_npy))[:, 0])
print(len(bad_ys))

### Combine everything

Combining all previous steps into one final dataset.

In [None]:
# combine
final_npy = np.hstack([
    is_npy,
    omni_npy,
    goes_npy,
    eclipse_npy
])

# exclude bad ys
good_final_npy = np.delete(final_npy, bad_ys, axis=0)
good_nrlm_npy = np.delete(nrlm_npy, bad_ys, axis=0)
good_y_npy = np.delete(y_npy, bad_ys, axis=0)

np.nan_to_num(good_final_npy, nan=0.0, posinf=0.0, neginf=0.0, copy=False)

print(good_final_npy.shape)

In [None]:
# apply z transform
base_z_scaler_path = Path('../model/z-scaler/')
feature_z_scaler_path = base_z_scaler_path / "feature-scaler-scaled-time-with-min.npz"
x_scaler = ZTransform(good_final_npy)
x_scaler.save(feature_z_scaler_path)

ground_truth_path = base_z_scaler_path / "ground-truth-scaler-scaled-time-with-min.npz"
y_scaler = ZTransform(good_y_npy)
y_scaler.save(ground_truth_path)

In [None]:
# save final dataset
dataset_path = preprocessed_data_dir / "dataset-scaled-time-with-min.npz"
np.savez(file=dataset_path,
         x=x_scaler.z_transform(good_final_npy), y=y_scaler.z_transform(good_y_npy), nrlm=good_nrlm_npy)