# Setup

In [27]:
# Project-wide settings used by the build and job-launch cells below.
PROJECT_ID = "argolis-lsj-test"
REGION = "us-central1"
# Passed to aiplatform.init() as the staging bucket when the job is launched.
GCS_BUCKET = "pipeline-argolis-lsj-test-unique"

# Trainer image URI. Derived from REGION / PROJECT_ID so the image location
# follows the project settings instead of silently drifting from them.
TRAIN_DOCKER_URI = (
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/t5/"
    "finetuning_flan_t5_large:multi-node-torchrun-0905"
)

In [10]:
# train
#%cd torchrun
!docker build -t {TRAIN_DOCKER_URI} .
!docker push {TRAIN_DOCKER_URI}

^C
The push refers to repository [us-central1-docker.pkg.dev/argolis-lsj-test/t5/finetuning_flan_t5_large]
tag does not exist: us-central1-docker.pkg.dev/argolis-lsj-test/t5/finetuning_flan_t5_large:multi-node-torchrun-0822


# Cloud training (Vertex AI custom job)

In [32]:
from typing import Any, List
import datetime
from pytz import timezone
import math
from google.cloud import aiplatform

# Divisor used to size the reduction-server pool.
# NOTE(review): presumably the usable bandwidth (Gbps) of one reduction-server
# replica -- confirm against the Vertex AI reduction server documentation.
_REDUCER_BANDWIDTH = 32


def _gpu_worker_pool_spec(container_spec, machine_type, gpu_type,
                          num_gpus, replica_count):
    """Build one GPU worker-pool spec dict running `container_spec`."""
    return {
        "container_spec": container_spec,
        "machine_spec": {
            "machine_type": machine_type,
            "accelerator_type": gpu_type,
            "accelerator_count": num_gpus,
        },
        "replica_count": replica_count,
    }


def launch_job(job_name: str,
               project: str,
               location: str,
               gcs_bucket: str,
               image_uri: str,
               trainer_args: List[Any],
               num_nodes: int = 1,
               machine_type: str = "n1-standard-8",
               num_gpus_per_node: int = 1,
               gpu_type: str = "NVIDIA_TESLA_V100",
               max_bandwith_per_node: int = 100,
               reduction_server: bool = True):
    """Create and submit a (optionally multi-node) GPU training CustomJob.

    Args:
        job_name: Display name given to the CustomJob.
        project: Project passed to `aiplatform.init`.
        location: Region passed to `aiplatform.init`.
        gcs_bucket: Staging bucket passed to `aiplatform.init`.
        image_uri: Training container image run by every worker.
        trainer_args: Arguments passed to the training container.
        num_nodes: Total number of training nodes. One node forms the primary
            pool; the remaining `num_nodes - 1` form the secondary pool.
        machine_type: Machine type used for every training node.
        num_gpus_per_node: Accelerator count attached to each training node.
        gpu_type: Accelerator type attached to each training node.
        max_bandwith_per_node: Per-node bandwidth used to size the
            reduction-server pool (same unit as `_REDUCER_BANDWIDTH`).
        reduction_server: Whether to add a reduction-server pool.

    Returns:
        The submitted `aiplatform.CustomJob`, so the caller can monitor or
        cancel it.

    Raises:
        ValueError: If `num_nodes` is smaller than 1.
    """
    if num_nodes < 1:
        raise ValueError(f"num_nodes must be >= 1, got {num_nodes}")

    aiplatform.init(project=project, location=location, staging_bucket=gcs_bucket)

    # Train container spec, shared by the primary and secondary pools.
    train_container_spec = {
        "image_uri": image_uri,
        "args": trainer_args,
    }

    # Primary worker pool: always exactly one replica.
    primary_worker_spec = _gpu_worker_pool_spec(
        train_container_spec, machine_type, gpu_type, num_gpus_per_node, 1)

    # Secondary worker pool: the remaining nodes; left empty for single-node jobs.
    secondary_worker_spec = {}
    if num_nodes > 1:
        secondary_worker_spec = _gpu_worker_pool_spec(
            train_container_spec, machine_type, gpu_type, num_gpus_per_node,
            num_nodes - 1)

    # Reduction server pool.
    # https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server
    reduction_server_spec = {}
    if reduction_server:
        # Enough replicas to absorb the aggregate bandwidth of all nodes.
        total_bandwidth = max_bandwith_per_node * num_nodes
        reducer_replicas = int(math.ceil(total_bandwidth / _REDUCER_BANDWIDTH))
        reduction_server_spec = {
            "container_spec": {
                "image_uri": "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest",
            },
            "machine_spec": {
                "machine_type": "n1-standard-32",
            },
            "replica_count": reducer_replicas,
        }

    # Pool order is positional; an empty dict marks a pool as unused.
    worker_pool_specs = [
        primary_worker_spec,
        secondary_worker_spec,
        reduction_server_spec,
        {},
    ]
    job = aiplatform.CustomJob(
        display_name=job_name,
        worker_pool_specs=worker_pool_specs,
    )
    # submit() is used rather than a blocking run, so the cell does not wait
    # for training to finish.
    job.submit(
        network=None,
        restart_job_on_worker_restart=True,
        enable_web_access=True,
    )
    return job

In [33]:
# Cluster shape: 4 nodes x 2 T4 GPUs each.
num_nodes = 4
num_gpus_per_node = 2
machine_type = "n1-standard-32"
gpu_type = "NVIDIA_TESLA_T4"

# Job name. The timestamp suffix makes each submission distinguishable;
# it was previously computed but never added to the name.
timestamp = datetime.datetime.now(timezone("US/Pacific")).strftime("%Y%m%d_%H%M%S")
job_name = f"flant5-finetuning-gpu-deepspeed-torchrun-{timestamp}"

# Arguments passed to the training container.
# NOTE(review): "$AIP_MODEL_DIR" / "$AIP_TENSORBOARD_LOG_DIR" are passed as
# literal strings; they only resolve if the image's entrypoint expands
# environment variables (e.g. runs through a shell) -- confirm in the Dockerfile.
trainer_args = [
    f"--nproc-per-node={num_gpus_per_node}",
    "run_seq2seq_deepspeed.py",
    "--epoch=1",
    "--batch_size=8",
    "--train_dataset_path=/gcs/lsj-public/deepspeed/split_data_2/train",
    "--test_dataset_path=/gcs/lsj-public/deepspeed/split_data_2/eval",
    "--model_output_dir=$AIP_MODEL_DIR",
    "--tensorboard_log_dir=$AIP_TENSORBOARD_LOG_DIR",
]

# NOTE(review): max_bandwith_per_node is left at launch_job's default (100),
# which with 4 nodes requests ceil(400 / 32) = 13 reduction-server replicas.
# Pass max_bandwith_per_node=32 to request 4 instead -- confirm which value
# matches the bandwidth of this machine type.
launch_job(
    job_name=job_name,
    project=PROJECT_ID,
    location=REGION,
    gcs_bucket=GCS_BUCKET,
    image_uri=TRAIN_DOCKER_URI,
    trainer_args=trainer_args,
    num_nodes=num_nodes,
    machine_type=machine_type,
    num_gpus_per_node=num_gpus_per_node,
    gpu_type=gpu_type,
    reduction_server=True,
)

Creating CustomJob
CustomJob created. Resource name: projects/703099487153/locations/us-central1/customJobs/4820619134947557376
To use this CustomJob in another session:
custom_job = aiplatform.CustomJob.get('projects/703099487153/locations/us-central1/customJobs/4820619134947557376')
View Custom Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/4820619134947557376?project=703099487153
