In [1]:
from transformers import TextDataset, DataCollatorForLanguageModeling
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers import Trainer, TrainingArguments
from ray.tune.schedulers import PopulationBasedTraining
import ray

In [2]:
def load_dataset(file_path, tokenizer, block_size = 128):
    dataset = TextDataset(
        tokenizer = tokenizer,
        file_path = file_path,
        block_size = block_size,
    )
    return dataset

In [3]:
def load_data_collator(tokenizer, mlm = False):
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=mlm,
    )
    return data_collator

In [8]:
def train(train_file_path, model_name,
          output_dir,
          overwrite_output_dir,
          per_device_train_batch_size,
          num_train_epochs,
          save_steps,
          stop_token="###",
          num_gpus=2): 

    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    tokenizer.add_special_tokens({'additional_special_tokens': [stop_token]})
    train_dataset = load_dataset(train_file_path, tokenizer)
    data_collator = load_data_collator(tokenizer)

    tokenizer.save_pretrained(output_dir)

    model = GPT2LMHeadModel.from_pretrained(model_name)

    model.save_pretrained(output_dir)

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=overwrite_output_dir,
        per_device_train_batch_size=per_device_train_batch_size,
        num_train_epochs=num_train_epochs,
    )
    ray.init(ignore_reinit_error=True)
    def train_func(config):
        trainer = Trainer(
            model=model,
            args=TrainingArguments(
                output_dir=output_dir,
                overwrite_output_dir=overwrite_output_dir,
                per_device_train_batch_size=config["per_device_train_batch_size"],
                num_train_epochs=config["num_train_epochs"],
            ),
            data_collator=data_collator,
            train_dataset=train_dataset,
        )

        trainer.train()
        trainer.save_model()
    config_space = {
        "per_device_train_batch_size": ray.tune.choice([2, 4, 8]),
        "num_train_epochs": ray.tune.choice([1, 2, 3]),
    }
    pbt_scheduler = PopulationBasedTraining(
        time_attr="training_iteration",
        metric="eval_loss",  # Change it to your evaluation metric
        mode="min",
        perturbation_interval=2,
        hyperparam_mutations=config_space,
    )
    analysis = ray.tune.run(
        train_func,
        config=config_space,
        scheduler=pbt_scheduler,
        stop={"training_iteration": 10},  # Adjust stopping criteria
        num_samples=4,  # Number of trials
        resources_per_trial={"gpu": num_gpus},  # Specify the number of GPUs
    )
    ray.actor.exit_actor()
    print("Best config:", analysis.get_best_config(metric="eval_loss"))
    ray.shutdown()

In [None]:
  train(
        train_file_path="data/train_10.txt",
        model_name="gpt2",
        output_dir="distributed_model",
        overwrite_output_dir=True,
        per_device_train_batch_size=2,
        num_train_epochs=1,
        save_steps=100,
        stop_token="###",
        num_gpus=2 
    )

2024-01-23 11:11:25,246	INFO worker.py:1558 -- Calling ray.init() again after it has already been called.
2024-01-23 11:11:25,249	INFO tune.py:583 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


0,1
Current time:,2024-01-23 11:20:25
Running for:,00:08:57.70
Memory:,68.1/2015.5 GiB

Trial name,status,loc,num_train_epochs,per_device_train_bat ch_size
train_func_2782d_00000,PENDING,,1,4
train_func_2782d_00001,PENDING,,3,4
train_func_2782d_00002,PENDING,,1,2
train_func_2782d_00003,PENDING,,3,2
