# SageMaker distributed data parallelism (SMDDP)
https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-intro.html
The SageMaker distributed data parallelism (SMDDP) library is a collective communication library that improves compute performance of distributed data parallel training. The SMDDP library addresses communications overhead of the key collective communication operations by offering the following.
1. The library offers AllReduce optimized for AWS. AllReduce is a key operation used for synchronizing gradients across GPUs at the end of each training iteration during distributed data training.
2. The library offers AllGather optimized for AWS. AllGather is another key operation used in sharded data parallel training, which is a memory-efficient data parallelism technique offered by popular libraries such as the SageMaker model parallelism (SMP) library, DeepSpeed Zero Redundancy Optimizer (ZeRO), and PyTorch Fully Sharded Data Parallelism (FSDP).
3. The library performs optimized node-to-node communication by fully utilizing AWS network infrastructure and the Amazon EC2 instance topology.

The following is the high-level workflow of the SMDDP AllReduce operation.

1. The library assigns ranks to GPUs (workers).
2. At each iteration, the library divides each global batch by the total number of workers (world size) and assigns small batches (batch shards) to the workers.
   1. The size of the global batch is (number of nodes in a cluster) * (number of GPUs per node) * (per batch shard).
   2. A batch shard (small batch) is a subset of dataset assigned to each GPU (worker) per iteration.
3. The library launches a training script on each worker.
4. The library manages copies of model weights and gradients from the workers at the end of every iteration.
5. The library synchronizes model weights and gradients across the workers to aggregate a single trained model.

AllGather is heavily used in distributed training techniques such as sharded data parallelism where each individual worker holds a fraction of a model, or a sharded layer. The workers call AllGather before forward and backward passes to reconstruct the sharded layers. The forward and backward passes continue onward after the parameters are all gathered. During the backward pass, each worker also calls ReduceScatter to collect (reduce) gradients and break (scatter) them into gradient shards to update the corresponding sharded layer.

# How to use SMDDP

```python
# configuration for running training on smdistributed Data Parallel
distribution = {'smdistributed':{'dataparallel':{ 'enabled': True }}}

# git configuration to download our fine-tuning script
git_config = {'repo': 'https://github.com/huggingface/transformers.git','branch': 'v4.26.0'}

instance_type='ml.p3.16xlarge'
instance_count=2
volume_size=200

huggingface_estimator = HuggingFace(entry_point='run_qa.py',
                                    source_dir='./examples/pytorch/question-answering',
                                    git_config=git_config,
                                    instance_type=instance_type,
                                    instance_count=instance_count,
                                    volume_size=volume_size,
                                    distribution= distribution)
```

In [2]:
!pip install "sagemaker>=2.48.0"  --upgrade

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import sagemaker.huggingface

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [4]:
import sagemaker
import boto3
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

sagemaker role arn: arn:aws:iam::802575742115:role/service-role/AmazonSageMaker-ExecutionRole-20230929T143152
sagemaker bucket: sagemaker-us-east-1-802575742115
sagemaker session region: us-east-1


## Creating an Estimator and start a training job
In this example we are going to use the capability to download/use a fine-tuning script from a git- repository. 

In [5]:
from sagemaker.huggingface import HuggingFace

# hyperparameters, which are passed into the training job
hyperparameters={
    'model_name_or_path': 'bert-large-uncased-whole-word-masking',
    'dataset_name':'squad',
    'do_train': True,
    'do_eval': True,
    'fp16': True,
    'per_device_train_batch_size': 4,
    'per_device_eval_batch_size': 4,
    'num_train_epochs': 2,
    'max_seq_length': 384,
    'max_steps': 100,
    'pad_to_max_length': True,
    'doc_stride': 128,
    'output_dir': '/opt/ml/model'
}

# configuration for running training on smdistributed Data Parallel
distribution = {'smdistributed':{'dataparallel':{ 'enabled': True }}}

# git configuration to download our fine-tuning script
git_config = {'repo': 'https://github.com/huggingface/transformers.git','branch': 'v4.26.0'}

# instance configurations
instance_type='ml.p3.16xlarge'
instance_count=2
volume_size=200

# metric definition to extract the results
metric_definitions=[
     {"Name": "train_runtime", "Regex": "train_runtime.*=\D*(.*?)$"},
     {'Name': 'train_samples_per_second', 'Regex': "train_samples_per_second.*=\D*(.*?)$"},
     {'Name': 'epoch', 'Regex': "epoch.*=\D*(.*?)$"},
     {'Name': 'f1', 'Regex': "f1.*=\D*(.*?)$"},
     {'Name': 'exact_match', 'Regex': "exact_match.*=\D*(.*?)$"}]

In [7]:
# estimator
huggingface_estimator = HuggingFace(entry_point='run_qa.py',
                                    source_dir='./examples/pytorch/question-answering',
                                    git_config=git_config,
                                    metric_definitions=metric_definitions,
                                    instance_type=instance_type,
                                    instance_count=instance_count,
                                    volume_size=volume_size,
                                    role=role,
                                    transformers_version='4.26.0',
                                    pytorch_version='1.13.1',
                                    py_version='py39',
                                    distribution= distribution,
                                    hyperparameters = hyperparameters)

# starting the train job
huggingface_estimator.fit()

Cloning into '/tmp/tmpc9ezp7cy'...
Note: switching to 'v4.26.0'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

HEAD is now at 820c46a70 Hotifx remove tuple for git config image processor. (#21278)
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker:Creating training-job with name: huggingface-pytorch-training-2024-01-23-03-28-44-354


2024-01-23 03:29:08 Starting - Starting the training job
2024-01-23 03:29:08 Pending - Training job waiting for capacity............
2024-01-23 03:30:52 Pending - Preparing the instances for training.........
2024-01-23 03:32:20 Downloading - Downloading input data...
2024-01-23 03:32:45 Downloading - Downloading the training image.....................
2024-01-23 03:36:39 Training - Training image download completed. Training in progress....[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2024-01-23 03:37:03,854 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2024-01-23 03:37:03,918 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2024-01-23 03:37:03,930 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m
[34m2024-01-23 03:37:03,933 sagemaker_pytorch_container.

## Deploying the endpoint

In [8]:
predictor = huggingface_estimator.deploy(1,"ml.g4dn.xlarge")

INFO:sagemaker:Creating model with name: huggingface-pytorch-training-2024-01-23-03-45-29-539
INFO:sagemaker:Creating endpoint-config with name huggingface-pytorch-training-2024-01-23-03-45-29-539
INFO:sagemaker:Creating endpoint with name huggingface-pytorch-training-2024-01-23-03-45-29-539


--------!

In [9]:
data = {
"inputs": {
	"question": "What is used for inference?",
	"context": "My Name is Philipp and I live in Nuremberg. This model is used with sagemaker for inference."
	}
}
predictor.predict(data)

{'score': 0.9702168703079224, 'start': 68, 'end': 77, 'answer': 'sagemaker'}

In [10]:
# Delete the endpoints
predictor.delete_model()
predictor.delete_endpoint()

INFO:sagemaker:Deleting model with name: huggingface-pytorch-training-2024-01-23-03-45-29-539
INFO:sagemaker:Deleting endpoint configuration with name: huggingface-pytorch-training-2024-01-23-03-45-29-539
INFO:sagemaker:Deleting endpoint with name: huggingface-pytorch-training-2024-01-23-03-45-29-539
