In [None]:
# In: notebooks/05_Test_DataLoader.ipynb
# Purpose: Test the custom OASISDataset and pad_collate_fn
#          to ensure data loading, preprocessing (using saved objects),
#          sequencing, padding, and batching works correctly.

# Notebook 05: Test Data Loading Pipeline

**Purpose:** Verify that the custom `OASISDataset` class and `pad_collate_fn` (defined in `src/datasets.py`) work correctly. This involves:
1.  Loading the pre-split data (`.parquet` files from NB 03).
2.  Loading and applying the pre-fitted preprocessors (`.joblib` files from NB 04).
3.  Correctly structuring data into sequences grouped by subject.
4.  Handling variable sequence lengths via padding and masking within batches.
5.  Generating batches in the expected format (shapes, data types) for input into a PyTorch sequence model.

**Input:**
* `cohort_{train|validation}.parquet` (Output from NB 03)
* `standard_scaler.joblib` (Output from NB 04)
* `simple_imputer_median.joblib` (Output from NB 04)
* `src/datasets.py` (Contains `OASISDataset` and `pad_collate_fn`)

**Output:** Console output displaying the properties (shapes, types, example values) of generated batches for verification. No data files are saved by this notebook.

## Setup

Import necessary libraries (torch, pandas, DataLoader, joblib, etc.). Critically, import the custom `OASISDataset` and `pad_collate_fn` from the `src` package (the project root, i.e. the parent of `notebooks/`, is added to the Python path so that `src` is importable). Load project configuration from `config.json` to retrieve paths to data splits and saved preprocessors, then attach to the W&B run from NB 04 to retrieve the feature and preprocessing column lists. Define DataLoader parameters like `BATCH_SIZE`.

In [None]:
# --- Import Libraries ---
import pandas as pd
import numpy as np
import torch # Assuming PyTorch
from torch.utils.data import DataLoader
import joblib
import json
from pathlib import Path
import sys
import os
import wandb

In [None]:
# --- Add the project root to sys.path so the `src` package is importable ---
# Assumes the notebook is executed from notebooks/ and src/ lives next to it,
# i.e. the parent of the current working directory is the project root.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    print(f"Added {module_path} to sys.path")
else:
    # Re-running the cell: nothing to add, so do not claim that we did.
    print(f"{module_path} is already on sys.path")

# --- Import custom dataset class and collate function ---
try:
    from src.datasets import OASISDataset, pad_collate_fn
except ModuleNotFoundError:
    print("Error: Could not import from src/datasets.py.")
    print("Ensure the file exists in the 'src' directory parallel to 'notebooks'.")
    print(f"Current working directory: {os.getcwd()}")
    print(f"Sys path: {sys.path}")
    # Re-raise rather than call exit(): the failure then surfaces as a normal
    # traceback in the cell output (which also reveals *which* module was
    # missing) and execution of the notebook stops at this cell.
    raise
except Exception as e:
    print(f"An unexpected error occurred during import: {e}")
    raise
else:
    print("Successfully imported OASISDataset and pad_collate_fn from src/datasets.py")

In [None]:
# --- Config Loading ---
# Reads ../config.json, derives every input path used by this notebook and
# verifies that the data splits (NB03) and fitted preprocessors (NB04) exist.
print("\n--- Loading Configuration ---")
CONFIG_PATH = Path('../config.json')
run = None  # W&B run handle; stays None until (and unless) wandb.init succeeds
config = {}  # Base configuration loaded from config.json
# Default (empty) feature lists, used if they cannot be fetched from W&B later
time_varying_cols = []
static_cols = []
scaling_cols = []
imputation_cols = []

