# Test multinode multigpu training with deepspeed and ray

Tokenized datasets are cached under `data/main_cache`. 

In [1]:
print("CIAO FROM CYPHER, BIATCH")

CIAO FROM CYPHER, BIATCH


In [None]:
import ray
ray.init(address="10.141.1.24:6379")

if ray.is_initialized():
    print("RAy already up and running")

2025-07-02 17:56:46,292	INFO worker.py:1694 -- Connecting to existing Ray cluster at address: 10.141.1.24:6379...
[2025-07-02 17:56:51,315 W 3415416 3415416] gcs_rpc_client.h:151: Failed to connect to GCS at address 10.141.1.24:6379 within 5 seconds.


In [2]:
ray.cluster_resources()

{'object_store_memory': 318480340991.0,
 'accelerator_type:A100': 2.0,
 'node:__internal_head__': 1.0,
 'memory': 743120795649.0,
 'GPU': 8.0,
 'CPU': 96.0,
 'node:10.141.1.24': 1.0,
 'node:10.141.1.44': 1.0}

In [13]:
# Ray-aware DeepSpeed ZeRO-3 strategy
import json
from pathlib import Path
config_path = Path("/davinci-1/home/abeatini/pycharmProjects/shallowMind/config/ds_config.json")

# Read the file’s contents and turn it into a Python dict
with config_path.open() as f:
    ds_config = json.load(f)          # <-- use json.load, not json.loads
strategy = RayDeepSpeedStrategy(ds_config)


[2025-06-26 18:21:42,475] [INFO] [real_accelerator.py:239:get_accelerator] Setting ds_accelerator to cuda (auto detect)


2025-06-26 18:21:42,789 - INFO - gcc -pthread -B /davinci-1/home/abeatini/.conda/envs/shallow/compiler_compat -Wno-unused-result -Wsign-compare -DNDEBUG -fwrapv -O2 -Wall -fPIC -O2 -isystem /davinci-1/home/abeatini/.conda/envs/shallow/include -fPIC -O2 -isystem /davinci-1/home/abeatini/.conda/envs/shallow/include -fPIC -c /tmp/tmp4pe4kps9/test.c -o /tmp/tmp4pe4kps9/test.o
2025-06-26 18:21:42,810 - INFO - gcc -pthread -B /davinci-1/home/abeatini/.conda/envs/shallow/compiler_compat /tmp/tmp4pe4kps9/test.o -laio -o /tmp/tmp4pe4kps9/a.out
/davinci-1/home/abeatini/.conda/envs/shallow/compiler_compat/ld: cannot find -laio: No such file or directory
collect2: error: ld returned 1 exit status
2025-06-26 18:21:43,418 - INFO - gcc -pthread -B /davinci-1/home/abeatini/.conda/envs/shallow/compiler_compat -Wno-unused-result -Wsign-compare -DNDEBUG -fwrapv -O2 -Wall -fPIC -O2 -isystem /davinci-1/home/abeatini/.conda/envs/shallow/include -fPIC -O2 -isystem /davinci-1/home/abeatini/.conda/envs/shallow

In [14]:
ds_cfg = {
    "train_micro_batch_size_per_gpu": 4,
    "gradient_clipping": 1.0,
    "fp16": {"enabled": True},
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param":    {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
        "contiguous_gradients": True,
        "reduce_bucket_size":            500_000_000,
        "stage3_prefetch_bucket_size":   500_000_000,
        "stage3_param_persistence_threshold": 1_000_000
    }
}


# Ray-aware DeepSpeed ZeRO-3 strategy
strategy = RayDeepSpeedStrategy(stage=3, config=ds_cfg)

In [None]:
# train_llm_ray_lightning.py
import torch
import pytorch_lightning as pl                  # new import style in Lightning ≥2.0
from transformers import AutoModelForCausalLM, AutoTokenizer
from src.data.data_manager import LightningDataModule

# --- Ray Train imports -------------------------------------------------------
from lightning.pytorch.loggers import TensorBoardLogger, WandbLogger
from ray.train import ScalingConfig, RunConfig, CheckpointConfig
from ray.train.torch import TorchTrainer        # replaces RayLightningTrainer
from ray.train.lightning import (               # V2 Lightning helpers
    RayDeepSpeedStrategy,                       # Ray-compatible ZeRO-3
    RayLightningEnvironment,                    # cluster-aware env plugin
    RayTrainReportCallback,                     # metrics/ckpt reporter
    prepare_trainer,                            # patches the PL trainer
)

