# Getting Started with Tensor Parallelism using the SageMaker Model Parallelism Library

This notebook walks you through how to use the tensor parallelism feature provided by the SageMaker model parallelism library. You'll learn how to train the GPT-J model with tensor parallelism on a synthetic text data.

**Note**: To run this example training job, you must be in `us-west-2`. The preview version of container images are available only in those two regions.

## Install and Upgrade Libraries

The SageMaker model parallelism library's tensor parallelism feature requires the SageMaker Python SDK and the SageMaker Experiments library. Run the following cell to install or upgrade the libraries.

**Note:** To finish applying the changes, you must restart the kernel.

Import and check if the SageMaker Python SDK version is successfully set to the latest version

In [None]:
install_needed = True  # should only be True once
# install_needed = False

In [None]:
import sys
import IPython

if install_needed:
    print("installing deps and restarting kernel")
#     !{sys.executable} -m pip install -U split-folders tqdm albumentations crc32c wget
    !{sys.executable} -m pip install 'sagemaker[local]' --upgrade
    !{sys.executable} -m pip install -U smdebug sagemaker-experiments
    !{sys.executable} -m pip install -U sagemaker
    !{sys.executable} -m pip install -U datasets transformers
    !/bin/bash ./local/local_change_setting.sh
    IPython.Application.instance().kernel.do_shutdown(True)

## Amazon SageMaker Initialization

This private preview feature is available to use in `us-east-1` and `us-west-2`.
Throughout this example, you'll use a training script of GPT model and a text dataset.

Run the following cell to import SageMaker modules and retrieve information of your current SageMaker work environment: your AWS account ID, the AWS Region you are using to run the notebook, and the ARN of your Amazon SageMaker execution role.

In [None]:
%%time
import os

import sagemaker
from sagemaker import get_execution_role
from sagemaker.huggingface import HuggingFace
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
import boto3

# If running in Sagemaker notebook this can stay commented
# os.environ["AWS_PROFILE"] = "sm"

# supported regions only us-west-2 and us-east-1
# preview images are only in these two regions
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"

role = get_execution_role() # provide a pre-existing role ARN as an alternative to creating a new role
print(f'SageMaker Execution Role:{role}')

client = boto3.client('sts')
account = client.get_caller_identity()['Account']
print(f'AWS account:{account}')

session = boto3.session.Session()
region = session.region_name
print(f'AWS region:{region}')

sm_boto_client = boto3.client("sagemaker")
sagemaker_session = sagemaker.session.Session(boto_session=session)

In [None]:
print(sagemaker.__version__)

## Specify Amazon S3 Bucket Paths

Here you need to specify the paths for training data to be used by your job. The bucket used must be in the same region as where training will run. As part of the private preview artifacts, we provide a synthetic dataset that you can use to quickly get started in 'smdistributed-modelparallel-preview' bucket. This bucket is in us-west-2, and we recommend you copy the data to your own bucket and update the paths in the next cell to avoid any cross-account permission issues depending on your IAM role permissions.

After you successfully run this example tensor parallel training job, you can modify the S3 bucket to where your own dataset is stored.

In [None]:
external_dataset='s3://sagemaker-sample-files/datasets/binary/bert/hdf5_lower_case_1_seq_len_128_max_pred_20_masked_lm_prob_0.15_random_seed_12345_dupe_factor_5/wikicorpus_en_abstract'

In [None]:
data_path = './dataset/wikicorpus_en_abstract'