try:
    PROJECT_ROOT = CONFIG_PATH.parent.resolve()
    with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
        config = json.load(f)
    print("Configuration loaded successfully.")

    # Define paths from config
    OUTPUT_DIR_BASE = PROJECT_ROOT / config['data']['output_dir_base']
    NB03_OUTPUT_DIR = OUTPUT_DIR_BASE / "03_Feature_Engineering_Splitting"
    NB04_OUTPUT_DIR = OUTPUT_DIR_BASE / "04_Fit_Preprocessors"

    # Paths to data splits
    TRAIN_DATA_PATH = NB03_OUTPUT_DIR / "cohort_train.parquet"
    VAL_DATA_PATH = NB03_OUTPUT_DIR / "cohort_validation.parquet"
    # TEST_DATA_PATH = NB03_OUTPUT_DIR / "cohort_test.parquet" # Not to be tested here

    # Paths to saved preprocessors
    SCALER_PATH = NB04_OUTPUT_DIR / "standard_scaler.joblib"
    IMPUTER_PATH = NB04_OUTPUT_DIR / "simple_imputer_median.joblib"

    # --- Define W&B Project/Entity for potential use ---
    WANDB_PROJECT = config['wandb']['project_name']
    WANDB_ENTITY = config['wandb'].get('entity', None)

    # Check that every required input exists and report the exact missing
    # path(s), so the reader does not have to reconstruct them by hand.
    required_files = {
        "Train parquet": TRAIN_DATA_PATH,
        "Validation parquet": VAL_DATA_PATH,
        "Scaler joblib": SCALER_PATH,
        "Imputer joblib": IMPUTER_PATH,
    }
    missing_files = [
        f"{label}: {path}"
        for label, path in required_files.items()
        if not path.is_file()
    ]
    if missing_files:
        raise FileNotFoundError(
            "Required input file(s) not found:\n  " + "\n  ".join(missing_files)
        )

except Exception as e:
    print(f"Error during setup before W&B init: {e}")
    # Nothing downstream can work without the config and paths. Re-raise so
    # the original exception and traceback are shown and execution stops here.
    raise

In [None]:
# --- Initialize W&B and Load Configuration from Prior Run ---
# Attaches to the W&B run produced by NB04 and reads the feature / preprocessing
# column lists from its config. On any failure the cell prints the error and
# continues with whatever feature lists were defined earlier.
print("\n--- Initializing W&B to load configuration from NB04 Run ---")

# *** IMPORTANT: Replace 'RUN_ID_FROM_NB04' below with the actual W&B Run ID ***
# Obtain this from the output of your successful NB04 run or from the W&B UI.
PRIOR_RUN_ID = "RUN_ID_FROM_NB04"
PLACEHOLDER_RUN_ID = "RUN_ID_FROM_NB04"  # sentinel: do not edit this line

try:
    # Guard against running with the unedited placeholder. With resume="allow"
    # an unknown ID does not fail; it starts a brand-new run under that ID,
    # which would pollute the project and yield empty feature lists.
    if PRIOR_RUN_ID == PLACEHOLDER_RUN_ID:
        raise ValueError(
            "PRIOR_RUN_ID is still the placeholder 'RUN_ID_FROM_NB04'; "
            "set it to the W&B run ID produced by NB04."
        )

    # NOTE(review): resuming by ID attaches to the NB04 run itself, so the
    # name/job_type/config passed here may be written onto that run. If a
    # read-only lookup is preferred, consider the W&B public API - confirm.
    run = wandb.init(
        project=WANDB_PROJECT,
        entity=WANDB_ENTITY,
        id=PRIOR_RUN_ID,
        resume="allow", # Allows fetching config/summary from a finished run
        job_type="create-dataloaders", # Job type for this notebook
        name=f"NB05_dataloader_using_{PRIOR_RUN_ID.split('/')[-1]}", # Create a descriptive name
        config=config # Pass the base config loaded from json initially
    )
    print(f"W&B run initialized. Attached to prior run: {run.url}")

    # --- Load feature/preprocess lists from the fetched run.config ---
    # Use .get() for safety, providing empty list as default if key missing
    # Access using the nested structure logged by NB04
    features_config = run.config.get('features', {})
    preprocess_config = run.config.get('preprocess', {})

    time_varying_cols = features_config.get('time_varying', [])
    static_cols = features_config.get('static', [])
    scaling_cols = preprocess_config.get('scaling_cols', [])
    imputation_cols = preprocess_config.get('imputation_cols', [])

    # Check if lists were loaded successfully (imputation_cols is not part of
    # this check in the original logic and is only reported)
    if not time_varying_cols or not static_cols or not scaling_cols:
         print("⚠️ Warning: Feature or scaling lists loaded from W&B config are empty!")
         print(f"  Loaded time_varying: {time_varying_cols}")
         print(f"  Loaded static: {static_cols}")
         print(f"  Loaded scaling: {scaling_cols}")
         print(f"  Loaded imputation: {imputation_cols}")
         print("  Check the config of the W&B run specified:", PRIOR_RUN_ID)
         # Decide if you should exit or proceed with empty lists
    else:
        print("Feature and preprocess lists loaded successfully from W&B config.")
        print(f"  Time Varying: {time_varying_cols}")
        print(f"  Static: {static_cols}")
        print(f"  Scaling: {scaling_cols}")
        print(f"  Imputation: {imputation_cols}")

