In [None]:
# Load data
import pandas as pd
import numpy as np
import torch 
import os
from datetime import datetime

# Section: Generate-paths
# Root folder of this simulation experiment; every input path below lives under it.
exp_dir = "/mnt/data/sur/users/mrivera/Train-sims/4379fd40-9f0a"
A_dir, tgt_dir, odes_path, data_path = (
    os.path.join(exp_dir, leaf)
    for leaf in ("A-mat", "GNN-targets", "raw-ODEs", "parameters-sims.tsv")
)

# Date-stamped tag for this training run, e.g. "Y2024M05D17".
timeID = datetime.now().strftime("Y%YM%mD%d")

# Load-data: only the `id` column of the parameter table is needed.
data_ids = (
    pd.read_csv(data_path, sep="\t", usecols=['id'])
    ['id']
)

In [None]:
# SECTION: Load-function
from torch_geometric.data import Data
import pyarrow.feather as feather

def load_single_data(id, A_dir, tgt_path):
    """Build a PyG ``Data`` graph for a single simulation.

    Parameters
    ----------
    id
        Simulation identifier; used to form the file names
        ``A_{id}.feather`` and ``tgt_{id}.feather``.
    A_dir : str
        Directory holding the adjacency-matrix feather files.
    tgt_path : str
        Directory holding the target feather files. Despite its name this is
        a directory; the per-sample file path is built from it.

    Returns
    -------
    torch_geometric.data.Data
        ``x``: (n, 1) float32 ones, one row per node of A;
        ``edge_index``: (2, E) int64 row/column indices of the nonzero
        entries of A; ``edge_weights``: (E,) float32 values of those entries;
        ``y``: the ``K_s`` column of the target table as a (rows, 1) float32
        tensor.
    """
    # Adjacency matrix -> COO representation (nonzero positions + their values).
    A_path = os.path.join(A_dir, f"A_{id}.feather")
    A = pd.read_feather(A_path).to_numpy(dtype=np.float32)
    row_idx, col_idx = np.nonzero(A)
    # Fancy indexing copies, so the tensor does not keep the dense A alive.
    edge_weights = torch.from_numpy(A[row_idx, col_idx])
    edge_index = torch.from_numpy(np.vstack([row_idx, col_idx]).astype(np.int64))

    # Targets: read only the K_s column. Use the directory passed by the
    # caller, not the notebook-global `tgt_dir`.
    tgt_file = os.path.join(tgt_path, f"tgt_{id}.feather")
    tgt_table = feather.read_table(tgt_file, columns=['K_s'])
    y_tensor = torch.from_numpy(tgt_table.to_pandas().to_numpy(dtype=np.float32))

    # Node features: a constant 1 per node (no informative node inputs).
    n = A.shape[0]
    x_tensor = torch.ones(n, 1, dtype=torch.float32)

    # Drop the large intermediates before building the Data object.
    del A, tgt_table

    # NOTE(review): weights are stored under the attribute name `edge_weights`;
    # downstream code must read exactly this name.
    return Data(
        x=x_tensor,
        edge_weights=edge_weights,
        edge_index=edge_index,
        y=y_tensor,
    )



In [None]:
# SECTION: Divide-data
import random

# Fixed seed so the train/validation split is identical across kernel restarts.
SPLIT_SEED = 42

# Shuffle the positional indices of *all* samples: 0 .. len(data_ids) - 1.
# (Starting at 1 would silently drop the first sample from both splits.)
indices = list(range(len(data_ids)))
random.Random(SPLIT_SEED).shuffle(indices)

# First 80 % of the shuffled indices -> training, remaining 20 % -> validation.
indx = round(len(indices) * .8)
train_indices = indices[:indx]
val_indices = indices[indx:]

# Every sample lands in exactly one split.
assert len(train_indices) + len(val_indices) == len(data_ids)
assert not set(train_indices) & set(val_indices)

## Performance Comparison: Sequential vs Parallel Data Loading

Compare the time needed to load the same training samples with `load_single_data` called sequentially versus `generate_data_parallel`, which dispatches `load_single_data` calls across a thread pool.

In [None]:
# Section: Declare new function
from concurrent.futures import ThreadPoolExecutor
import time

def generate_data_parallel(idx, A_dir, tgt_dir, num_workers=4, loader=None):
    """Load many samples concurrently with a thread pool.

    Parameters
    ----------
    idx : iterable
        Sample IDs to load; any finite iterable (list, pandas Series,
        generator). ``len(idx)`` is not required.
    A_dir, tgt_dir : str
        Directories forwarded unchanged to ``loader`` for every ID.
    num_workers : int, default 4
        Maximum number of worker threads.
    loader : callable, optional
        Called as ``loader(id, A_dir, tgt_dir)`` for each ID. Defaults to
        ``load_single_data``.

    Returns
    -------
    list
        One result per ID, in the same order as ``idx`` (``Executor.map``
        preserves input order). An exception raised by any call propagates.
    """
    from itertools import repeat

    if loader is None:
        loader = load_single_data
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # repeat() pairs the fixed directories with every ID without needing len(idx);
        # map stops when idx is exhausted.
        return list(executor.map(loader, idx, repeat(A_dir), repeat(tgt_dir)))

#-----------------------------------------------------------
# Benchmark on (at most) the first 1000 shuffled training indices.
x = train_indices[:1000]

