<a href="https://colab.research.google.com/github/Frutta111/Deep-Learning-In-PyTorch/blob/main/1_PyTorch_Broadcasting_Procedures.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

3/4/2024


#**Implemention the broadcast functionality of PyTorch**

Welcome to this notebook on PyTorch and working with tensors.

In this session, we will explore the broadcast functionality.

`Broadcasting` is a method that allows PyTorch to perform element-wise operations on tensors of different shapes by automatically expanding their dimensions to match each other. It is needed to simplify and optimize operations on tensors, making code more concise and efficient without requiring manual reshaping.

Specifically, we will get into three important methods:

1.  Implement the [`A.expand_as(B)`](https://pytorch.org/docs/stable/generated/torch.Tensor.expand_as.html#torch.Tensor.expand_as) method.
2.  Determine if two tensors can be broadcasted together.
3.  Implement the [`torch.broadcast_tensors(A,B)`](https://pytorch.org/docs/stable/generated/torch.broadcast_tensors.html) method.

For all these tasks, we will avoid using PyTorch's built-in functions/methods that perform broadcasting. Instead, we will build our implementations from scratch to gain a deeper understanding of how these operations work.

Let's get started!

In [1]:
import torch

##**1. expand_as method**

We follow the General Broadcasting Rules and carry out the implementation step by step:

First, for tensors of different dimensions, check if they are compatible:
Start from the last dimension (rightmost) of both tensors and check,

   * If they are equal to each other, or
   * One of them is equal to 1.

If none of these conditions holds true, an error will occur, move one dimension to the left on each tensor and repeat the above check.

In [2]:
def expandability_check(tensorA, tensorB) :

  '''
  Function to check if tensorA can be expanded to the shape of tensorB

  Args:
      tensorA (torch.Tensor): The tensor to be expanded.
      tensorB (torch.Tensor): The tensor whose shape is to be matched.

  Returns:
      bool: True if tensorA can be expanded to the shape of tensorB, False otherwise.
  '''

  sizeA = tensorA.size() # Get the size of tensorA
  sizeB = tensorB.size() # Get the size of tensorB

  dimA = len(sizeA) # Get the number of dimensions of tensorA
  dimB = len(sizeB) # Get the number of dimensions of tensorB

  # Ensure tensorB has at least as many dimensions as tensorA
  if dimB < dimA :
    print("Error: the number of size provided must be greater or equal to the number of dimensions in tensor A")
    return False
    # raise RuntimeError("The number of size provided must be greater or equal to the number of dimensions in tensor A")

  i = 1

  # Check each dimension from the end to the beginning
  while i <= dimA:
    if sizeA[-i] != sizeB[-i] and sizeA[-i] != 1:
      print((f"The expanded size of the tensor ({sizeB[-i]}) must match the existing size ({sizeA[-i]}) at non-singleton dimension {dimB-i}. Target sizes: {sizeB}.  Tensor sizes: {sizeA}"))
      return False
      # raise RuntimeError((f"The expanded size of the tensor ({sizeB[-i]}) must match the existing size ({sizeA[-i]}) at non-singleton dimension {dimB-i}. Target sizes: {sizeB}.  Tensor sizes: {sizeA}"))
    i += 1

  return True

Second, once we have traversed through all dimensions of at least one tensor from right to left and no error occurred, we can proceed to the broadcasting stage. During the broadcasting stage, the tensor values are multiplied according to the following rules:
1. If one tensor has fewer dimensions than the other (i.e., smaller size), pad it at the beginning with 1s until the number of dimensions in both tensors is equal.
2. In case of any disagreement in dimensionality, the compatibility check for broadcasting is successfully completed, hence one tensor has a dimension of 1, which is then duplicated along this dimension.

In [3]:
def broadcasting (tensorA, sizeB) :
  '''
  Function to broadcast tensorA to the shape of sizeB

  Args:
      tensorA (torch.Tensor): The tensor to be broadcast.
      sizeB (torch.Size): The target shape for broadcasting.

  Returns:
      torch.Tensor: The broadcasted tensorA.
  '''

  sizeA = tensorA.size() # Get the size of tensorA

  dimA = len(sizeA) # Get the number of dimensions of tensorA
  dimB = len(sizeB)  # Get the number of dimensions of tensorB

  tensorC = tensorA.clone()
  dimC = dimA

  # Padding degenerate dimensions on the left of tensor A until its dimension equals tensor B's dimension

  while dimC < dimB :
    tensorC.unsqueeze_(0) # Add a dimension of size 1 at the beginning
    dimC += 1

  sizeC = tensorC.size() # Get the size of tensorC after padding

  # Expanding degenerate dimensions by # replicate tensorC along dimension i to match sizeB[i]
  for i  in range(dimC) :
    if sizeC[i] == 1:
      tensorC = torch.cat([tensorC] * sizeB[i], dim=i)  #Concatenates the given sequence in the given dimension.
    else :
      continue

  return tensorC

Finally we can combime the procedures above to one for getting the implementation of our version to `expand_as`:


In [4]:
def my_expand_as(tensorA, tensorB) :
  '''
  Function to expand tensorA to the size of tensorB using the custom expandability_check and broadcasting functions

  Args:
      tensorA (torch.Tensor): The tensor to be expanded.
      tensorB (torch.Tensor): The tensor whose size is to be matched.

  Returns:
      torch.Tensor: The expanded tensorA.
  '''
  # Check if tensorA can be expanded to the shape of tensorB
  if expandability_check(tensorA, tensorB) :
    return broadcasting(tensorA, tensorB.size())

Let's see some usage examples and check if our implementation  `my_expand_as` , is correct by comparing it with PyTorch's built-in `expand_as` method

Example 1:  case when where A is expandable to B's dimensions

In [5]:
tensorA = torch.randint(10 , size=[4])
tensorB = torch.arange(2*3*4).reshape(2,3,4)
print(tensorA , tensorA.size())
print(tensorB , tensorB.size())

tensorC = my_expand_as(tensorA, tensorB)
tensorD = tensorA.expand_as(tensorB)

# Check if the custom and built-in "expend as" are equal
if torch.equal(tensorC,tensorD):
    print("\nmy_expand_as matches Torch's expand_as:")
    print(tensorC , tensorC.size())
else:
    print("my_expand_as doesn't matches Torch's expand_as")

tensor([6, 7, 9, 7]) torch.Size([4])
tensor([[[ 0,  1,  2,  3],
         [ 4,  5,  6,  7],
         [ 8,  9, 10, 11]],

        [[12, 13, 14, 15],
         [16, 17, 18, 19],
         [20, 21, 22, 23]]]) torch.Size([2, 3, 4])

my_expand_as matches Torch's expand_as:
tensor([[[6, 7, 9, 7],
         [6, 7, 9, 7],
         [6, 7, 9, 7]],

        [[6, 7, 9, 7],
         [6, 7, 9, 7],
         [6, 7, 9, 7]]]) torch.Size([2, 3, 4])


Example 2:  case when where A can not be expandabled to B's dimensions

In [6]:
tensorA = torch.randint(10 , size=[2,2,4])
tensorB = torch.arange(3*4).reshape(3,4)

print("my_expand_as results:")
tensorC = my_expand_as(tensorA, tensorB)

print("torch expand_as results:")
try:
  tensorD = tensorA.expand_as(tensorB)
except RuntimeError as e:
  print(e)


my_expand_as results:
Error: the number of size provided must be greater or equal to the number of dimensions in tensor A
torch expand_as results:
expand(torch.LongTensor{[2, 2, 4]}, size=[3, 4]): the number of sizes provided (2) must be greater or equal to the number of dimensions in the tensor (3)


Example 3:  case when where A can not be expandabled to B's dimensions

In [7]:
tensorA = torch.ones(size=[1,3])
tensorB = torch.arange(3).reshape(3,1)

print("my_expand_as results:")
tensorC = my_expand_as(tensorA, tensorB)

print("torch expand_as results:")
try:
  tensorD = tensorA.expand_as(tensorB)
except RuntimeError as e:
  print(e)

my_expand_as results:
The expanded size of the tensor (1) must match the existing size (3) at non-singleton dimension 1. Target sizes: torch.Size([3, 1]).  Tensor sizes: torch.Size([1, 3])
torch expand_as results:
The expanded size of the tensor (1) must match the existing size (3) at non-singleton dimension 1.  Target sizes: [3, 1].  Tensor sizes: [1, 3]


##**2. Determine if two tensors can be broadcasted together**

There are cases where tensors cannot be expanded directly but can be broadcasted together.

We should first verify if broadcasting is possible before comparing the results.


The `tensors_expandability_check` function determines if two tensors can be broadcasted together and computes the shape of the resulting tensor if broadcasting is possible.

It accepts two tensors as input, and checks if they can be broadcasted together, according to broadcasting rules.
The function will return a True/False value as well as additional output if they are broadcastable together: The size to which the tensors will be expanded to.

In [8]:
def tensors_expandability_check(tensorA, tensorB) :
  """
  Check if two tensors can be broadcast together and determine the shape of the resulting tensor.

  Args:
        tensorA (torch.Tensor): The first tensor.
        tensorB (torch.Tensor): The second tensor.

  Returns:
        tuple: A tuple where the first element is a boolean indicating if broadcasting is possible,
               and the second element is the shape of the resulting tensor if broadcasting is possible.
  """
  sizeA = tensorA.size() # Get the size of tensorA
  sizeB = tensorB.size() # Get the size of tensorB

  dimA = len(sizeA) # Get the number of dimensions of tensorA
  dimB = len(sizeB) # Get the number of dimensions of tensorB

  # Initialize shape to be the dimension of the larger tensor (in terms of dimensions)
  if dimA < dimB :
    shape = list(sizeB)
  else :
    shape = list(sizeA)

  # Compare the dimensions of the tensors and update shape according to the broadcast rules or return False
  i = 1
  while i <= min(dimA,dimB):
    if sizeA[-i] != sizeB[-i] and min(sizeA[-i], sizeB[-i]) != 1:
      print((f"The size of tensor a ({sizeA[-i]}) must match the size of tensor b ({sizeB[-i]}) at non-singleton dimension {len(shape)-i}"))
      return False
    shape[-i] = (max(sizeA[-i], sizeB[-i]))
    i += 1

  # Return True and the shape of the mutual broadcat
  return True, shape


Let's see some usage examples and check if our implementation  `my_broadcast_tensors` is correct

Example 1: case when A and B can be broadcasted together

In [9]:
tensorA = torch.randint(10 , size=[1,3])
tensorB = torch.arange(3).reshape(3,1)

print("tensors_expandability_check results:")
print(tensors_expandability_check(tensorA, tensorB))

print("\ntorch.broadcast_tensors results:")
tensorA1 , tensorB1 = torch.broadcast_tensors(tensorA, tensorB)
print(tensorA1.size(), tensorB1.size() , sep = "\n")

tensors_expandability_check results:
(True, [3, 3])

torch.broadcast_tensors results:
torch.Size([3, 3])
torch.Size([3, 3])


Example 2: case when A and B can be broadcasted together

In [10]:
tensorA = torch.randint(10 , size=[2,1,4,7])
tensorB = torch.arange(5*2*3*1*7).reshape(5,2,3,1,7)

print("tensors_expandability_check results:")
print(tensors_expandability_check(tensorA, tensorB))

print("\ntorch.broadcast_tensors results:")
tensorA1 , tensorB1 = torch.broadcast_tensors(tensorA, tensorB)
print(tensorA1.size(), tensorB1.size() , sep = "\n")

tensors_expandability_check results:
(True, [5, 2, 3, 4, 7])

torch.broadcast_tensors results:
torch.Size([5, 2, 3, 4, 7])
torch.Size([5, 2, 3, 4, 7])


Example 3: case when A and B cannot be broadcasted together

In [11]:
tensorA = torch.randint(10 , size=[2,1,4,5])
tensorB = torch.arange(5*2*3*1*7).reshape(5,2,3,1,7)

print("tensors_expandability_check results:")
print(tensors_expandability_check(tensorA, tensorB))

print("\ntorch.broadcast_tensors results:")
try:
  tensorA1 , tensorB1 = torch.broadcast_tensors(tensorA, tensorB)
except RuntimeError as e:
  print(e)

tensors_expandability_check results:
The size of tensor a (5) must match the size of tensor b (7) at non-singleton dimension 4
False

torch.broadcast_tensors results:
The size of tensor a (5) must match the size of tensor b (7) at non-singleton dimension 4


Example 4: case when A and B cannot be broadcasted together

In [12]:
tensorA = torch.randint(10 , size=[8,3,3,2,1,4,7])
tensorB = torch.arange(5*2*3*4*7).reshape(5,2,3,4,7)

print("tensors_expandability_check results:")
print(tensors_expandability_check(tensorA, tensorB))

print("\ntorch.broadcast_tensors results:")
try:
  tensorA1 , tensorB1 = torch.broadcast_tensors(tensorA, tensorB)
except RuntimeError as e:
  print(e)

tensors_expandability_check results:
The size of tensor a (3) must match the size of tensor b (5) at non-singleton dimension 2
False

torch.broadcast_tensors results:
The size of tensor a (3) must match the size of tensor b (5) at non-singleton dimension 2


##**3. torch.broadcast_tensors**

The function `my_broadcast_tensors` - receives as input two tensors, and broadcasts them together. The function will return as output two new tensors


*   It first checks if the tensors can be broadcasted together using the `tensors_expandability_check` function (defined at part 2).
*   If the tensors can be broadcasted (`tensors_expandability_check` returns True), it retrieves the shape for broadcasting.
*   It then broadcasts both tensors A and B to the common shape using a  `broadcasting` function (defined at part 1)






In [13]:
def my_broadcast_tensors(tensorA, tensorB) :
  """
   Broadcast two tensors to a common shape if they are broadcast-compatible.

   Args:
        tensorA (torch.Tensor): The first tensor.
        tensorB (torch.Tensor): The second tensor.

   Returns:
        tuple: A tuple of two tensors, each broadcasted to the common shape,
               or None if the tensors cannot be broadcast together.
  """
  # Check if tensors A and B can be broadcasted together
  if tensors_expandability_check(tensorA, tensorB) :
    # If tensors are expandable, get the shape for broadcasting
    check, shape = tensors_expandability_check(tensorA, tensorB)
    # Broadcast tensors A and B to the common shape and return the broadcasted tensors
    return broadcasting(tensorA, shape) , broadcasting(tensorB, shape)


Let's see some usage examples and check if our implementation  `my_broadcast_tensors` , is correct by comparing it with PyTorch's built-in `torch.broadcast_tensors` method

Example 1:  Broadcastable tensors

In [14]:
tensorA = torch.randint(10 , size=[1,3])
tensorB = torch.arange(3).reshape(3,1)

print( tensorA , tensorA.size())
print(tensorB , tensorB.size())

tensorA1 , tensorB1 = my_broadcast_tensors(tensorA, tensorB)
tensorA2 , tensorB2 = torch.broadcast_tensors(tensorA, tensorB)

# Check if the tensors are equal
if torch.equal(tensorA1, tensorA2) & torch.equal(tensorB1, tensorB2):
    print("\nmy_broadcast_tensors matches Torch's torch.broadcast_tensors:\n")
    print( tensorA1 , tensorA1.size())
    print(tensorB1 , tensorB1.size())
else:
    print("\nmy_broadcast_tensors doesn't matches Torch's torch.broadcast_tensors:")


tensor([[0, 0, 0]]) torch.Size([1, 3])
tensor([[0],
        [1],
        [2]]) torch.Size([3, 1])

my_broadcast_tensors matches Torch's torch.broadcast_tensors:

tensor([[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]]) torch.Size([3, 3])
tensor([[0, 0, 0],
        [1, 1, 1],
        [2, 2, 2]]) torch.Size([3, 3])


Example 2:  Broadcastable tensors

In [15]:
tensorA = torch.randint(10 , size=[2,1,4,7])
tensorB = torch.arange(5*2*3*1*7).reshape(5,2,3,1,7)

tensorA1 , tensorB1 = my_broadcast_tensors(tensorA, tensorB)
tensorA2 , tensorB2 = torch.broadcast_tensors(tensorA, tensorB)

# Check if the tensors are equal
if torch.equal(tensorA1, tensorA2) & torch.equal(tensorB1, tensorB2):
    print("\nmy_broadcast_tensors matches Torch's torch.broadcast_tensors\n")
else:
    print("\nmy_broadcast_tensors doesn't matches Torch's torch.broadcast_tensors")



my_broadcast_tensors matches Torch's torch.broadcast_tensors



Example 3: Unbroadcastable tensors example

In [16]:
tensorA = torch.randint(10 , size=[2,1,4,5])
tensorB = torch.arange(5*2*3*1*7).reshape(5,2,3,1,7)

print("my broadcast_tensors results:")
my_broadcast_tensors(tensorA, tensorB)

print("\ntorch.broadcast_tensors results:")
try:
  torch.broadcast_tensors(tensorA, tensorB)
except RuntimeError as e:
  print(e)

my broadcast_tensors results:
The size of tensor a (5) must match the size of tensor b (7) at non-singleton dimension 4

torch.broadcast_tensors results:
The size of tensor a (5) must match the size of tensor b (7) at non-singleton dimension 4


Example 4: Unbroadcastable tensors example

In [17]:
tensorA = torch.randint(10 , size=[8,3,3,2,1,4,7])
tensorB = torch.arange(5*2*3*4*7).reshape(5,2,3,4,7)

print("my broadcast_tensors results:")
my_broadcast_tensors(tensorA, tensorB)

print("\ntorch.broadcast_tensors results:")
try:
  torch.broadcast_tensors(tensorA, tensorB)
except RuntimeError as e:
  print(e)

my broadcast_tensors results:
The size of tensor a (3) must match the size of tensor b (5) at non-singleton dimension 2

torch.broadcast_tensors results:
The size of tensor a (3) must match the size of tensor b (5) at non-singleton dimension 2
