In [2]:
import os
import sys
import glob
import torch
import torch.distributed as dist
import logging
import json

# Put the notebook's working directory on the import path (at most once) so
# the project-local ``classes`` package imported below can be resolved.
project_root = os.getcwd()
if all(entry != project_root for entry in sys.path):
    sys.path.append(project_root)
    
from classes.experiment_config import ExperimentConfig
from classes.experiment_runner import ExperimentRunner

from accelerate import notebook_launcher
from datasets import load_dataset

import logging
# CodeCarbon is noisy below ERROR level; surface only its errors.
logging.getLogger("codecarbon").setLevel(logging.ERROR)

# Root logging configuration plus a module-level logger for this notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make the project root and its ``helper_functions`` directory importable so
# the numbered ``_N_*`` helper modules below can be imported by bare name.
project_root = os.getcwd()
helper_functions_path = os.path.join(project_root, "helper_functions")
for _import_dir in (project_root, helper_functions_path):
    if _import_dir not in sys.path:
        sys.path.append(_import_dir)
del _import_dir

# Import helper functions and classes (adjust these imports as needed)
from _1_distributed_setup import get_original_generate_method, get_accelerator, load_model_tokenizer
from _3_prompt_processing import filter_n_prompts, sort_prompts
from _4_setup_energy_tracking import start_energy_tracking, stop_energy_tracking
from _6_run_inference_by_task import run_gen_inference
from _7_get_experiment_info import get_experiment_setup, get_experimental_variables, get_model_architecture
from _8_get_inference_results import combine_inference_metrics
from _9_get_compute_info import combine_comp_metrics
from _10_get_energy_metrics import combine_energy_metrics
from _12_save_results import save_raw_results, save_final_results, get_persistent_unique_id

# A helper to aggregate energy metrics from files
def aggregate_results(results_dir):
    """Load every per-rank energy-metrics JSON file found in ``results_dir``.

    Files are matched by the pattern ``energy_metrics_rank_*.json`` (not
    recursively). They are returned ordered by their numeric rank suffix
    (0, 1, 2, ..., 10), so the result does not depend on filesystem listing
    order.

    Args:
        results_dir: Directory to search (str or path-like).

    Returns:
        A list with one parsed JSON object per matching file. The list is empty
        when nothing matches, including when ``results_dir`` does not exist.
    """
    # Escape the directory so glob metacharacters in it ("[", "*", "?") are
    # taken literally; only the filename part is meant to be a pattern.
    pattern = os.path.join(
        glob.escape(os.fspath(results_dir)), "energy_metrics_rank_*.json"
    )

    def _rank_key(path):
        # Numeric rank suffixes sort numerically (so rank_10 follows rank_2);
        # any non-numeric suffix falls back to plain name order after them.
        suffix = os.path.splitext(os.path.basename(path))[0].rsplit("_", 1)[-1]
        return (0, int(suffix), path) if suffix.isdecimal() else (1, 0, path)

    aggregated = []
    for file_path in sorted(glob.glob(pattern), key=_rank_key):
        with open(file_path, "r", encoding="utf-8") as f:
            aggregated.append(json.load(f))
    return aggregated

# -----------------------------------------------------------------------------
def get_shared_unique_id(accelerator):
    """Return one experiment ID that is shared by every process.

    Only the main process calls ``get_persistent_unique_id()``; the others
    start from an empty-string placeholder. When a torch.distributed process
    group is available and initialized, the value is broadcast from rank 0 via
    ``dist.broadcast_object_list``. Every process must reach this call for the
    broadcast to complete. Without an initialized group, non-main processes
    get ``""``.
    """
    payload = [get_persistent_unique_id() if accelerator.is_main_process else ""]
    group_ready = dist.is_available() and dist.is_initialized()
    if group_ready:
        # Collective call: overwrites payload[0] on non-source ranks in place.
        dist.broadcast_object_list(payload, src=0)
    return payload[0]