except Exception as e:
    print(f"Error initializing W&B or loading config from run '{PRIOR_RUN_ID}': {e}")
    print("Proceeding with potentially empty feature lists defined earlier (or default).")
    # 'run' will be None if wandb.init failed


In [None]:
# --- DataLoader parameters ---
# Number of subjects per batch; kept small because this notebook only tests
# the pipeline and small batches are easier to inspect.
BATCH_SIZE: int = 4

## Instantiate Datasets

Create instances of the `OASISDataset` class for the training and validation splits, providing the paths to the data files and the saved preprocessor objects. The W&B run configuration (`run.config`, fetched from the NB 04 run) is passed to provide the feature lists.

In [None]:
print("\n--- Instantiating Datasets ---")

# Pick the configuration handed to the datasets. The W&B cell leaves `run` as
# None when initialisation fails and explicitly continues, so dereferencing
# `run.config` unconditionally would die with an unhelpful AttributeError.
# Fall back to the base config.json dictionary in that case.
if run is not None:
    dataset_config = run.config
else:
    print("⚠️ Warning: no active W&B run; passing the base config from config.json to the datasets.")
    dataset_config = config

# Both splits share the SAME scaler/imputer (fitted on the training data in NB04).
shared_dataset_kwargs = dict(
    scaler_path=SCALER_PATH,
    imputer_path=IMPUTER_PATH,
    config=dataset_config,
)

try:
    print("Creating train_dataset...")
    train_dataset = OASISDataset(data_parquet_path=TRAIN_DATA_PATH, **shared_dataset_kwargs)
    print(f"Train dataset created with {len(train_dataset)} subjects.")

    print("\nCreating val_dataset...")
    val_dataset = OASISDataset(data_parquet_path=VAL_DATA_PATH, **shared_dataset_kwargs)
    print(f"Validation dataset created with {len(val_dataset)} subjects.")

except Exception as e:
    print(f"Error instantiating datasets: {e}")
    # Bare raise keeps the original traceback intact.
    raise

## Create DataLoaders

Wrap the `OASISDataset` instances in PyTorch `DataLoader`. The DataLoader is responsible for:
* Batching the data (grouping multiple subjects/sequences together).
* Shuffling the training data before each epoch (optional but recommended).
* Using the custom `pad_collate_fn` to take lists of sequences (potentially of different lengths) and pad them into uniform tensors suitable for model input, while also providing sequence lengths and masks.
* Potentially using multiple worker processes for efficiency (set `num_workers=0` for easier debugging initially).

In [None]:
print("\n--- Creating DataLoaders ---")

# Settings common to both loaders. Single-process loading (num_workers=0)
# keeps debugging simple; raise it later if throughput matters.
# persistent_workers=False is often needed on some systems (esp. Windows)
# once num_workers > 0.
common_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    collate_fn=pad_collate_fn,
    num_workers=0,
    persistent_workers=False,
)

# Training loader: reshuffled every epoch.
train_loader = DataLoader(train_dataset, shuffle=True, **common_loader_kwargs)
print(f"Total batches calculated by DataLoader: {len(train_loader)}")

# Validation loader: fixed order, no shuffling.
val_loader = DataLoader(val_dataset, shuffle=False, **common_loader_kwargs)

print("DataLoaders created.")
for loader_name, loader in (("train_loader", train_loader), ("val_loader", val_loader)):
    print(f"Number of batches in {loader_name}: ~{len(loader)}")

## Test Batch Iteration

To verify the pipeline, we iterate through a small number of batches yielded by the `train_loader`. For each batch, we unpack the contents (padded sequences, lengths, targets, masks) and print their shapes, data types, and some example values or summaries.