import os
import json
import ray

# -----------------------------------------------------------------------------


class LLMModel(pl.LightningModule):
    def __init__(self, model_name, learning_rate=1e-5):
        super().__init__()
        self.save_hyperparameters()
        #default is crossentropy
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask, labels=None, **kwargs):
        out = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=input_ids,
        )
        return out.loss

    def training_step(self, batch,  _):
        loss = self(**batch)
        self.log("loss", loss, prog_bar=True, on_step=True, on_epoch=True)
        return loss

    def configure_optimizers(self):
        # with Deepspeed You are using ZeRO-Offload with a client provided
        #  optimizer (<class 'torch.optim.adamw.AdamW'>) which in most cases will yield poor performance.
        #  Please either use deepspeed.ops.adam.DeepSpeedCPUAdam or set an optimizer in your ds-config 
        # return torch.optim.AdamW(self.parameters(), lr=self.hparams.learning_rate)
        return None

# -----------------------------------------------------------------------------


def train_loop_per_worker(config):
    """Executed once per Ray worker."""
    model_name = config.get("model_name", "gpt2")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    datamodule = LightningDataModule(
        tokenizer=tokenizer,
        dataset_configs={"wikitext": {}},
        batch_size=2,
        max_length=512,
    )

    model = LLMModel(model_name=model_name)

    ds_cfg_gpu = {
        # ---- batching ----------------------------------------------------------
        "train_micro_batch_size_per_gpu": 4,      # fits comfortably on an A100
         #"gradient_accumulation_steps": 2,         # 4 × 2 × 8 GPUs  = 64/global step
        "gradient_clipping": 1.0,

        # ---- numerics ----------------------------------------------------------
        "fp16": { "enabled": True },              # or switch to "bf16":{…}

        # ---- ZeRO ----------------------------------------------------------------
        "zero_optimization": {
            "stage": 3,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size":            500_000_000,
            "stage3_prefetch_bucket_size":   500_000_000,
            "stage3_param_persistence_threshold": 1_000_000
        },

        # ---- optimizer created by DeepSpeed (GPU AdamW fused) -------------------
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": 1e-5,
                "betas": [0.9, 0.999],
                "eps": 1e-8,
                "weight_decay": 0.01
            }
        },

        # ---- optional LR scheduler ---------------------------------------------
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "total_num_steps": 10_000,
                "warmup_num_steps": 1_000,
                "warmup_min_lr": 0
            }
        }
    }


    # Ray-aware DeepSpeed ZeRO-3 strategy
    strategy = RayDeepSpeedStrategy(stage=3, config=ds_cfg_gpu)
    
    tb_logger = TensorBoardLogger(save_dir="tb_logs", name="run1")

    # Standard Lightning trainer plus Ray plugins/callback
    pl_trainer = pl.Trainer(
        strategy=strategy,
        accelerator="auto",           # Ray sets CUDA_VISIBLE_DEVICES per worker
        devices="auto",
        precision="16-mixed",
        max_epochs=3,
        plugins=[RayLightningEnvironment()],
        callbacks=[RayTrainReportCallback()],
        log_every_n_steps=5,
        logger= tb_logger
    )

    # Patch the trainer for Ray & validate the config
    pl_trainer = prepare_trainer(pl_trainer)

    # Launch training
    pl_trainer.fit(model, datamodule=datamodule)


# -----------------------------------------------------------------------------
def init_ray():
    if ray.is_initialized():
        print("RAy already up and running")
        return
    else:
        ray.init(address="10.141.1.24:6379")
        if ray.is_initialized():
            print("RAy already up and running")
        else: 
            raise Exception


if __name__ == "__main__":

    init_ray()
    scaling = ScalingConfig(
        num_workers=8,               # 8 GPUs total
        use_gpu=True,
        resources_per_worker={"CPU": 2, "GPU": 1},
        placement_strategy="SPREAD"
    )

    trainer = TorchTrainer(          # <- NEW
        train_loop_per_worker=train_loop_per_worker,
        scaling_config=scaling,
        run_config=RunConfig(
            checkpoint_config=CheckpointConfig(
                num_to_keep=3,
                checkpoint_score_attribute="val_loss",
                checkpoint_score_order="min",
            )
        ),
    )

    result = trainer.fit()
    print(result)