def aggregate_results(results_dir):
    """Load every per-rank energy-metrics JSON file found in ``results_dir``.

    NOTE(review): another ``aggregate_results`` is defined earlier in this
    file. Because this one comes later, it is the definition bound at import
    time; consider deleting the duplicate.

    Files are matched by the pattern ``energy_metrics_rank_*.json`` (not
    recursively). They are returned ordered by their numeric rank suffix
    (0, 1, 2, ..., 10), so the result does not depend on filesystem listing
    order.

    Args:
        results_dir: Directory to search (str or path-like).

    Returns:
        A list with one parsed JSON object per matching file. The list is empty
        when nothing matches, including when ``results_dir`` does not exist.
    """
    # Escape the directory so glob metacharacters in it ("[", "*", "?") are
    # taken literally; only the filename part is meant to be a pattern.
    pattern = os.path.join(
        glob.escape(os.fspath(results_dir)), "energy_metrics_rank_*.json"
    )

    def _rank_key(path):
        # Numeric rank suffixes sort numerically (so rank_10 follows rank_2);
        # any non-numeric suffix falls back to plain name order after them.
        suffix = os.path.splitext(os.path.basename(path))[0].rsplit("_", 1)[-1]
        return (0, int(suffix), path) if suffix.isdecimal() else (1, 0, path)

    aggregated = []
    for file_path in sorted(glob.glob(pattern), key=_rank_key):
        with open(file_path, "r", encoding="utf-8") as f:
            aggregated.append(json.load(f))
    return aggregated