**Key things to check in the output:**
* Are there any errors during iteration?
* Is the shape of `sequences_padded` `(batch_size, max_seq_len_in_batch, num_features)`?
* Is the shape of `lengths` `(batch_size,)` and do the values match the actual sequence lengths before padding (verifiable by looking at the `masks`)?
* Is the shape of `targets` `(batch_size, 1)` or `(batch_size,)`?
* Is the shape of `masks` `(batch_size, max_seq_len_in_batch)` and is its dtype `torch.bool`?
* Do the mask values (`True`/`False`) correctly correspond to the `lengths` tensor?

In [None]:
print("\n--- Testing Batch Iteration (Train Loader) ---")

num_batches_to_test = 2   # Upper bound on how many batches to inspect
max_preview_steps = 5     # Upper bound on time steps shown in the example slice
batches_tested = 0        # Batches actually yielded and inspected

for i, batch in enumerate(train_loader):
    if i >= num_batches_to_test:
        break

    print(f"\n--- Batch {i+1} ---")
    # Unpack the batch (output from pad_collate_fn)
    sequences_padded, lengths, targets, masks = batch

    print(f"Sequences Tensor Shape: {sequences_padded.shape}") # Should be (batch_size, max_seq_len_in_batch, num_features)
    print(f"Sequences Tensor Type: {sequences_padded.dtype}") # Should be torch.float32
    print(f"Lengths Tensor Shape: {lengths.shape}") # Should be (batch_size,)
    print(f"Lengths Tensor Type: {lengths.dtype}") # Should be torch.int64
    print(f"Lengths Tensor Values: {lengths.tolist()}") # Show actual lengths
    print(f"Targets Tensor Shape: {targets.shape}") # Should be (batch_size, 1)
    print(f"Targets Tensor Type: {targets.dtype}") # Should be torch.float32
    print(f"Targets Tensor Values:\n{targets.squeeze().tolist()}") # Show target values
    print(f"Masks Tensor Shape: {masks.shape}") # Should be (batch_size, max_seq_len_in_batch)
    print(f"Masks Tensor Type: {masks.dtype}") # Should be torch.bool

    # Optional: print a slice of a sequence and its mask to check padding.
    # Slicing is safe for any length, so show up to `max_preview_steps` steps
    # instead of skipping the preview whenever the batch has <= 5 time steps.
    if sequences_padded.shape[0] > 0 and sequences_padded.shape[1] > 0:
        steps_shown = min(max_preview_steps, sequences_padded.shape[1])
        print(f"\nExample Sequence (First {steps_shown} steps, First Sample):")
        print(sequences_padded[0, :steps_shown, :]) # First subject in batch
        print(f"\nCorresponding Mask (First {steps_shown} steps, First Sample):")
        print(masks[0, :steps_shown])
        print(f"(Original length for this sample was: {lengths[0].item()})")

    batches_tested += 1

# Report what was actually inspected; the loader may yield fewer batches than
# requested (or none at all).
if batches_tested == 0:
    print("\n⚠️ Warning: train_loader yielded no batches; nothing was tested.")
print(f"\n--- Finished testing {batches_tested} batches. ---")
print("Check shapes, types, lengths, and masks carefully.")

# Optional: Initialize W&B here if you want to log test results
# run_test = wandb.init(...)
# run_test.log({'test_batch_sequence_shape': list(sequences_padded.shape)})
# run_test.finish()

In [None]:
# --- Close out the W&B run (if one was started) ---
print("\n--- DataLoader Testing complete. Finishing W&B run. ---")
if not run:
    # wandb.init never succeeded, so there is nothing to close.
    print("No active W&B run to finish.")
else:
    # Optional: log a success flag or test metrics before closing, e.g.
    # run.log({"dataloader_test_status": "success"})
    run.finish()
    print("W&B run finished.")

print("\nScript execution finished.")

## Conclusion

If the batches iterated successfully and the shapes/types look correct (sequences are padded, lengths match, targets are present, masks align with lengths), the data loading pipeline defined in `src/datasets.py` is working correctly with the preprocessors fitted in Notebook 04. The data is now ready to be used by a sequence model.