# Summary

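Notebook for distributed (multi-GPU) training of the operator-guided conditional diffusion model, launched with Hugging Face Accelerate.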

# Imports/Setup

In [1]:
import math
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
from accelerate import Accelerator, notebook_launcher
from diffusers import UNet2DModel
from safetensors.torch import load_file
from torch import nn

import conditional
import data
import dataset
import model
import utility
from wandb_helper import init_wandb, save_model_architecture, finish_run

2025-01-27 18:12:56.451745: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-01-27 18:12:56.466336: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-01-27 18:12:56.485147: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-01-27 18:12:56.490925: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-27 18:12:56.504446: I tensorflow/core/platform/cpu_feature_guar

In [2]:
class Config:
    """Static settings for this training run, read everywhere as ``Config.<name>``.

    The class is never instantiated. Its attributes are evaluated once, top to
    bottom, when the cell runs, so derived values (``total_batch_size``,
    ``tworkers``, ``vworkers``, ``lr``, ``save_path``) depend on the names defined
    above them. The class object itself is also passed to ``init_wandb`` as
    ``config_class``.
    """
    # dataset (path/solver/fixed_seq_len/ahead/tail are forwarded to data.main;
    # aug/upsample_size are forwarded to dataset.main)
    path = '/data/users/jupyter-dam724/colliding_solutions'
    solver = 'ros2'
    fixed_seq_len = 216
    ahead = 1
    tail = 1
    aug = True
    # Also used as the UNet sample_size and as the 'size' training argument.
    upsample_size = 96

    # device: forwarded to data.main as device_pref / device_ind
    # (original note: not used for training itself but needed for the dataset)
    device_pref = 'cuda'
    device_ind = None

    # distributed training
    num_processes = 3  # passed to notebook_launcher
    per_gpu_batch_size = 3
    # = 9. This is the value handed to dataset.main as `batch_size`.
    # NOTE(review): confirm whether dataset.main treats it as a per-process or a
    # global batch size before relying on the name.
    total_batch_size = per_gpu_batch_size * num_processes
    workers_per_gpu = 6
    # = 18 each. NOTE(review): every launched process calls dataset.main with these
    # values — confirm this is the intended worker count per process.
    tworkers = workers_per_gpu * num_processes
    vworkers = workers_per_gpu * num_processes
    grad_accumulate = 8  # Accelerator(gradient_accumulation_steps=...)

    # optimization
    base_lr = 1e-5
    max_lr = 1e-4  # peak learning rate given to OneCycleLR
    # total_batch_size / per_gpu_batch_size == num_processes, so lr = base_lr * sqrt(3).
    # Passed to AdamW. NOTE(review): OneCycleLR is attached to the same optimizer and
    # sets its own starting lr from max_lr / div_factor — confirm this value has any effect.
    lr = base_lr * math.sqrt(total_batch_size / (per_gpu_batch_size))  # sqrt scaling

    # training
    epoches = 100  # used for both OneCycleLR(epochs=...) and the training loop
    timesteps = 4000  # forwarded as 't_timesteps' to dataset.main and to training
    loss_type = "simple"
    sample_delay = 10

    # experiment tracking / checkpoints
    project_name = "Operator Guided Diffusion"
    experiment_name = 'init-conditional-opout-resumed'
    save_path = f'/data/users/jupyter-dam724/time-invariant-operator/checkpoint/{experiment_name}/'
    # Side effect: this call runs at class-definition time, i.e. whenever this cell executes.
    utility.validate_and_create_save_path(save_path, experiment_name)
    # Optional safetensors file loaded into the diffusion UNet; None skips loading.
    from_checkpoint = None
    # Optional safetensors file loaded into the operator UNet; None skips loading.
    op_ckpt = '/data/users/jupyter-dam724/time-invariant-operator/checkpoint/operator-training-adjusted-dropoutbigger/valid/model.safetensors'

In [3]:
# Start the Weights & Biases run for this experiment. The Config class itself is
# handed over as `config_class` (presumably so its fields are recorded with the run).
init_wandb(project_name=Config.project_name, run_name=Config.experiment_name,
           config_class=Config, save_path=Config.save_path)

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mdavid724[0m ([33mdavid724-lehigh-university[0m). Use [1m`wandb login --relogin`[0m to force relogin




# Training

In [None]:
# TODO: move this helper into the `model` module once it has been verified to work.

def load_training_state(accelerator, checkpoint_path, model, optimizer, scheduler):
    """Restore model, optimizer, scheduler and RNG state from a checkpoint file.

    Args:
        accelerator: Object exposing ``device`` and ``unwrap_model`` (an
            ``accelerate.Accelerator`` in this notebook).
        checkpoint_path: Path to a dict saved with ``torch.save``. Required keys:
            ``model_state_dict``, ``optimizer_state_dict``, ``epoch``. Optional keys:
            ``scheduler_state_dict`` and ``rng_states`` (a dict with any of
            ``python``, ``numpy``, ``torch``, ``cuda``).
        model: Model, possibly wrapped, whose weights are restored in place.
        optimizer: Optimizer whose state is restored in place.
        scheduler: LR scheduler to restore, or ``None`` to skip it.

    Returns:
        The value stored under ``state['epoch']``.

    Raises:
        KeyError: If one of the required keys is missing from the checkpoint.
    """
    # The checkpoint carries non-tensor objects (Python / NumPy RNG state), so it
    # cannot be read in weights-only mode. SECURITY: this unpickles arbitrary
    # objects — only load checkpoints from a trusted source.
    state = torch.load(
        checkpoint_path,
        map_location=accelerator.device,
        weights_only=False,
    )

    # Load into the bare module so the keys match regardless of any wrapper.
    accelerator.unwrap_model(model).load_state_dict(state['model_state_dict'])
    optimizer.load_state_dict(state['optimizer_state_dict'])

    # A checkpoint written without a scheduler may omit the key or store None.
    scheduler_state = state.get('scheduler_state_dict')
    if scheduler is not None and scheduler_state is not None:
        scheduler.load_state_dict(scheduler_state)

    # Restore whichever RNG streams were saved.
    rng_states = state.get('rng_states') or {}
    if rng_states.get('python') is not None:
        random.setstate(rng_states['python'])
    if rng_states.get('numpy') is not None:
        np.random.set_state(rng_states['numpy'])
    if rng_states.get('torch') is not None:
        # map_location may have moved this tensor to the accelerator device, but
        # torch.set_rng_state only accepts a CPU ByteTensor.
        torch.set_rng_state(rng_states['torch'].cpu())
    cuda_states = rng_states.get('cuda')
    if torch.cuda.is_available() and cuda_states is not None:
        # Same constraint: per-device generator states must be CPU tensors.
        torch.cuda.set_rng_state_all([s.cpu() for s in cuda_states])

    return state['epoch']

In [4]:
def acelerate_ddp():
    """Per-process training entry point, launched via ``notebook_launcher``.

    Takes no arguments and returns nothing; every setting is read from ``Config``.
    In order, it:

    1. creates the ``Accelerator`` (with gradient accumulation),
    2. loads the data splits and builds the train/valid dataloaders,
    3. builds the diffusion UNet (optionally loading ``Config.from_checkpoint``)
       and the operator UNet (optionally loading ``Config.op_ckpt``),
    4. creates AdamW + OneCycleLR for the diffusion UNet only,
    5. passes everything through ``accelerator.prepare`` and calls
       ``conditional.accelerator_train``.
    """
    accelerator = Accelerator(gradient_accumulation_steps=Config.grad_accumulate)

    data_params = {
        'path': Config.path,
        'device_pref': Config.device_pref,
        'solver': Config.solver,
        'fixed_seq_len': Config.fixed_seq_len,
        'ahead': Config.ahead,
        'tail': Config.tail,
        'device_ind': Config.device_ind
    }

    # Only the x splits are used below; the first return value and both y splits
    # are discarded.
    _, (x_train_data, _), (x_valid_data, _) = data.main(**data_params)

    # NOTE(review): `batch_size` and the worker counts are Config's values already
    # multiplied by num_processes, and this function runs once per process —
    # confirm dataset.main expects totals rather than per-process values.
    dataset_params = {
        'x_train_data': x_train_data,
        'x_valid_data': x_valid_data,
        'batch_size': Config.total_batch_size,
        't_timesteps': Config.timesteps,
        'tworkers': Config.tworkers,
        'vworkers': Config.vworkers,
        'upsample_size': Config.upsample_size,
        'aug': Config.aug
    }

    train_dl, valid_dl = dataset.main(**dataset_params)

    # Diffusion model: 2 input channels, 1 output channel.
    # Resolutions in the comments below assume Config.upsample_size == 96.
    unet = UNet2DModel(
        sample_size=(Config.upsample_size, Config.upsample_size),
        in_channels=2,
        out_channels=1,
        layers_per_block=4,
        block_out_channels=(64, 128, 256, 512),
        down_block_types=(
            "DownBlock2D",      # 64 channels at 96x96
            "DownBlock2D",      # 128 channels at 48x48
            "AttnDownBlock2D",  # 256 channels at 24x24
            "AttnDownBlock2D",  # 512 channels at 12x12
        ),
        up_block_types=(
            "AttnUpBlock2D",
            "AttnUpBlock2D",
            "UpBlock2D",
            "UpBlock2D"
        )
    )

    save_model_architecture(unet, Config.save_path)

    # TODO : load from state and continue training

    if Config.from_checkpoint is not None:
        state_dict = load_file(Config.from_checkpoint)
        model.load_model_weights(unet, state_dict)

    # Operator model: 1 input channel, 1 output channel, smaller than the UNet above.
    operator = UNet2DModel(
        sample_size=(Config.upsample_size, Config.upsample_size),
        in_channels=1,
        out_channels=1,
        layers_per_block=2,
        block_out_channels=(64, 64, 128, 64),
        down_block_types=(
            "DownBlock2D",      # 64 channels at 96x96
            "DownBlock2D",      # 64 channels at 48x48
            "AttnDownBlock2D",  # 128 channels at 24x24
            "AttnDownBlock2D"   # 64 channels at 12x12
        ),
        up_block_types=(
            "AttnUpBlock2D",
            "AttnUpBlock2D",
            "UpBlock2D",
            "UpBlock2D"
        )
    )

    if Config.op_ckpt is not None:
        state_dict = load_file(Config.op_ckpt)
        model.load_model_weights(operator, state_dict)

    # Only the diffusion UNet's parameters are handed to the optimizer.
    optimizer = optim.AdamW(unet.parameters(), lr=Config.lr)

    # steps_per_epoch is measured on the dataloader *before* accelerator.prepare.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=Config.max_lr,
        epochs=Config.epoches,
        steps_per_epoch=len(train_dl),
        pct_start=0.1,
        div_factor=25,
        final_div_factor=1e4
    )

    # Send everything through `accelerator.prepare`
    train_dl, valid_dl, unet, operator, optimizer, scheduler = accelerator.prepare(
        train_dl, valid_dl, unet, operator, optimizer, scheduler
    )

    # Fresh list handed to the training loop under 'train_log'.
    train_log = []

    training_params = {
        'accelerator': accelerator,
        'train': train_dl,
        'valid': valid_dl,
        'model': unet,
        'operator': operator,
        'epochs': Config.epoches,
        'criterion': nn.MSELoss(),
        'save_path': Config.save_path,
        'loss_type': Config.loss_type,
        'train_log': train_log,
        'optimizer': optimizer,
        'scheduler': scheduler,
        'sample_delay': Config.sample_delay,
        't_timesteps': Config.timesteps,
        'size': Config.upsample_size,
        'loading_bar': False
    }

    conditional.accelerator_train(**training_params)

In [5]:
# Launch one training process per GPU; each process runs `acelerate_ddp` with no arguments.
notebook_launcher(
    acelerate_ddp,
    args=(),
    num_processes=Config.num_processes,
)

Launching training on 3 GPUs.
Now using GPU.
Now using GPU.
Now using GPU.
Train size: 145097, Percent of toal: 74.68%, Unique instances: 700
Train size: 49194, Percent of toal: 25.32%, Unique instances: 240
Train size: 145097, Percent of toal: 74.68%, Unique instances: 700
Train size: 49194, Percent of toal: 25.32%, Unique instances: 240
Train size: 145097, Percent of toal: 74.68%, Unique instances: 700
Train size: 49194, Percent of toal: 25.32%, Unique instances: 240


grad.sizes() = [256, 512, 1, 1], strides() = [512, 1, 512, 512]
bucket_view.sizes() = [256, 512, 1, 1], strides() = [512, 1, 1, 1] (Triggered internally at ../torch/csrc/distributed/c10d/reducer.cpp:327.)
  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
grad.sizes() = [256, 512, 1, 1], strides() = [512, 1, 512, 512]
bucket_view.sizes() = [256, 512, 1, 1], strides() = [512, 1, 1, 1] (Triggered internally at ../torch/csrc/distributed/c10d/reducer.cpp:327.)
  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
grad.sizes() = [256, 512, 1, 1], strides() = [512, 1, 512, 512]
bucket_view.sizes() = [256, 512, 1, 1], strides() = [512, 1, 1, 1] (Triggered internally at ../torch/csrc/distributed/c10d/reducer.cpp:327.)
  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Epoch 31/100, Train Loss: 0.0019169766455888748
Epoch 32/100, Train Loss: 0.0019258473766967654
Epoch 33/100, Train Loss: 0.0018703469540923834
Epoch 34/100, Train Loss: 0.0018010828644037247
Epoch 35/100, Train Loss: 0.0018391864141449332
Epoch 36/100, Train Loss: 0.0017801886424422264
Epoch 37/100, Train Loss: 0.001698595704510808
Epoch 38/100, Train Loss: 0.0016911126440390944
Epoch 39/100, Train Loss: 0.001767714275047183
Epoch 40/100, Train Loss: 0.0016503711231052876
Epoch 41/100, Train Loss: 0.0016521355137228966
Epoch 42/100, Train Loss: 0.0016399594023823738
Epoch 43/100, Train Loss: 0.0015641842037439346
Epoch 44/100, Train Loss: 0.001596556045114994
Epoch 45/100, Train Loss: 0.001596342190168798
Epoch 46/100, Train Loss: 0.0015544978668913245
Epoch 47/100, Train Loss: 0.0015988072846084833
Epoch 48/100, Train Loss: 0.0014464225387200713
Epoch 49/100, Train Loss: 0.001478452468290925


wandb: ERROR Error while calling W&B API: context deadline exceeded (<Response [500]>)
wandb: ERROR Error while calling W&B API: context deadline exceeded (<Response [500]>)
wandb: Network error (ReadTimeout), entering retry loop.


Epoch 50/100, Train Loss: 0.0014201186131685972
Epoch 51/100, Train Loss: 0.0014467922737821937
Epoch 52/100, Train Loss: 0.0014246515929698944
Epoch 53/100, Train Loss: 0.0014211612287908792
Epoch 54/100, Train Loss: 0.0014366372488439083
Epoch 55/100, Train Loss: 0.00143334676977247
Epoch 56/100, Train Loss: 0.0013979028444737196
Epoch 57/100, Train Loss: 0.0013845607172697783
Epoch 58/100, Train Loss: 0.0014179841382429004
Epoch 59/100, Train Loss: 0.001378132845275104
Epoch 60/100, Train Loss: 0.0013421426992863417
Epoch 61/100, Train Loss: 0.001370538491755724
Epoch 62/100, Train Loss: 0.0013494747690856457
Epoch 63/100, Train Loss: 0.001302741002291441
Epoch 64/100, Train Loss: 0.0013053640723228455
Epoch 65/100, Train Loss: 0.001307563274167478
Epoch 93/100, Train Loss: 0.0011215182021260262
Epoch 94/100, Train Loss: 0.0011180544970557094
Epoch 95/100, Train Loss: 0.0011725391959771514
Epoch 96/100, Train Loss: 0.0011441658716648817
Epoch 97/100, Train Loss: 0.001116317114792764

In [6]:
# Close the W&B run opened by init_wandb above (helper from wandb_helper).
finish_run()

VBox(children=(Label(value='0.655 MB of 0.655 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

0,1
step,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
train_loss,█▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁

0,1
model_architecture,UNet2DModel(  (conv...
step,99
train_loss,0.00113
