In [1]:
import os

os.environ["RANK"] = "0"
os.environ["WORLD_SIZE"] = "1"
os.environ["MASTER_ADDR"] = '127.0.0.1'
os.environ["MASTER_PORT"] = '29500'

In [2]:
config_file = "/mloscratch/homes/solergib/s-ai/nanotron/examples/debug_llama_1GPU_dummy.yaml"

In [3]:
from nanotron import logging
from nanotron.config import (
    PretrainDatasetsArgs,
)
from nanotron.dataloader import (
    clm_process,
    dummy_infinite_data_generator,
    get_datasets,
    get_train_dataloader,
)
from nanotron.logging import log_rank
from nanotron.parallel.pipeline_parallel.utils import get_input_output_pp_ranks
from nanotron.trainer import DistributedTrainer
from nanotron.utils import (
    main_rank_first,
)

try:
    from huggingface_hub import __version__ as hf_hub_version
    from transformers import AutoTokenizer
    from transformers import __version__ as tf_version
except ImportError:
    hf_hub_version = None
    tf_version = None

logger = logging.get_logger(__name__)

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
trainer = DistributedTrainer(config_file)

03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]: Setting max_position_embeddings to 4096. Previous value was 2048.
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]: Config:
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]: Config(general=GeneralArgs(project='debug',
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            entity=None,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            run='llama',
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            seed=42,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            step=None,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            consumed_train_samples=None,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            benchmark_csv_path=None,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:                            ignore_sanity_checks=True),
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:        parallelism=ParallelismArgs(dp=1,
03/09/2024 21:35:59 [INFO|DP=0|PP=0|TP=0]:         

In [5]:
# OG Dataloader
def get_dataloader(trainer: DistributedTrainer):
    """Returns a dataloader for training."""

    # First, we need to know which ranks to feed the dataloader to
    input_pp_rank, output_pp_rank = get_input_output_pp_ranks(model=trainer.model)

    # Case 1: Dummy data generator
    if trainer.config.data.dataset is None:
        log_rank("Using dummy data generator", logger=logger, level=logging.INFO, rank=0)
        dataloader = dummy_infinite_data_generator(
            micro_batch_size=trainer.micro_batch_size,
            sequence_length=trainer.sequence_length,
            input_pp_rank=input_pp_rank,
            output_pp_rank=output_pp_rank,
            vocab_size=trainer.model_config.vocab_size,
            seed=trainer.config.data.seed,
            parallel_context=trainer.parallel_context,
        )()

    # Case 2: HuggingFace datasets
    elif isinstance(trainer.config.data.dataset, PretrainDatasetsArgs):
        log_rank("Using `datasets` library", logger=logger, level=logging.INFO, rank=0)
        tokenizer_path = trainer.config.tokenizer.tokenizer_name_or_path
        log_rank(
            f"Loading tokenizer from {tokenizer_path} and transformers/hf_hub versions {tf_version, hf_hub_version}",
            logger=logger,
            level=logging.INFO,
            rank=0,
        )

        # We need to the 1st device to process dataset and cache it, then other devices load from cache
        with main_rank_first(trainer.parallel_context.world_pg):
            # TODO @nouamanetazi: this may timeout before 1st device finishes processing dataset. Can we have a ctxmanager to modify timeout?
            # TODO: generalise to include  for validation/test splits

            # We load the raw dataset
            raw_dataset = get_datasets(
                hf_dataset_or_datasets=trainer.config.data.dataset.hf_dataset_or_datasets,
                splits=trainer.config.data.dataset.hf_dataset_splits,
            )["train"]

            tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
            tokenizer.pad_token = tokenizer.eos_token
            tokenizer.padding_side = "left"

            # We apply the Causal Language Modeling preprocessing
            train_dataset = clm_process(
                raw_dataset=raw_dataset,
                tokenizer=tokenizer,
                text_column_name=trainer.config.data.dataset.text_column_name,
                dataset_processing_num_proc_per_process=trainer.config.data.dataset.dataset_processing_num_proc_per_process,
                dataset_overwrite_cache=trainer.config.data.dataset.dataset_overwrite_cache,
                sequence_length=trainer.sequence_length,
            )

            # We load the processed dataset on the ranks requiring it
            dataloader = get_train_dataloader(
                train_dataset=train_dataset,
                sequence_length=trainer.sequence_length,
                parallel_context=trainer.parallel_context,
                input_pp_rank=input_pp_rank,
                output_pp_rank=output_pp_rank,
                micro_batch_size=trainer.micro_batch_size,
                consumed_train_samples=trainer.consumed_train_samples,
                dataloader_num_workers=trainer.config.data.num_loading_workers,
                seed_worker=trainer.config.data.seed,
                dataloader_drop_last=True,
            )
            # Check if we have enough samples for train_steps
            total_tokens_dataset = len(dataloader.dataset) * trainer.sequence_length
            num_tokens_needed_for_training = (
                (trainer.config.tokens.train_steps - trainer.start_iteration_step)
                * trainer.global_batch_size
                * trainer.sequence_length
            )
            assert num_tokens_needed_for_training <= total_tokens_dataset, (
                f"Dataset is too small for steps ({total_tokens_dataset} < {num_tokens_needed_for_training}), "
                f"Try train_steps<={len(dataloader.dataset) // trainer.global_batch_size + trainer.start_iteration_step}"
            )
    else:
        raise ValueError(f"Unhandled case of `self.config.data.dataset`. Got: {trainer.config.data.dataset}")

    return dataloader

dataloader = get_dataloader(trainer)

03/09/2024 21:36:00 [INFO|DP=0|PP=0|TP=0]: Using `datasets` library
03/09/2024 21:36:00 [INFO|DP=0|PP=0|TP=0]: Loading tokenizer from /mloscratch/homes/solergib/models/Llama-2-7b-chat-hf and transformers/hf_hub versions ('4.37.2', '0.20.3')


In [6]:
type(dataloader)

torch.utils.data.dataloader.DataLoader

In [17]:
# En training_step hace 
n_micro_batches_per_batch = 2
batch=(next(dataloader) for _ in range(n_micro_batches_per_batch))

generator

In [18]:
# Luego en OneForwardOneBackwardPipelineEngine.train_batch_iter hace
batch_iter = iter(batch)

In [19]:
batch_iter_next = next(batch_iter)

TypeError: 'DataLoader' object is not an iterator

In [20]:
for micro_batch in batch_iter:
    print('222')
    print(micro_batch)

In [7]:
for data in dataloader: # Imprime con microbatchsize 4 el resultado que queremos ver dentro del engine. NO conseguimos replicar lo del iter y los next que hace, ni idea, pero pasamos a megatron
    print(data)
    break

{'input_ids': tensor([[  297,  3122, 29889,  ...,  7432,   338,   451],
        [ 7047, 29892,   470,  ..., 27477,   839,   263],
        [  813,  3802,   394,  ..., 29879, 29892,   338],
        [13630,   293, 27881,  ...,   393,   723, 29548]]), 'input_mask': tensor([[True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True]]), 'label_ids': tensor([[ 3122, 29889,    13,  ...,   338,   451,  3307],
        [29892,   470,   491,  ...,   839,   263,   365],
        [ 3802,   394,  3687,  ..., 29892,   338,   263],
        [  293, 27881,   310,  ...,   723, 29548,   470]]), 'label_mask': tensor([[True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True],
        [True, True, True,  ..., True, True, True]])}


In [14]:
dataloader.dataset

Dataset({
    features: ['input_ids'],
    num_rows: 3097
})

# Megatron

In [8]:
from nanotron.trainer import DistributedTrainer
trainer = DistributedTrainer(config_file)

03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]: Setting max_position_embeddings to 4096. Previous value was 2048.
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]: Config:
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]: Config(general=GeneralArgs(project='debug',
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            entity=None,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            run='llama',
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            seed=42,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            step=None,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            consumed_train_samples=None,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            benchmark_csv_path=None,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:                            ignore_sanity_checks=True),
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:        parallelism=ParallelismArgs(dp=1,
03/09/2024 20:01:46 [INFO|DP=0|PP=0|TP=0]:         

In [7]:
trainer.config.data.dataset.hf_dataset_or_datasets # Tendriamos que editar esto

'stas/openwebtext-10k'

In [2]:
data_path = "/mloscratch/homes/solergib/s-ai/nanotron/datasets/europarl-gpt_text_document"

In [None]:
import dataclasses
@dataclasses.dataclass
class DataCollatorForCLM:
    """
    Data collator used for causal language modeling.

    - input_pp_rank: Discards last input id token
    - output_pp_rank: Discards first label id token
    - other pp ranks: Don't have data. Instead, we use `TensorPointer` to point to the rank having the data.
    """

    sequence_length: int
    input_pp_rank: int
    output_pp_rank: int
    parallel_context: ParallelContext

    def __call__(self, examples: List[Dict[str, List[np.ndarray]]]) -> Dict[str, Union[torch.Tensor, TensorPointer]]:
        # Process the case when current rank doesn't require data. We return `TensorPointer` that points to ranks having the data.
        current_pp_rank = dist.get_rank(self.parallel_context.pp_pg)
        if current_pp_rank not in [
            self.input_pp_rank,
            self.output_pp_rank,
        ]:
            assert all(len(example) == 0 for example in examples)
            return {
                "input_ids": TensorPointer(group_rank=self.input_pp_rank),
                "input_mask": TensorPointer(group_rank=self.input_pp_rank),
                "label_ids": TensorPointer(group_rank=self.output_pp_rank),
                "label_mask": TensorPointer(group_rank=self.output_pp_rank),
            }

        # Make sure we load only what's necessary, ie we only load a `input_ids` column.
        assert all(list(example.keys()) == ["input_ids"] for example in examples)

        # TODO @nouamanetazi: Is it better to have examples as np.array or torch.Tensor?
        input_ids = np.vstack([examples[i]["input_ids"] for i in range(len(examples))])  # (b, s)
        batch_size, expanded_input_length = input_ids.shape

        result: Dict[str, Union[np.ndarray, TensorPointer]] = {}

        result["input_ids"] = TensorPointer(group_rank=self.input_pp_rank)
        result["input_mask"] = TensorPointer(group_rank=self.input_pp_rank)
        result["label_ids"] = TensorPointer(group_rank=self.output_pp_rank)
        result["label_mask"] = TensorPointer(group_rank=self.output_pp_rank)

        assert (
            expanded_input_length == self.sequence_length + 1
        ), f"Samples should be of length {self.sequence_length + 1} (seq_len+1), but got {expanded_input_length}"

        # Process inputs: last token is the label
        if current_pp_rank == self.input_pp_rank:
            result["input_ids"] = input_ids[:, :-1]
            result["input_mask"] = np.ones((batch_size, self.sequence_length), dtype=np.bool_)

        # Process labels: shift them to the left
        if current_pp_rank == self.output_pp_rank:
            result["label_ids"] = input_ids[:, 1:]
            result["label_mask"] = np.ones((batch_size, self.sequence_length), dtype=np.bool_)

        if isinstance(result["input_ids"], torch.Tensor) and result["input_ids"].shape[-1] != self.sequence_length:
            raise ValueError(
                f"`labels` are incorrectly preprocessed. `labels` length is {result['input_ids'].shape[-1]}, but should be"
                f" {self.sequence_length}."
            )
        if isinstance(result["label_ids"], torch.Tensor) and result["label_ids"].shape[-1] != self.sequence_length:
            raise ValueError(
                f"`labels` are incorrectly preprocessed. `labels` length is {result['label_ids'].shape[-1]}, but should be"
                f" {self.sequence_length}."
            )

        # Cast np.array to torch.Tensor
        result = {k: v if isinstance(v, TensorPointer) else torch.from_numpy(v) for k, v in result.items()}
        return result

In [None]:
from nanotron.parallel import ParallelContext
from torch.utils.data import BatchSampler, DataLoader
from nanotron import distributed as dist
from nanotron.dataloader import EmptyInfiniteDataset

def get_megatron_train_dataloader(
    train_dataset,
    sequence_length: int,
    parallel_context: ParallelContext,
    input_pp_rank: int,
    output_pp_rank: int,
    micro_batch_size: int,
    consumed_train_samples: int,
    dataloader_num_workers: int,
    seed_worker: int,
    dataloader_drop_last: bool = True,
    dataloader_pin_memory: bool = True,
    use_loop_to_round_batch_size: bool = False,
) -> DataLoader:
    #if not isinstance(train_dataset, datasets.Dataset): TODO: Poner bien esto
    #    raise ValueError(f"training requires a datasets.Dataset, but got {type(train_dataset)}")

    # Case of ranks requiring data
    if dist.get_rank(parallel_context.pp_pg) in [
        input_pp_rank,
        output_pp_rank,
    ]:
        train_dataset = train_dataset.with_format(type="numpy", columns=["input_ids"], output_all_columns=True)

    # Case of ranks not requiring data. We give them an infinite dummy dataloader # TODO: No es un dataloader, es un dataset, que luego con el collator se transforma. No engañeis a mi padre
    else:
        #
        # TODO: Solamente se tiene que coger la length del dataset y hacer el empty 
        assert train_dataset.column_names == ["input_ids"], (
            f"Dataset has to have a single column, with `input_ids` as the column name. "
            f"Current dataset: {train_dataset}"
        )
        dataset_length = len(train_dataset)
        train_dataset = train_dataset.remove_columns(column_names="input_ids")
        assert (
            len(train_dataset) == 0
        ), f"Dataset has to be empty after removing the `input_ids` column. Current dataset: {train_dataset}"
        # HACK as if we remove the last column of a train_dataset, it becomes empty and it's number of rows becomes empty.
        # TODO: Este hack solo quiere decir que se elimina el dataset
        train_dataset = EmptyInfiniteDataset(length=dataset_length)
        # No need to spawn a lot of workers, we can just use main
        dataloader_num_workers = 0

    data_collator = DataCollatorForCLM(
        sequence_length=sequence_length,
        input_pp_rank=input_pp_rank,
        output_pp_rank=output_pp_rank,
        parallel_context=parallel_context,
    )

    # Compute size and rank of dataloader workers
    dp_ranks_size = parallel_context.dp_pg.size()
    dp_rank = parallel_context.dp_pg.rank()

    # TODO @nouamanetazi: Remove unused columns: https://github.com/huggingface/transformers/blob/47e1676255e5dd86b9541f734cd4f4bdcbb50f4a/src/transformers/trainer.py#L852
    # TODO @nouamanetazi: Support torch.utils.data.IterableDataset: https://github.com/huggingface/transformers/blob/47e1676255e5dd86b9541f734cd4f4bdcbb50f4a/src/transformers/trainer.py#L855-L872

    train_sampler = _get_train_sampler(
        train_dataset=train_dataset,
        dl_ranks_size=dp_ranks_size,
        dl_rank=dp_rank,
        seed=seed_worker,
        use_loop_to_round_batch_size=use_loop_to_round_batch_size,
        micro_batch_size=micro_batch_size,
        drop_last=dataloader_drop_last,
        consumed_train_samples=consumed_train_samples,
    )

    return DataLoader(
        train_dataset,
        batch_size=micro_batch_size,
        sampler=train_sampler,
        collate_fn=data_collator,
        drop_last=dataloader_drop_last,  # we also drop_last in `clm_process()`
        num_workers=dataloader_num_workers,
        pin_memory=dataloader_pin_memory,
        worker_init_fn=get_dataloader_worker_init(dp_rank=dp_rank),
        # TODO @thomasw21: I'm not sure but this doesn't seem to work at all.
        # pin_memory_device="cuda",
    )

In [None]:
# OG Dataloader
def get_dataloader(trainer: DistributedTrainer, meg_data_path):
    """Returns a dataloader for training."""

    # First, we need to know which ranks to feed the dataloader to
    input_pp_rank, output_pp_rank = get_input_output_pp_ranks(model=trainer.model)

    # Case 3: Megatron datasets
    if isinstance(trainer.config.data.dataset, PretrainDatasetsArgs):
        log_rank("Using `megatron!!!`", logger=logger, level=logging.INFO, rank=0)

        

        # We load the processed dataset on the ranks requiring it
        dataloader = get_train_dataloader(
            train_dataset=train_dataset,
            sequence_length=trainer.sequence_length,
            parallel_context=trainer.parallel_context,
            input_pp_rank=input_pp_rank,
            output_pp_rank=output_pp_rank,
            micro_batch_size=trainer.micro_batch_size,
            consumed_train_samples=trainer.consumed_train_samples,
            dataloader_num_workers=trainer.config.data.num_loading_workers,
            seed_worker=trainer.config.data.seed,
            dataloader_drop_last=True,
        )
        # Check if we have enough samples for train_steps
        total_tokens_dataset = len(dataloader.dataset) * trainer.sequence_length
        num_tokens_needed_for_training = (
            (trainer.config.tokens.train_steps - trainer.start_iteration_step)
            * trainer.global_batch_size
            * trainer.sequence_length
        )
        assert num_tokens_needed_for_training <= total_tokens_dataset, (
            f"Dataset is too small for steps ({total_tokens_dataset} < {num_tokens_needed_for_training}), "
            f"Try train_steps<={len(dataloader.dataset) // trainer.global_batch_size + trainer.start_iteration_step}"
        )
    else:
        raise ValueError(f"Unhandled case of `self.config.data.dataset`. Got: {trainer.config.data.dataset}")

    return dataloader

dataloader = get_dataloader(trainer, data_path)