# -----------------------------------------------------------------------------
class ExperimentRunner:
    """Run one distributed inference + energy-tracking experiment.

    ``run_torch`` does the following:
      * loads the configured model/tokenizer and prepares them with an
        Accelerate ``Accelerator``;
      * runs inference over the prompts while energy tracking is active;
      * saves per-experiment and per-process raw results, then the final
        benchmark summary (main process only).

    NOTE(review): this class shadows the ``ExperimentRunner`` imported from
    ``classes.experiment_runner`` at the top of the file.
    """

    # Values of ``experiment_config.inference_type`` that run_torch can execute.
    SUPPORTED_INFERENCE_TYPES = ("pure_generative",)

    def __init__(self, experiment_config, prompts, **inference_kwargs):
        """Store the experiment configuration and prompts.

        Args:
            experiment_config: Config object. ``run_torch`` reads
                ``inference_type``, ``model_name``, ``fp_precision``,
                ``num_input_prompts``, ``max_input_tokens``, ``gpu_list`` and
                ``task_type`` from it, and passes it on to helper functions.
            prompts: Prompts to filter, sort and run inference on.
            **inference_kwargs: Extra keyword arguments, stored on the
                instance. ``run_torch`` does not currently read them.
        """
        self.config = experiment_config
        self.prompts = prompts
        self.inference_kwargs = inference_kwargs

    def run_torch(self):
        """Execute the experiment on the current process.

        Returns:
            On the main process, a dict ``{"EXPERIMENT_<id>": {...}}`` holding
            setup, variables, model architecture and result file references.
            On all other processes, ``None``.

        Raises:
            ValueError: if ``config.inference_type`` is not one of
                ``SUPPORTED_INFERENCE_TYPES``. This is checked before any
                accelerator or model work is done.
        """
        config = self.config
        inference_type = config.inference_type
        # Fail fast: an unsupported type previously left the inference outputs
        # unbound and only crashed (NameError) after model loading and tracking.
        if inference_type not in self.SUPPORTED_INFERENCE_TYPES:
            raise ValueError(
                f"Unsupported inference_type {inference_type!r}; "
                f"expected one of {self.SUPPORTED_INFERENCE_TYPES}"
            )

        model_name = config.model_name
        fp_precision = config.fp_precision
        num_input_prompts = config.num_input_prompts
        max_input_tokens = config.max_input_tokens
        gpu_list = config.gpu_list

        # Initialize Accelerator early.
        accelerator = get_accelerator(gpu_list)
        accelerator.print("Accelerator set up")

        # Every process must reach this call (it may broadcast from rank 0).
        unique_id = get_shared_unique_id(accelerator)
        accelerator.print(f"Using unique experiment id: {unique_id}")

        # Main process loads first; presumably so any download/cache happens once.
        with accelerator.main_process_first():
            model_undistributed, tokenizer = load_model_tokenizer(
                model_name=model_name,
                backend=None,
                fp_precision=fp_precision,
            )
        accelerator.print(f"{model_name} loaded with precision {fp_precision}")

        # Capture ``generate`` before ``prepare`` so it can be re-attached to
        # the prepared (possibly wrapped) model afterwards.
        orig_generate_method = get_original_generate_method(model_undistributed)
        if orig_generate_method is None:
            logger.warning("Could not locate the original generate method.")

        model, tokenizer = accelerator.prepare(model_undistributed, tokenizer)
        accelerator.print("Model and tokenizer prepared")

        if orig_generate_method:
            # A wrapped model exposes the underlying model as ``.module``.
            if hasattr(model, "module"):
                model.module.generate = orig_generate_method
            model.generate = orig_generate_method
            accelerator.print("Original generate method reassigned")
        logger.info(f"[Process {os.getpid()}] Model is on device: {accelerator.device}")

        # Warm-up forward pass on a tiny input before timing/tracking starts.
        dummy_input = tokenizer(
            "Hello world", return_tensors="pt", truncation=True, max_length=max_input_tokens
        ).input_ids.to(accelerator.device)
        with torch.no_grad():
            _ = model(dummy_input)
        logger.info(f"[Process {os.getpid()}] Dummy forward pass complete")
        accelerator.wait_for_everyone()

        prompts_n_filtered = filter_n_prompts(prompts=self.prompts, num_input_prompts=num_input_prompts)
        prompts_sorted = sort_prompts(prompts_n_filtered)
        accelerator.print(f"Prompts processed: {len(prompts_sorted)} prompts.")

        self.tracker = start_energy_tracking()
        accelerator.print("Energy tracking started")
        try:
            # inference_type was validated above; "pure_generative" is the only
            # supported type, so it is dispatched directly.
            accelerator.print(f"{inference_type} task type")
            self.raw_text_outputs, input_ids, raw_inference_results = run_gen_inference(
                model=model,
                experiment_config=config,
                prompts=prompts_sorted,
                tokenizer=tokenizer,
                accelerator=accelerator,
            )
            logger.info(f"[Process {os.getpid()}] Inference complete")
        finally:
            # Stop tracking even if inference raises, so the tracker is not left running.
            codecarbon_data = stop_energy_tracking(self.tracker)
            accelerator.print("Energy tracking stopped")

        # Experiment-wide meta info (computed on every process).
        self.experiment_setup = get_experiment_setup(
            experiment_config=config, model=model, codecarbon_data=codecarbon_data
        )
        self.experiment_variables = get_experimental_variables(
            experiment_config=config, model=model, accelerator=accelerator
        )
        self.model_architecture = get_model_architecture(model=model)

        # Experiment-wide results are saved by the main process only.
        # NOTE(review): these helpers receive ``accelerator``. If either performs
        # a collective op (gather/reduce), calling it on the main process only
        # would hang the others. Confirm in the helper modules.
        # NOTE(review): the captured run failed inside combine_comp_metrics with
        # "get_gpu_cpu_utilisation() takes 1 positional argument but 2 were
        # given". That mismatch is in helper_functions/_9_get_compute_info.py,
        # not in this file.
        if accelerator.is_main_process:
            inference_metrics = combine_inference_metrics(raw_inference_results, accelerator)
            comp_metrics = combine_comp_metrics(
                model=model,
                device=accelerator.device,
                tokenised_input_ids=input_ids,
                accelerator=accelerator,
            )
            save_raw_results(unique_id, "inference_metrics", inference_metrics)
            save_raw_results(unique_id, "compute_metrics", comp_metrics)
            logger.info("Main process saved inference and computation metrics.")
        accelerator.print("Experiment-wide metrics saved")
        accelerator.wait_for_everyone()

        # Each process saves its own energy metrics.
        # NOTE(review): local_process_index is per node; on multi-node runs,
        # file names may collide across nodes. Confirm save_raw_results naming.
        local_energy_results = combine_energy_metrics(codecarbon_data, accelerator)
        save_raw_results(
            unique_id, "local_energy_results", local_energy_results,
            pid=accelerator.local_process_index,
        )
        accelerator.print(f"Process {accelerator.local_process_index} saved its energy metrics.")
        accelerator.wait_for_everyone()

        # (Aggregation is intentionally left to a separate post-processing step.)

        if not accelerator.is_main_process:
            return None

        experiment_title = f"EXPERIMENT_{unique_id}"
        experiment_results = {
            "setup": self.experiment_setup,
            "variables": self.experiment_variables,
            "model_architecture": self.model_architecture,
            "results": {
                "inference_metrics": "inference_metrics.json",  # paths or filenames as needed
                "compute_metrics": "compute_metrics.json",
                "energy_metrics": "local_energy_results_*.json"
            }
        }
        benchmark_results = {experiment_title: experiment_results}
        # Previously referenced an undefined name ``task_type`` (NameError).
        output_json_path = save_final_results(config.task_type, benchmark_results)
        logger.info(f"Benchmark results saved to {output_json_path}")
        return benchmark_results