# Method 1: sequential calls to load_single_data.
start = time.perf_counter()
data = [load_single_data(data_ids.iloc[sample_pos], A_dir, tgt_dir) for sample_pos in x]
not_par_time = time.perf_counter() - start

# Method 2: consecutive, non-overlapping batches loaded with the thread pool.
start = time.perf_counter()
batch_size = 100
batching_ids = data_ids.iloc[x]
num_batches = (len(batching_ids) + batch_size - 1) // batch_size
n_loaded = 0
for i in range(num_batches):
    # Batch i starts at offset i * batch_size; slicing [i:i + batch_size]
    # would produce overlapping batches that reload the same samples.
    batch_ids = batching_ids.iloc[i * batch_size:(i + 1) * batch_size]
    batch_data = generate_data_parallel(batch_ids, A_dir, tgt_dir, num_workers=4)
    n_loaded += len(batch_data)
par_time = time.perf_counter() - start

# Both methods must have loaded exactly the same samples (by count).
assert n_loaded == len(data) == len(x)
print(f">> The not parallelized time is of: {not_par_time:.2f}, while the parallel time is of: {par_time:.2f}")

## Parallel loading and saving of the training dataset

Load the training samples with the parallelized function `generate_data_parallel` and save them in batches of 1000 samples (`TrainBatch_{i}.pt`) in the `batching/` directory of the experiment folder.

In [None]:
from concurrent.futures import ThreadPoolExecutor
import time

# Defined again in this cell so the save step can run without executing the
# (expensive) benchmark cell; keep in sync with the earlier definition.
def generate_data_parallel(idx, A_dir, tgt_dir, num_workers=4, loader=None):
    """Load many samples concurrently with a thread pool.

    Parameters
    ----------
    idx : iterable
        Sample IDs to load; any finite iterable (list, pandas Series,
        generator). ``len(idx)`` is not required.
    A_dir, tgt_dir : str
        Directories forwarded unchanged to ``loader`` for every ID.
    num_workers : int, default 4
        Maximum number of worker threads.
    loader : callable, optional
        Called as ``loader(id, A_dir, tgt_dir)`` for each ID. Defaults to
        ``load_single_data``.

    Returns
    -------
    list
        One result per ID, in the same order as ``idx`` (``Executor.map``
        preserves input order). An exception raised by any call propagates.
    """
    from itertools import repeat

    if loader is None:
        loader = load_single_data
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # repeat() pairs the fixed directories with every ID without needing len(idx);
        # map stops when idx is exhausted.
        return list(executor.map(loader, idx, repeat(A_dir), repeat(tgt_dir)))

# Generate data and separate in batches
# Output folder: <exp_dir>/batching
path = os.path.join(exp_dir, 'batching')
os.makedirs(path, exist_ok=True)

start = time.perf_counter()
batch_size = 1000
# train_indices are positions into data_ids -> positional lookup.
batching_ids = data_ids.iloc[train_indices]
num_batches = (len(batching_ids) + batch_size - 1) // batch_size
for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    # Slice the *training* IDs; slicing data_ids here would save the first
    # rows of the whole table (mixing in validation samples) instead.
    batch_ids = batching_ids.iloc[start_idx:end_idx]
    data = generate_data_parallel(batch_ids, A_dir, tgt_dir, num_workers=6)
    name = os.path.join(path, f'TrainBatch_{i}.pt')
    torch.save(data, name)
    del data  # free memory before loading the next batch
    print(f"Saved batch {i}/{num_batches-1}: {name}")

par_time = time.perf_counter() - start
print(f">> The batch size is of {batch_size}, the number of batches was {num_batches}, while the elapsed time is of: {par_time:.2f}")


In [None]:
# Load batch
# NOTE: add_safe_globals only affects torch.load(weights_only=True). The files
# below are loaded with weights_only=False, which unpickles arbitrary objects,
# so only load batch files that this notebook produced itself.
torch.serialization.add_safe_globals([Data])

# Time reading back every training batch written by the save cell
# (relies on `path` and `num_batches` from that cell).
start = time.perf_counter()
for i in range(num_batches):
    # File name must match the one used when saving: TrainBatch_{i}.pt
    batch_file = os.path.join(path, f'TrainBatch_{i}.pt')
    data = torch.load(batch_file, weights_only=False)

elapsed = time.perf_counter() - start
print(f">> The elapsed time is {elapsed:.2f}.")


## Parallel loading and saving of the validation dataset

In [None]:
# Same idea just for validation: save the validation samples as ValBatch_{i}.pt
# in the same `path` folder created by the training-save cell.

start = time.perf_counter()
batch_size = 1000
# val_indices are positions into data_ids -> positional lookup.
batching_ids = data_ids.iloc[val_indices]
num_batches = (len(batching_ids) + batch_size - 1) // batch_size
for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    # Slice the *validation* IDs, not the full data_ids table.
    batch_ids = batching_ids.iloc[start_idx:end_idx]
    data = generate_data_parallel(batch_ids, A_dir, tgt_dir, num_workers=6)
    name = os.path.join(path, f'ValBatch_{i}.pt')
    torch.save(data, name)
    del data  # free memory before loading the next batch
    print(f"Saved batch {i}/{num_batches-1}: {name}")

par_time = time.perf_counter() - start
print(f">> Validation: batch size {batch_size}, {num_batches} batches, elapsed time {par_time:.2f}")
