In [None]:
import numpy as np
import pandas as pd
import os

In [None]:
# Hugging Face Hub id of the model to fine-tune; also used to load its tokenizer.
model_name = "EleutherAI/gpt-j-6B"
# Passed to ScalingConfig(use_gpu=...) when building the Ray trainer.
use_gpu = True
# Number of Ray Train workers (one training process each).
num_workers = 16
# CPUs reserved for each Ray Train worker.
cpus_per_worker = 8

In [None]:
import ray

# Environment variables applied to Ray worker processes via the runtime env.
# NCCL is pinned to one network interface; "ens5" is specific to the target
# cluster's hosts — NOTE(review): adjust for other machines.
nccl_env = {"NCCL_SOCKET_IFNAME": "ens5"}
ray.init(runtime_env={"env_vars": nccl_env})

In [None]:
from datasets import load_dataset

# Fetch the raw corpus as a Hugging Face DatasetDict (one entry per split).
# Each split is assumed to hold its text as a single string row, which is what
# the chunking preprocessor below expects — confirm if the dataset changes.
print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset(path="tiny_shakespeare")
current_dataset

In [None]:
import ray.data

# Convert the Hugging Face DatasetDict into Ray datasets; the result is indexed
# by split name ("train", "validation") when the trainer is configured.
ray_datasets = ray.data.from_huggingface(current_dataset)
ray_datasets

In [None]:
from ray.data.preprocessors import BatchMapper

# Number of characters per text chunk produced from the raw corpus.
block_size = 1024

def split_column_with_one_string(
    df: pd.DataFrame, chunk_size: "int | None" = None
) -> pd.DataFrame:
    """Split the strings in ``df["text"]`` into fixed-size, stripped chunks.

    Every row is chunked independently and the chunks are concatenated in row
    order, so a multi-row batch is not truncated to its first row and an empty
    batch yields an empty frame instead of raising.

    Args:
        df: Batch with a ``"text"`` column of strings.
        chunk_size: Characters per chunk. Defaults to the notebook-level
            ``block_size`` when ``None``.

    Returns:
        A new DataFrame with a single ``"text"`` column holding one chunk per
        row. Chunks are whitespace-stripped, so a whitespace-only chunk becomes
        an empty string (it is kept, not dropped).
    """
    size = block_size if chunk_size is None else chunk_size
    chunks = [
        text[start : start + size].strip()
        for text in df["text"]
        for start in range(0, len(text), size)
    ]
    return pd.DataFrame({"text": chunks})

# Ray preprocessor that applies split_column_with_one_string to pandas batches.
string_splitter = BatchMapper(split_column_with_one_string, batch_format="pandas")

In [None]:
def tokenize_captions(batch: dict, max_length: "int | None" = None) -> dict:
    """Tokenize a batch of text chunks for causal-LM fine-tuning.

    Args:
        batch: Mapping with a ``"text"`` entry holding the batch's strings
            (a NumPy array when used through ``BatchMapper(batch_format="numpy")``).
        max_length: Token length every sequence is truncated/padded to.
            Defaults to ``block_size``. Chunks are at most ``block_size``
            characters, and with a byte-level BPE tokenizer (assumed for GPT-J
            — confirm) each token covers at least one byte, so ASCII chunks
            never exceed ``block_size`` tokens. Padding further, e.g. to
            ``tokenizer.model_max_length``, would only add padding positions.
            The value is always clamped to ``tokenizer.model_max_length``.

    Returns:
        A plain dict of NumPy arrays: the tokenizer outputs (e.g.
        ``input_ids``, ``attention_mask``) plus ``labels``, a copy of
        ``input_ids``.
    """
    # Import locally so this function does not rely on a later notebook cell
    # having put AutoTokenizer into the global namespace.
    from transformers import AutoTokenizer

    # NOTE(review): the tokenizer is reloaded for every batch; cheap relative
    # to training, but could be cached per process if it shows up in profiles.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Reuse EOS as the padding token (presumably the tokenizer defines none).
    tokenizer.pad_token = tokenizer.eos_token

    target_length = block_size if max_length is None else max_length
    target_length = min(target_length, tokenizer.model_max_length)

    tokens = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=target_length,
        padding="max_length",
        return_tensors="np",
    )
    # Causal LM: the model shifts labels internally, so labels == input_ids.
    # NOTE(review): padded positions (EOS) are also trained on; mask them with
    # -100 if that is not intended.
    tokens["labels"] = tokens["input_ids"].copy()
    return dict(tokens)

# Ray preprocessor that applies tokenize_captions to NumPy batches. Despite the
# name, this is a BatchMapper, not a Hugging Face tokenizer object.
tokenizer = BatchMapper(tokenize_captions, batch_format="numpy")