2025-06-26 19:01:27,363	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


RAy already up and running
== Status ==
Current time: 2025-06-26 19:01:27 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-26 19:01:32 (running for 00:00:05.12)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-26 19:01:37 (running for 00:00:10.14)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerat

2025-06-26 19:04:48,292	ERROR checkpoint_manager.py:144 -- Result dict has no key: val_loss. checkpoint_score_attr must be set to a key in the result dict. Valid keys are: ['train_loss', 'epoch', 'step', 'timestamp', 'checkpoint_dir_name', 'should_checkpoint', 'done', 'training_iteration', 'trial_id', 'date', 'time_this_iter_s', 'time_total_s', 'pid', 'hostname', 'node_ip', 'time_since_restore', 'iterations_since_restore']
2025-06-26 19:04:48,292	ERROR checkpoint_manager.py:144 -- Result dict has no key: val_loss. checkpoint_score_attr must be set to a key in the result dict. Valid keys are: ['train_loss', 'epoch', 'step', 'timestamp', 'checkpoint_dir_name', 'should_checkpoint', 'done', 'training_iteration', 'trial_id', 'date', 'time_this_iter_s', 'time_total_s', 'pid', 'hostname', 'node_ip', 'time_since_restore', 'iterations_since_restore']


== Status ==
Current time: 2025-06-26 19:04:48 (running for 00:03:20.81)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-06-26 19:04:53 (running for 00:03:25.84)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-06-26 19:04:58 (running for 00:03:30.85)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir

2025-06-26 19:05:58,478	ERROR checkpoint_manager.py:144 -- Result dict has no key: val_loss. checkpoint_score_attr must be set to a key in the result dict. Valid keys are: ['train_loss', 'epoch', 'step', 'timestamp', 'checkpoint_dir_name', 'should_checkpoint', 'done', 'training_iteration', 'trial_id', 'date', 'time_this_iter_s', 'time_total_s', 'pid', 'hostname', 'node_ip', 'time_since_restore', 'iterations_since_restore']
2025-06-26 19:05:58,479	ERROR checkpoint_manager.py:144 -- Result dict has no key: val_loss. checkpoint_score_attr must be set to a key in the result dict. Valid keys are: ['train_loss', 'epoch', 'step', 'timestamp', 'checkpoint_dir_name', 'should_checkpoint', 'done', 'training_iteration', 'trial_id', 'date', 'time_this_iter_s', 'time_total_s', 'pid', 'hostname', 'node_ip', 'time_since_restore', 'iterations_since_restore']
2025-06-26 19:05:58,480	ERROR checkpoint_manager.py:144 -- Result dict has no key: val_loss. checkpoint_score_attr must be set to a key in the res

== Status ==
Current time: 2025-06-26 19:05:58 (running for 00:04:31.02)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




2025-06-26 19:06:01,186	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/davinci-1/home/abeatini/ray_results/TorchTrainer_2025-06-26_19-01-27' in 0.0045s.
2025-06-26 19:06:01,189	INFO tune.py:1041 -- Total run time: 273.83 seconds (273.81 seconds for the tuning loop).


== Status ==
Current time: 2025-06-26 19:06:01 (running for 00:04:33.82)
Using FIFO scheduling algorithm.
Logical resource usage: 17.0/96 CPUs, 8.0/8 GPUs (0.0/2.0 accelerator_type:A100)
Result logdir: /var/tmp/pbs.1874768.davinci-mgt01/ray/session_2025-06-26_17-23-15_385852_3806738/artifacts/2025-06-26_19-01-27/TorchTrainer_2025-06-26_19-01-27/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)


Result(
  metrics={'train_loss': 2.669545888900757, 'epoch': 2, 'step': 888},
  path='/davinci-1/home/abeatini/ray_results/TorchTrainer_2025-06-26_19-01-27/TorchTrainer_32cee_00000_0_2025-06-26_19-01-27',
  filesystem='local',
  checkpoint=Checkpoint(filesystem=local, path=/davinci-1/home/abeatini/ray_results/TorchTrainer_2025-06-26_19-01-27/TorchTrainer_32cee_00000_0_2025-06-26_19-01-27/checkpoint_000002)
)
