
# Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Model Parallelism

## Resources

> #### Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism | [Paper](https://arxiv.org/pdf/1909.08053.pdf)
> #### Reducing Activation Recomputation in Large Transformer Models | [Paper](https://docs.google.com/viewerng/viewer?url=https://arxiv.org/pdf/2205.05198.pdf)

## Tensor Parallelism

### 1. Feed Forward With Sync Point
> As shown in the Megatron-LM paper, one way to split the feed-forward layer across processors is to split the input column-wise and the weight matrix row-wise. An explicit example is given below. The only issue is that this requires a sync point before the GeLU. In the 2-GPU example, the sync point is the addition of the two partial products. We have to sync because GeLU is nonlinear: GeLU(AB + A2B2) != GeLU(AB) + GeLU(A2B2).

In [None]:
# Single GPU (the +1 makes the values start at 1 instead of 0)
A = torch.arange(12).reshape(3,4) + 1
B = torch.arange(8).reshape(4,2) + 1
z = torch.matmul(A, B)
z


tensor([[ 50,  60],
        [114, 140],
        [178, 220]])

In [None]:
# 2 GPUs
# Splitting A column-wise turns (3,4) into 2x(3,2); splitting B row-wise turns (4,2) into 2x(2,2)
A = torch.tensor([[1,2],
                  [5,6],
                  [9,10]])
A2 = torch.tensor([[3,4],
                   [7,8],
                   [11,12]])

B = torch.tensor([[1,2],
                  [3,4]])
B2 = torch.tensor([[5,6],
                   [7,8]])

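y = torch.matmul(A, B)    # GPU 0: partial product from the first shard
x = torch.matmul(A2, B2)  # GPU 1: partial product from the second shard
x + y                     # sync point: sum the partial products (an all-reduce on real GPUs)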


tensor([[ 50,  60],
        [114, 140],
        [178, 220]])
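To see why the sum has to happen before the GeLU, we can apply GeLU to each GPU's partial product separately and compare it with GeLU of the summed product. A minimal sketch with random data (the GeLU needs floats, and the sizes and the names `partial0`/`partial1` are illustrative), using `torch.nn.functional.gelu`:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
X = torch.randn(3, 4)   # input (tokens, hidden)
W = torch.randn(4, 2)   # weight matrix

# Split X column-wise and W row-wise, as in the 2-GPU example
X0, X1 = X.chunk(2, dim=1)
W0, W1 = W.chunk(2, dim=0)
partial0 = X0 @ W0      # held by GPU 0
partial1 = X1 @ W1      # held by GPU 1

synced = F.gelu(partial0 + partial1)            # sum first (the sync), then GeLU
unsynced = F.gelu(partial0) + F.gelu(partial1)  # GeLU on each GPU, then sum

print(torch.allclose(synced, F.gelu(X @ W), atol=1e-6))  # True: matches the single-GPU result
print((synced - unsynced).abs().max())                   # nonzero, because GeLU is nonlinear
```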

### 2. Feed Forward Without Sync Point
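> An alternative is to split the weight matrix of the first GEMM column-wise, which removes the sync point before the GeLU. Each GPU then holds complete output columns, so the partial results only need to be concatenated, not added. Because GeLU acts element by element, GeLU([AB, AB2]) = [GeLU(AB), GeLU(AB2)], so each GPU can apply it independently. For the second GEMM, we split the weight matrix row-wise as before: each GPU's GeLU output is already the slice of the input that its rows need, and the partial outputs are then reduced (summed).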

In [None]:
# Single GPU (the +1 makes the values start at 1 instead of 0)
A = torch.arange(12).reshape(3,4) + 1
B = torch.arange(8).reshape(4,2) + 1
torch.matmul(A, B)


tensor([[ 50,  60],
        [114, 140],
        [178, 220]])

In [None]:
# 2 GPUs
# B and B2 are the weight matrix split column-wise: (4,2) turns into 2x(4,1)

# GPU 0
B = torch.tensor([[1],[3],[5],[7]])
# GPU 1
B2 = torch.tensor([[2],[4],[6],[8]])

# GPU 0
x = torch.matmul(A, B)
# GPU 1
y = torch.matmul(A, B2)
# No addition needed: each GPU holds complete output columns, so we only concatenate
torch.cat([x, y], dim=-1)

tensor([[ 50,  60],
        [114, 140],
        [178, 220]])
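Putting both halves together gives the full feed-forward block described in the Megatron-LM paper: the first weight matrix is split column-wise, so each GPU applies GeLU to its own columns with no communication; the second weight matrix is split row-wise, so each GPU's local GeLU output is exactly the input slice it needs; one sum at the end recovers the result. A minimal single-process sketch, with the two "GPUs" simulated as list entries; the sizes and names such as `W1_shards` are illustrative.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
num_gpus = 2
X = torch.randn(3, 4)    # (tokens, hidden)
W1 = torch.randn(4, 8)   # first GEMM: hidden -> 4 * hidden
W2 = torch.randn(8, 4)   # second GEMM: 4 * hidden -> hidden

# Reference: the whole MLP on one device
reference = F.gelu(X @ W1) @ W2

# Split W1 column-wise and W2 row-wise into matching shards
W1_shards = W1.chunk(num_gpus, dim=1)  # each (4, 4)
W2_shards = W2.chunk(num_gpus, dim=0)  # each (4, 4)

# Each "GPU" applies GeLU locally: no sync needed before the nonlinearity
partials = [F.gelu(X @ w1) @ w2 for w1, w2 in zip(W1_shards, W2_shards)]

# The only sync point: sum the partial outputs (an all-reduce on real GPUs)
parallel = torch.stack(partials).sum(dim=0)

print(torch.allclose(parallel, reference, atol=1e-5))  # True
```

The design choice is that the column split feeds straight into the row split, so the block needs one reduction in the forward pass instead of two.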

### 3. MHA
> #### [1] In the paper, the parallelization happens along the attention heads: the Q, K and V projections are split column-wise, so each GPU computes the heads it owns without any communication. In the code below I take a simpler route: I wait until q, k and v have been unbound, then split the heads across the number of GPUs we have access to.
> #### [2] To parallelize the heads, we split them into separate tensors, one group of heads per processor, and move each group onto its own device.
> #### [General] At first it was confusing to me why it is OK to split up MHA. It helps to look at the dimensions of q, k and v and what they mean:
>> q, k and v each have shape (512, 120, 64) => (examples × heads, tokens, features per head), where 512 = 64 examples × 8 heads. matmul(q, k.transpose(-2, -1)) performs 512 separate (120, 64) @ (64, 120) matrix multiplies, one for each head of each example. The product for the 1st head has nothing to do with the product for the 2nd head (the same holds for the softmax and the multiply by v), so splitting MHA along that dimension does not corrupt any information.
> #### [3] We then concatenate the heads back together for the output linear layer, which mixes the information extracted by the different heads.
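A quick way to check the [General] point is to compute attention for all heads at once, then split the heads into groups, run each group on its own, and concatenate. A minimal sketch with random q, k, v of shape (examples, heads, tokens, features); the small sizes and the helper name `attention` are illustrative.

```python
import torch

torch.manual_seed(0)
B, H, S, D = 2, 4, 5, 8   # examples, heads, tokens, features per head (arbitrary sizes)
num_gpus = 2
q, k, v = (torch.randn(B, H, S, D) for _ in range(3))

def attention(q, k, v):
    # Scaled dot-product attention over the last two dims; every leading index is independent
    scores = (q * D ** -0.5) @ k.transpose(-2, -1)
    return scores.softmax(dim=-1) @ v

full = attention(q, k, v)  # all heads at once: (B, H, S, D)

# Split the heads dimension into groups, one per "GPU", then concatenate the results
groups = [attention(qg, kg, vg)
          for qg, kg, vg in zip(q.chunk(num_gpus, dim=1), k.chunk(num_gpus, dim=1), v.chunk(num_gpus, dim=1))]
split = torch.cat(groups, dim=1)

print(torch.allclose(full, split, atol=1e-6))  # True
```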

In [None]:
# my mock data input size
data = torch.randn(64,120,512) # (Batch, Sequence, Features) => (batch, timestep, features)
class Attention(nn.Module):
    def __init__(self,
                 dim: int,
                 num_heads: int = 8,
                 num_gpus: int = 2,
                 devices=None) -> None:
        super().__init__()
        assert dim % num_heads == 0 and num_heads % num_gpus == 0, "heads must split evenly across GPUs"
        self.num_heads = num_heads
        self.qkv = nn.Linear(dim, dim * 3) # (64, 120, 512) @ (512, 512*3) | This creates 3x the number of features. 3x because there is 1 query + 1 key + 1 value = 3 representations of our original dataset
        self.proj = nn.Linear(dim, dim)

        head_dim = dim // num_heads
        self.scale = head_dim**-0.5
        self.num_gpus = num_gpus
        # One device per group of heads, e.g. ['cuda:0', 'cuda:1']; defaults to CPU so the cell runs without GPUs
        self.devices = devices if devices is not None else ['cpu'] * num_gpus

    def forward(self, data):
        B, S, _ = data.shape
        qkv = self.qkv(data) # (64, 120, 1536) => (64 examples, 120 tokens (in the form of timesteps), 1536 features)
        qkv = qkv.reshape(B, S, 3, -1) # (64, 120, 3, 512) => (64 examples, 120 tokens, 3 attributes of each token (QKV), 512 features)
        qkv = qkv.reshape(B, S, 3, self.num_heads, -1) # (64, 120, 3, 8, 64) => (64 examples, 120 tokens, 3 attributes of each token, 8 versions of each attribute, 64 features )
        qkv = qkv.permute(2, 0, 3, 1, 4) # (3, 64, 8, 120, 64)
        q, k, v = qkv.reshape(3, B*self.num_heads, S, -1).unbind(0) # each of q, k, v is (512, 120, 64) => (examples * heads, tokens, features per head)

        # [Add this operation for tensor parallelism]

            # [1] [View q, k and v as (examples, heads, tokens, features) and split the heads dimension into num_gpus groups.
            #  Splitting the flattened (examples * heads) dimension directly would split by example, not by head]
        qs = q.reshape(B, self.num_heads, S, -1).chunk(self.num_gpus, dim=1)
        ks = k.reshape(B, self.num_heads, S, -1).chunk(self.num_gpus, dim=1)
        vs = v.reshape(B, self.num_heads, S, -1).chunk(self.num_gpus, dim=1)

            # [2] [Move each group of heads onto its own device. .to() returns a new tensor rather than moving in place,
            #  so we keep the result]
        qs = [t.to(d) for t, d in zip(qs, self.devices)]
        ks = [t.to(d) for t, d in zip(ks, self.devices)]
        vs = [t.to(d) for t, d in zip(vs, self.devices)]

            # [Run the same attention math on every device. CUDA operations are asynchronous with respect to the host,
            #  so this loop can queue work on each GPU without waiting for the previous GPU to finish]
        outs = []
        for q_i, k_i, v_i in zip(qs, ks, vs):
            attn = (q_i * self.scale) @ k_i.transpose(-2, -1) # (64, 4, 120, 120) per device with 2 GPUs
            attn = attn.softmax(dim=-1)
            outs.append(attn @ v_i) # (64, 4, 120, 64)

        #[3] [Gather the head groups back onto one device and concatenate them for the output projection]
        x = torch.cat([o.to(data.device) for o in outs], dim=1) # (64, 8, 120, 64)
        x = x.permute(0, 2, 1, 3) # (64, 120, 8, 64)
        x = x.reshape(B, S, -1) # (64, 120, 512)
        return self.proj(x)

Attention(512)(data).shape
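The attention cell gathers all heads onto one device before `self.proj`. The Megatron-LM paper avoids that gather by splitting the output projection row-wise, exactly like the second GEMM of the feed-forward block: each GPU multiplies its own heads' output by the matching slice of the projection weight, and one sum (an all-reduce) combines the results. A minimal single-device sketch of that equivalence; the sizes are illustrative. Note that `nn.Linear` computes `x @ weight.T + bias`, so a slice of the input features pairs with the same columns of `proj.weight`, and the bias must be added only once.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
B, S, H, D = 2, 5, 4, 8   # examples, tokens, heads, features per head (arbitrary sizes)
num_gpus = 2
dim = H * D
proj = nn.Linear(dim, dim)

# Attention output arranged as (examples, tokens, heads * features), heads contiguous in the last dim
x = torch.randn(B, S, dim)
reference = proj(x)  # gather-then-project

# Each GPU owns a contiguous group of heads, i.e. a contiguous slice of the last dim of x
x_shards = x.chunk(num_gpus, dim=-1)
w_shards = proj.weight.chunk(num_gpus, dim=1)   # matching input columns of the (out, in) weight
partials = [xs @ ws.T for xs, ws in zip(x_shards, w_shards)]

# Sum the partial projections (an all-reduce on real GPUs), then add the bias once
parallel = torch.stack(partials).sum(dim=0) + proj.bias

print(torch.allclose(parallel, reference, atol=1e-6))  # True
```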


## Sequence Parallelism

### 1. Layer Norm
> #### [1] Below is a toy example showing which dimension LayerNorm acts on. It normalizes the channel (embedding) dimension of each token on its own and is unaffected by the other tokens in the sequence. (If it were affected, the tokens whose features are all equal, such as [10, 10, 10], would not come out as 0s, since the tokens differ from one another.)
> #### [2] Because the tokens do not affect each other in this operation, we can split them across processors. We can also flatten the batch and sequence dimensions together and split that combined dimension, which gives more freedom in how the tokens are divided. Comparing the outputs of [1] and [2] shows they are identical, so the split does not corrupt the data.

In [None]:
#[1]

batch, sentencelength, embeddingdim = 2,2,3
embedding = torch.tensor([[[2,1,4],
                           [10,10,10]],
                          [[100,100,100],
                           [1000,1000,1000]]]).float()
layernorm = nn.LayerNorm(embeddingdim)

layernorm(embedding)

tensor([[[-0.2673, -1.0690,  1.3363],
         [ 0.0000,  0.0000,  0.0000]],

        [[ 0.0000,  0.0000,  0.0000],
         [ 0.0000,  0.0000,  0.0000]]], grad_fn=<NativeLayerNormBackward0>)

In [None]:
#[2]
embedding = embedding.reshape(4,3)   # flatten (batch, sequence) into one dimension of 4 tokens
embedding1 = embedding[:2,:]         # tokens for processor 1
embedding2 = embedding[2:,:]         # tokens for processor 2

# Each processor applies the same LayerNorm (identical weights) to its own tokens
out1 = layernorm(embedding1)
out2 = layernorm(embedding2)

output = torch.cat([out1, out2]).reshape(2,2,3)
output

tensor([[[-0.2673, -1.0690,  1.3363],
         [ 0.0000,  0.0000,  0.0000]],

        [[ 0.0000,  0.0000,  0.0000],
         [ 0.0000,  0.0000,  0.0000]]], grad_fn=<ReshapeAliasBackward0>)
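The same check works when we split along the sequence dimension itself, without flattening batch and sequence first, which matches how sequence parallelism divides the activations. A minimal sketch with random activations of shape (batch, sequence, hidden); the sizes and `num_gpus = 4` are arbitrary, and the "GPUs" are simulated as chunks on one device.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
batch, seq_len, hidden, num_gpus = 2, 8, 16, 4
x = torch.randn(batch, seq_len, hidden)
layernorm = nn.LayerNorm(hidden)  # every GPU would hold an identical copy of these parameters

reference = layernorm(x)

# Each "GPU" normalizes only its own slice of the sequence
shards = x.chunk(num_gpus, dim=1)              # each (2, 2, 16)
outputs = [layernorm(shard) for shard in shards]
parallel = torch.cat(outputs, dim=1)           # stitch the sequence back together

print(torch.allclose(parallel, reference, atol=1e-6))  # True
```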

### 2. Dropout
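> #### Dropout is an element-wise operation, so in principle it could be parallelized with either sequence or tensor parallelism. In the paper, however, the dropouts that follow the attention and MLP blocks are done with sequence parallelism. They sit outside the tensor-parallel regions, where every GPU would otherwise hold a full, replicated copy of the activations, so splitting them along the sequence saves memory.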
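Because dropout only multiplies each element by its own keep/drop decision (and, as `nn.Dropout` does during training, rescales kept elements by 1 / (1 - p)), splitting the tensor along the sequence and applying the matching slice of the mask on each GPU gives the same result as applying the whole mask at once. A minimal sketch that builds the mask explicitly with `torch.bernoulli` so the two paths can be compared; the sizes and `p = 0.1` are arbitrary. It assumes a single shared mask, whereas on real GPUs each device would draw its own mask from its own random state, which this sketch does not model.

```python
import torch

torch.manual_seed(0)
p = 0.1                                   # drop probability
x = torch.randn(2, 8, 16)                 # (batch, sequence, hidden)

# One keep/drop decision per element (inverted dropout: kept values are scaled by 1 / (1 - p))
mask = torch.bernoulli(torch.full_like(x, 1 - p))
reference = x * mask / (1 - p)

# Split the sequence across 2 "GPUs"; each applies only its slice of the mask
num_gpus = 2
parallel = torch.cat(
    [xs * ms / (1 - p) for xs, ms in zip(x.chunk(num_gpus, dim=1), mask.chunk(num_gpus, dim=1))],
    dim=1,
)

print(torch.allclose(parallel, reference))  # True
```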