In [None]:
!aws s3 cp $external_dataset $data_path --recursive
!rm -rf $data_path/train $data_path/test
!mkdir $data_path/train $data_path/test
!mv $data_path/*_training_* $data_path/train/
!mv $data_path/*_test_* $data_path/test/

The below bucket will store output artifacts of the training job. You can modify this as needed.

In [None]:
data_bucket = 'dataset-us-west-2-cyj'  #<== 데이터셋이 들어있는 bucket 이름으로 변경
input_data = f's3://{data_bucket}/wikicorpus_en_abstract'

In [None]:
!aws s3 sync ./dataset/wikicorpus_en_abstract $input_data

## Setup fsx and use fsx for data channels and checkpoints

While the above option is easier to setup, using an FSX can be beneficial for performance when dealing with large input sizes and large model sizes. If you are using models above 13B, checkpointing should be done using FSX. 

Please see the instructions [here](https://github.com/aws/amazon-sagemaker-examples/blob/master/advanced_functionality/distributed_tensorflow_mask_rcnn/mask-rcnn-scriptmode-fsx.ipynb), to create the FSx lustre filesystem and import the dataset from the S3 bucket to your fsx filesystem. Note that the FSX must be created in a private subnet with internet gateway to ensure that training job has access to the internet. 

In [None]:
# Instructions obtained from:
# https://github.com/aws/amazon-sagemaker-examples/blob/master/advanced_functionality/distributed_tensorflow_mask_rcnn/mask-rcnn-scriptmode-fsx.ipynb

use_fsx = False

if use_fsx:
    from sagemaker.inputs import FileSystemInput

    # Specify FSx Lustre file system id.
    file_system_id = "<your-file-system-id>"

    # Specify the SG and subnet used by the FSX, these are passed to SM Estimator so jobs use this as well
    fsx_security_group_id = "<your-security-group-id>"
    fsx_subnet = "<your-subnet>"

    # Specify directory path for input data on the file system.
    # You need to provide normalized and absolute path below.
    # Your mount name can be provided by you when creating fsx, or generated automatically.
    # You can find this mount_name on the FSX page in console.
    # Example of fsx generated mount_name: "3x5lhbmv"
    base_path = "<your-mount-name>"

    # Specify your file system type.
    file_system_type = "FSxLustre"

    fs_train = FileSystemInput(file_system_id=file_system_id,
                            file_system_type=file_system_type,
                            directory_path=train_base_path,
                            file_system_access_mode="rw")
    fs_test = FileSystemInput(file_system_id=file_system_id,
                            file_system_type=file_system_type,
                            directory_path=test_base_path,
                            file_system_access_mode="rw")

    data_channels = {"train": fs_train, "test": fs_test}

## Set Up Hyperparameters, Metric Definitions, and MPI Options
The following `hyperparameters` dictionary is to pass arguments to the training script (`train_gptj_simple.py`) and set the model parallel configuration when creating the training job.

You can also add custom mpi flags. By default, we have `--mca btl_vader_single_copy_mechanism none` to remove unnecessary logs.

Next we add a base metric definitions to enable the metric upload in SageMaker. You can add any further metric definitions.

In [None]:
metric_definitions=[
     {'Name': 'Batch', 'Regex': 'Batch:(.*?),'},
     {'Name': 'train:Loss', 'Regex': 'Train loss:(.*?),'},
     {'Name': 'train:speed', 'Regex': 'Train speed:(.*?),'},
     {'Name': 'validation:Loss', 'Regex': 'Validation loss:(.*?),'},
     {'Name': 'validation:perplexity', 'Regex': 'Validation perplexity:(.*?),'},    
]

In [None]:
hyperparameters = {'max_steps': 50,
                   'seed': 12345,
                   'fp16': 1,
                   'lr': 2.e-4,
                   'lr_decay_iters': 125000,
                   'min_lr': 0.00001,
                   'lr-decay-style': 'linear',
                   'warmup': 0.01,
                   'num_kept_checkpoints': 5,
                   'checkpoint_freq': 200,
                   'logging_freq': 10,
                   'save_final_full_model': 1,
                   'skip_full_optimizer': 1,
                   'shard_optimizer_state': 1,
                   'activation_checkpointing': 1,
                   'activation_strategy': 'each',
                   'optimize': 'speed',
                   'use_bert_data': 1,
                   'epochs': 20,
                    # below flag loads model and optimizer state from checkpoint_s3_uri
                    # 'load_partial': 1,
                  }



if input_data.split('/')[-1] != 'wikicorpus_en_abstract':
    # those flags are used when training with the openwebtext dataset
    hyperparameters["zipped_data"] = 0
    hyperparameters["validation_freq"] = 20
    hyperparameters["use_wiki_data"] = 0
    
    
# if use_fsx:
#     # make sure to update paths for training-dir and test-dir based on the paths of datasets in fsx
#     # If you want to resume training, set checkpoint-dir to the same path as a previous job.
#     SM_TRAIN_DIR = "/opt/ml/input/data/train"
#     hyperparameters['checkpoint-dir'] = f"{SM_TRAIN_DIR}/checkpointdir-job2"
#     hyperparameters['model-dir'] = f"{SM_TRAIN_DIR}/modeldir-job2"
#     hyperparameters['training-dir'] = f"{SM_TRAIN_DIR}/datasets/pytorch_gpt2/train_synthetic"
#     hyperparameters['test-dir'] = f"{SM_TRAIN_DIR}/datasets/pytorch_gpt2/val_synthetic"

# The checkpoint path (hyperparameters['checkpoint-dir'] or checkpoint_s3_uri) is not unique per job. 
# You need to modify as needed for different runs. 
# If same path is used for unrelated runs, this may increase time when downloading unnecessary checkpoints, 
# and cause conflicts when loading checkpoints.


hyperparameters

Set the model configuration below.

In [None]:
model_config = 'gptj-6b'

if model_config == 'gptj-6b':
    model_params = {        
        'max_context_width': 512, 
        'hidden_width': 4096, 
        'num_layers': 28, 
        'num_heads': 16,
        
        'tensor_parallel_degree': 4,
        'pipeline_parallel_degree': 2,

        'train_batch_size': 8,
        'val_batch_size': 8,
        'prescaled_batch': 1,
    }
else:
    raise RuntimeError("Unknown model config")

for k, v in model_params.items():
    hyperparameters[k] = v

In [None]:
## for local mode
# model_config = 'gptj-6b'

# if model_config == 'gptj-6b':
#     model_params = {        
#         'max_context_width': 512, 
#         'hidden_width': 1024, 
#         'num_layers': 12, 
#         'num_heads': 8,
        
#         'tensor_parallel_degree': 4,
#         'pipeline_parallel_degree': 2,

#         'train_batch_size': 8,
#         'val_batch_size': 8,
#         'prescaled_batch': 1,
#     }
# else:
#     raise RuntimeError("Unknown model config")

# for k, v in model_params.items():
#     hyperparameters[k] = v

## Set Up SageMaker Experiment
Create or load [SageMaker Experiment](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments.html) for the example training job. This will create an experiment trial object in SageMaker.

In [None]:
def create_experiment(experiment_name):
    try:
        sm_experiment = Experiment.load(experiment_name)
    except:
        sm_experiment = Experiment.create(experiment_name=experiment_name)

In [None]:
def create_trial(experiment_name, i_type, i_cnt, pp_degree, tp_degree, batch_size):
    create_date = strftime("%m%d-%H%M%s")

    i_tag = 'test'
    if i_type == 'ml.p4d.24xlarge':
        i_tag = 'p4d'    
        
    trial = "-".join([i_tag,str(i_cnt),f"tp{tp_degree}",f"pp{pp_degree}", f"bs{batch_size}"])
       
    sm_trial = Trial.create(trial_name=f'{experiment_name}-{trial}-{create_date}',
                            experiment_name=experiment_name)

    job_name = f'{sm_trial.trial_name}'
    return job_name

In [None]:
from time import gmtime, strftime

# Specify your experiment name
experiment_name = "smp-gptj"

## Specify Essential Parameters for a SageMaker Training Job

Next, you will use the [`SageMaker Estimator API`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) to define a SageMaker Training Job, passing values through the following parameters for training job name, the number of EC2 instances, the instance type, and the size of the volume attached to the instances. 

* `instance_count`
* `instance_type`
* `volume_size`
* `base_job_name`

### Update the Type and Number of EC2 Instance to Use

The instance type and the number of instances you specify to the `instance_type` and `instance_count` parameters, respectively, will determine the total number of GPUs (world size).

$$ \text{(world size) = (the number of GPUs on a single instance)}\times\text{(the number of instance)}$$

In [None]:
import os
from sagemaker.pytorch import PyTorch
import datetime

instance_type = "ml.p4d.24xlarge" # "ml.p3.16xlarge"
# instance_type = 'local_gpu'
instance_count = 2
# processes_per_host = 8
max_run = 4*60*60      ##### 최대 학습 시간 (28일까지 가능)

To look up the number of GPUs of different instance types, see [Amazon EC2 Instance Types](https://aws.amazon.com/ec2/instance-types/). Use the section **Accelerated Computing** to see general purpose GPU instances. Note that, for example, a given instance type `p4d.24xlarge` has a corresponding instance type `ml.p4d.24xlarge` in SageMaker.
For SageMaker supported `ml` instances and cost information, see [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/). 

### Attach an EBS Volume to the Training Instance
The volume size you specify in `volume_size` must be larger than your input data size. In this example, the volume size is set to 500GB.

In [None]:
volume_size=500

### Specify code and output bucket

In [None]:
trainingjob_bucket='trainingjob-us-west-2-cyj/gpt-j' #<== 학습 후 결과를 저장하는 bucket 이름으로 변경  #<== 고객 환경이 맞게 변경
code_location = f's3://{trainingjob_bucket}/backup_codes'
output_path = f's3://{trainingjob_bucket}/gpt_neox_output' 
s3_log_path = f's3://{trainingjob_bucket}/logs'

### Set distributed training

In [None]:
distribution = {}
flag = 'smmp'

if flag == 'smddp':
    distribution["smdistributed"]={ 
                        "dataparallel": {
                            "enabled": True
                        }
                }

elif flag == 'smmp':
    distribution['smdistributed'] = {
        "modelparallel": {
            "enabled":True,
            "parameters": {
                "ddp": True,
                "tensor_parallel_degree": hyperparameters['tensor_parallel_degree'],
                # partitions is a required param in the current SM SDK so it needs to be passed,
                # these two map to the same config
                "partitions": hyperparameters['pipeline_parallel_degree'],
                "shard_optimizer_state": hyperparameters['shard_optimizer_state'] > 0,
                "prescaled_batch": hyperparameters['prescaled_batch'] > 0,
                "fp16_params": hyperparameters['fp16'] > 0,
                "optimize": hyperparameters['optimize'],
                "auto_partition": True,
                "default_partition": 0,                        
                "fp16_params": hyperparameters['fp16'] > 0,
                "optimize": hyperparameters['optimize'],
            }
        }   
    }
    mpioptions = "-x NCCL_DEBUG=WARN -x SMDEBUG_LOG_LEVEL=ERROR "
    mpioptions += "-x SMP_DISABLE_D2D=1 -x SMP_D2D_GPU_BUFFER_SIZE_BYTES=1 -x SMP_NCCL_THROTTLE_LIMIT=1 "
    mpioptions += "-x FI_EFA_USE_DEVICE_RDMA=1 -x FI_PROVIDER=efa -x RDMAV_FORK_SAFE=1"


    distribution["mpi"]={
        "enabled": True,
        "processes_per_host": 8, # Pick your processes_per_host
        "custom_mpi_options": mpioptions      
    }
else:
    distribution["mpi"]={"enabled": True}

### Use local mode / Script mode setting

In [None]:
kwargs = {}

if instance_type =='local_gpu':
    from sagemaker.local import LocalSession
    from pathlib import Path

    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
    train = f'file://{Path.cwd()}/dataset/wikicorpus_en_abstract/train'
    test = f'file://{Path.cwd()}/dataset/wikicorpus_en_abstract/test'
    data_channels = {"train": train, "test": test}
    checkpoint_s3_uri = None
else:
    sess = boto3.Session()
    sagemaker_session = sagemaker.Session()
    sm = sess.client('sagemaker')
    train = f's3://{data_bucket}/wikicorpus_en_abstract/train'
    test = f's3://{data_bucket}/wikicorpus_en_abstract/test'
    if use_fsx:
        data_channels = {"train": fs_train, "test": fs_test}

        # Use the security group and subnet that was used to create the fsx filesystem
        kwargs["security_group_ids"] = ["sg-XXXXXXXXX"]  ## 학습인스턴스 용 보안그룹
        kwargs["subnets"] = ["subnet-XXXXXXXXXXX"]       ## FSX 생성 시 설정한 동일 subnet
    else:
        data_channels = {"train": train, "test": test}
    checkpoint_s3_uri = f's3://{trainingjob_bucket}/checkpoints'

### Create a SageMaker PyTorch Estimator

The following cell constructs a PyTorch estimator using the parameters defined above. To see how the SageMaker tensor parallelism modules and functions are applied to the script, see the `train_gptj_simple.py` file and the private preview documentation. 

In [None]:
print("experiment_name : {} \ntrain_instance_type : {} \ntrain_instance_count : {} \ndistribution : {}".format(experiment_name, instance_type, instance_count, distribution))    

In [None]:
smp_estimator = HuggingFace(
        entry_point="train_gptj_simple.py",
        source_dir=os.getcwd() + "/code",
        role=role,
        instance_type=instance_type,
#         image=image_uri,
        volume_size=volume_size,
        instance_count=instance_count,
        sagemaker_session=sagemaker_session,
        distribution=distribution,
        pytorch_version='1.10',
        transformers_version='4.17',
        py_version='py38',
        code_location = code_location,
        output_path=output_path,
        disable_profiler=True,
        debugger_hook_config=False,
        checkpoint_s3_uri=checkpoint_s3_uri,
        metric_definitions=metric_definitions,
        hyperparameters=hyperparameters,
        max_run=max_run,
        **kwargs
    )

Finally, run the estimator to launch the SageMaker training job of GPT-J model with tensor parallelism.

In [None]:
# !sudo rm -rf ./code/*.json

In [None]:
data_channels

In [None]:
create_experiment(experiment_name)
job_name = create_trial(experiment_name, instance_type, instance_count, hyperparameters['pipeline_parallel_degree'], hyperparameters['tensor_parallel_degree'], hyperparameters["train_batch_size"])

smp_estimator.fit(
    inputs=data_channels, 
    job_name=job_name,
    experiment_config={
      'TrialName': job_name,
      'TrialComponentDisplayName': job_name,
    },
    wait=False,
)

In [None]:
job_name=smp_estimator.latest_training_job.name

In [None]:
sagemaker_session.logs_for_job(job_name=job_name, wait=True)

# Accessing the Training Logs

You can access the training logs from [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html). Make sure to look at the logs of algo-1 as that is the master node whose output stream will have the training job logs.

You can use CloudWatch to track SageMaker GPU and memory utilization during training and inference. To view the metrics and logs that SageMaker writes to CloudWatch, see *Processing Job, Training Job, Batch Transform Job, and Endpoint Instance Metrics* in [Monitor Amazon SageMaker with Amazon CloudWatch](https://docs.aws.amazon.com/sagemaker/latest/dg/monitoring-cloudwatch.html).

If you are a new user of CloudWatch, see [Getting Started with Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/GettingStarted.html). 

For additional information on monitoring and analyzing Amazon SageMaker training jobs, see [Monitor and Analyze Training Jobs Using Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/training-metrics.html).

# Deploying Trained Model for Inference

In most cases the trained model can be deployed on a single device for inference, since inference has smaller memory requirements. You can use the SMP API to create a single, unified model after training. For TensorFlow, a SavedModel can be created using `smp.DistributedModel.save_model` API, and for PyTorch, `smp.save()` can be used.

After you build and train your models, you can deploy them to get predictions in one of two ways:

* To set up a persistent endpoint to get predictions from your models, use SageMaker hosting services. For an overview on deploying a single model or multiple models with SageMaker hosting services, see [Deploy a Model on SageMaker Hosting Services](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-deployment.html#how-it-works-hosting).
* To get predictions for an entire dataset, use SageMaker batch transform. For an overview on deploying a model with SageMaker batch transform, see [Get Inferences for an Entire Dataset with Batch Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-batch.html).

To learn more about deploying models for inference using SageMaker, see [Deploy Models for Inference](https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-model.html). 