# -----------------------------------------------------------------------------
def run_experiment(experiment_config, prompts):
    """Run one experiment on the current process and return its result.

    Builds an ``ExperimentRunner`` for ``experiment_config`` and ``prompts``
    and returns whatever ``run_torch()`` returns.
    """
    return ExperimentRunner(experiment_config, prompts).run_torch()

# -----------------------------------------------------------------------------
if __name__ == "__main__":
    experiment_config = ExperimentConfig(
        model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        is_encoder_decoder="decoder_only",
        task_type="text_generation",
        inference_type="pure_generative",  # must match your ExperimentRunner expectations
        max_input_tokens=512,
        max_output_tokens=50,
        num_input_prompts=5,
        gpu_list=[0, 1],
        num_processes=2,
        batching_options={
            "fixed_max_batch_size": 2, # this is max batch size if adaptive batching on; fixed batch size if it's off
            "adaptive_batching": False,
            "adaptive_max_tokens": 256
        },
        sharding_config={
            "fsdp_config": {
                "use_orig_params": True,
                "cpu_offload": True
            },
            "sharding_strategy": "NO_SHARD"
        },
        query_rate=1,
        decoder_temperature=1,
        fp_precision="float16",
        backend="pytorch"
    )

    # Load prompts: the "test" split of the "arxiv" config of lighteval/pile_helm.
    ds = load_dataset("lighteval/pile_helm", "arxiv")["test"]
    prompts = [sample["text"] for sample in ds]

    # Launch the module-level ``run_experiment`` on each process, passing the
    # config and prompts through ``args``. Redefining ``run_experiment`` here
    # as a zero-argument closure would rebind (and shadow) the module-level
    # function of the same name.
    notebook_launcher(
        run_experiment,
        args=(experiment_config, prompts),
        num_processes=experiment_config.num_processes,
    )


Launching training on 2 GPUs.
Accelerator set up
Using unique experiment id: 0071
TinyLlama/TinyLlama-1.1B-Chat-v1.0 loaded with precision float16


INFO:__main__:[Process 1409255] Model is on device: cuda:1


Model and tokenizer prepared
Original generate method reassigned


INFO:__main__:[Process 1409254] Model is on device: cuda:0
INFO:__main__:[Process 1409254] Dummy forward pass complete
INFO:__main__:[Process 1409255] Dummy forward pass complete


Prompts processed: 5 prompts.
Energy tracking started
pure_generative task type
Fixed batching (non-adaptive): created 3 batches.


INFO:__main__:[Process 1409254] Inference complete
INFO:__main__:[Process 1409255] Inference complete


Energy tracking stopped


[2025-03-22 11:45:48,248] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 1409254) of fn: run_experiment (start_method: fork)


ChildFailedError: 
============================================================
run_experiment FAILED
------------------------------------------------------------
Failures:
  <NO_OTHER_FAILURES>
------------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2025-03-22_11:45:46
  host      : ds01
  rank      : 0 (local_rank: 0)
  exitcode  : 1 (pid: 1409254)
  error_file: /tmp/torchelastic_oquyznl1/none_uccg3vsv/attempt_0/0/error.json
  traceback : Traceback (most recent call last):
    File "/home/228755@hertie-school.lan/thesis/thesis/lib/python3.10/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
      return f(*args, **kwargs)
    File "/tmp/ipykernel_1409010/2860231139.py", line 246, in run_experiment
      runner.run_torch()
    File "/tmp/ipykernel_1409010/2860231139.py", line 168, in run_torch
      comp_metrics      = combine_comp_metrics(model=model, device=accelerator.device, tokenised_input_ids=input_ids, accelerator=accelerator)
    File "/home/228755@hertie-school.lan/thesis/helper_functions/_9_get_compute_info.py", line 115, in combine_comp_metrics
      utilisation = get_gpu_cpu_utilisation(pid, device)
  TypeError: get_gpu_cpu_utilisation() takes 1 positional argument but 2 were given
  
============================================================