In [None]:
import evaluate
from transformers import Trainer, TrainingArguments
from transformers import (
    GPTJForCausalLM,
    AutoTokenizer,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar
import torch
import math

from ray.air import session


def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    """Build the Hugging Face ``Trainer`` run by each Ray Train worker.

    Passed to ``HuggingFaceTrainer(trainer_init_per_worker=...)``; the entries
    of its ``trainer_init_config`` arrive here as ``**config``.

    Args:
        train_dataset: Training data for this worker.
        eval_dataset: Evaluation data for this worker, or ``None``.
        **config: Optional hyperparameters: ``batch_size`` (default 4),
            ``epochs`` (2), ``warmup_steps`` (0), ``learning_rate`` (0.002),
            ``weight_decay`` (0.01).

    Returns:
        A ``transformers.Trainer`` for GPT-J configured with DeepSpeed ZeRO
        stage 3 (optimizer state offloaded to CPU) and bf16.
    """
    # Size the OpenMP thread pool to the CPUs reserved for this worker.
    os.environ["OMP_NUM_THREADS"] = str(
        session.get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.002)
    weight_decay = config.get("weight_decay", 0.01)

    # DeepSpeed config; "auto" entries are resolved by the transformers
    # DeepSpeed integration from the TrainingArguments below.
    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 32,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            # Keep optimizer state in CPU memory to save GPU memory.
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": False,
            },
            # No need to offload params on A100
            # "offload_param": {
            #     "device": "cpu",
            #     "pin_memory": False,
            # },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 1,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    # NOTE(review): evaluation_strategy is left at its default, so no "eval_*"
    # metrics may be produced during training even though the Ray
    # CheckpointConfig ranks checkpoints by "eval_loss" — confirm.
    training_args = TrainingArguments(
        "output",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        save_strategy="steps",
        save_steps=200,
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        # The tokenizing preprocessor emits the targets under "labels"; listing
        # input_ids/attention_mask here would make evaluation hand those to
        # compute_metrics as the labels.
        label_names=["labels"],
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        bf16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    # use_cache=False: the KV cache is not needed for training.
    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    # NOTE(review): no tokens are added above (padding reuses EOS), so this only
    # has an effect if len(tokenizer) differs from the checkpoint's embedding
    # size — confirm the resize is intended.
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    def preprocess_logits_for_metrics(logits, labels):
        # Reduce logits to predicted token ids before the Trainer gathers them,
        # so evaluation does not accumulate full-vocabulary logits in memory.
        if isinstance(logits, tuple):
            logits = logits[0]
        return logits.argmax(dim=-1)

    def compute_metrics(eval_pred):
        predictions, labels = eval_pred
        # Causal LM: the prediction at position t is for token t + 1, so drop
        # the last prediction and the first label before comparing.
        predictions = predictions[:, :-1].reshape(-1)
        labels = labels[:, 1:].reshape(-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        preprocess_logits_for_metrics=preprocess_logits_for_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )
    return trainer

In [None]:
from ray.train.huggingface import HuggingFaceTrainer
from ray.air.config import RunConfig, ScalingConfig, CheckpointConfig
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.tune import SyncConfig
from ray.data.preprocessors import Chain


# Ray AIR trainer: runs trainer_init_per_worker on `num_workers` workers, with
# the raw splits passed through the chunking and tokenizing preprocessors.
trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 6,
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        # Request a GPU per worker only when use_gpu is set. A hard-coded GPU
        # entry would contradict use_gpu=False if the config flag is flipped.
        resources_per_worker={
            **({"GPU": 1} if use_gpu else {}),
            "CPU": cpus_per_worker,
        },
    ),
    # "evaluation" is the key HuggingFaceTrainer uses for the eval split.
    datasets={"train": ray_datasets["train"], "evaluation": ray_datasets["validation"]},
    run_config=RunConfig(
        local_dir="/mnt/cluster_storage/",
        # No syncer: results are written under local_dir, presumably shared
        # storage visible to all nodes — confirm for other clusters.
        sync_config=SyncConfig(syncer=None),
        callbacks=[MLflowLoggerCallback(experiment_name=model_name.split("/")[-1])],
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            # NOTE(review): trainer_init_per_worker sets no evaluation_strategy,
            # so "eval_loss" may never be reported and checkpoint ranking may
            # not behave as intended — confirm.
            checkpoint_score_attribute="eval_loss",
            checkpoint_score_order="min",
        ),
    ),
    # Chunk the raw text first, then tokenize the chunks.
    preprocessor=Chain(string_splitter, tokenizer),
)

In [None]:
# Run distributed fine-tuning and keep the returned result (metrics + checkpoint).
results = trainer.fit()

In [None]:
# Checkpoint retained by the run (CheckpointConfig keeps at most one).
checkpoint = results.checkpoint
checkpoint

In [None]:
from ray.train.huggingface import HuggingFacePredictor, HuggingFaceCheckpoint
from transformers import set_seed


@ray.remote(num_gpus=1)
def predict(
    uri,
    seed=None,
    prompt="Romeo:",
    num_return_sequences=10,
    max_new_tokens=256,
):
    """Sample text continuations from a fine-tuned checkpoint as a Ray GPU task.

    Args:
        uri: URI of the checkpoint to load (e.g. ``checkpoint.uri``).
        seed: Seed for ``transformers.set_seed``. When ``None`` a random seed in
            ``[0, 2**16)`` is drawn; it is printed either way so the sampling
            can be reproduced.
        prompt: Text the generated samples continue from.
        num_return_sequences: Number of samples to generate.
        max_new_tokens: Maximum number of new tokens per sample.

    Returns:
        The output of ``HuggingFacePredictor.predict`` for the single prompt.
    """
    if seed is None:
        # Cast to a Python int: set_seed forwards the seed to random.seed,
        # which rejects NumPy integer types on Python >= 3.11.
        seed = int(np.random.default_rng(seed=None).integers(0, 2**16))
    print(f"seed: {seed}")
    set_seed(seed)
    checkpoint = HuggingFaceCheckpoint.from_uri(uri)
    predictor = HuggingFacePredictor.from_checkpoint(
        checkpoint, task="text-generation", device=0, torch_dtype=torch.bfloat16
    )
    # One row, one column: the single prompt to continue.
    return predictor.predict(
        pd.DataFrame([[prompt]]),
        do_sample=True,
        max_new_tokens=max_new_tokens,
        top_k=50,
        top_p=0.95,
        num_return_sequences=num_return_sequences,
    )

In [None]:
# Launch generation as a Ray task on a GPU worker and wait for its samples.
prediction_task = predict.remote(checkpoint.uri)
predictions = ray.get(prediction_task)
predictions