I'm submitting the task with a week delay, using 1-7 delay days

Video - https://drive.google.com/file/d/16pZXmXpUv61Nbulge2dy2OZWhrrDtrro/view?usp=sharing

In [None]:
import torch
from torch import tensor, unsqueeze, cat, broadcast_tensors
from itertools import zip_longest

# Allowed methods squeeze unsqueeze cat stack x.reshape x.reshape_as x.clone


# Part A - implementing expand_as

In [165]:
def my_expand_as(A: tensor,B: tensor):
  """
    Expand tensor A to B's size
  """
  print("len", len(A.shape), len(B.shape))
  if len(B.shape) < len(A.shape):
    raise RuntimeError(f"The number of sizes provided {len(B.shape)} must be greater or equal to the number of dimensions in the tensor {len(A.shape)}")
  
  if len(A.shape) == 0 or len(B.shape) == 0:
    raise RuntimeError(f"One of the tensors has shape of size 0")

  # Verify dimenstions 
  print("A.shape", A.shape, "B.shape", B.shape)
  for A_i, B_i in zip(reversed(A.shape), reversed(B.shape)):
    print("A_i: ", A_i, "B_i: ", B_i)
    if A_i > B_i:
      raise RuntimeError(f"Number of sizes at {B_i} must be greater or equal to the number of sizes in the source tensor {A_i}")

    # Can't broadcast if size is too short or the target dimension doens't match A's non-1 dimension
    if (A_i != B_i) and A_i != 1:
      raise RuntimeError(f"The size of the target tenor {B_i} must be equal to {A_i} or {A_i} should be equal to 1")

  A = A.clone() 

  for i in range(len(B.shape) - len(A.shape)):
    A = unsqueeze(A,0)
  
  for i, (A_i, B_i) in enumerate(zip(reversed(A.shape), reversed(B.shape))):
    if A_i < B_i:
      A = cat([A for z in  range(B_i)], dim=len(B.shape) - i - 1)

  return A
  

### Sanity testing

In [None]:
def test_expand_as(A: tensor,B: tensor):
  original_expand_as_result = None
  try:
    original_expand_as_result = A.expand_as(B)
  except Exception as e:
    print(e)
  
  my_expand_as_result = None

  try:
    my_expand_as_result = my_expand_as(A, B)
  except Exception as e:
    print(e)

  if original_expand_as_result == None and my_expand_as_result == None:
    print(f"Test passed, both results are None")
    return
  
  if (original_expand_as_result == None and my_expand_as_result != None) or (original_expand_as_result != None and my_expand_as_result == None) :
    print(f"Test failed, original_expand_as_result: {original_expand_as_result}, my_expand_as_result: {my_expand_as_result}")
    return
  
  if torch.equal(original_expand_as_result, my_expand_as_result):
    print(f"Test passed with tensors {A}, {B}")
  else:
    print(f"Test Failed with tensors {A}, {B}")

A = torch.zeros(3,3) 
B = torch.zeros(3)

test_expand_as(B,A)
test_expand_as(A,B)

len 1 2
A.shape torch.Size([3]) B.shape torch.Size([3, 3])
A_i:  3 B_i:  3
Test passed with tensors tensor([0., 0., 0.]), tensor([[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])
expand(torch.FloatTensor{[3, 3]}, size=[3]): the number of sizes provided (1) must be greater or equal to the number of dimensions in the tensor (2)
len 2 1
the number of sizes provided 1 must be greater or equal to the number of dimensions in the tensor 2
Test passed, both results are None


# Part B - validate mutual broadcastability

In [None]:
def validate_mutual_broadcastability(A: tensor,B: tensor):
  if len(A.shape) == 0 or len(B.shape) == 0:
    raise RuntimeError(f"One of the tensors has shape of size 0")
    return None
  
  # sizes of the mutualy broadcasted shape
  result_size = []

  for A_i, B_i in zip_longest(reversed(A.shape), reversed(B.shape)):
    # If B is longer
    if A_i is None:
      result_size.insert(0, B_i)

    # If A is longer
    elif B_i is None:
      result_size.insert(0, A_i)
    elif A_i > B_i and B_i !=1 :
      return False, None, f"Can't mutualy broadcast, if one size isn't singleton, one size can't be greater, A_i: {A_i}, B_i: {B_i}"
    elif B_i > A_i and A_i !=1 :
      return False, None,f"Can't mutualy broadcast, if one size isn't singleton, one size can't be greater, A_i: {A_i}, B_i: {B_i}"
    else:
      result_size.insert(0, max(A_i, B_i))
  
  return True, torch.Size(result_size),""


## Sanity check of 

In [None]:
# Should work
A = torch.zeros(3,2,1)
B = torch.zeros(3,1,1,2)

print("A", A.shape, "B", B.shape)

is_broadcastible, size, err = validate_mutual_broadcastability(A,B)

print(is_broadcastible, size, err)

# Shouldn't work
A = torch.zeros(3,2,1)
B = torch.zeros(3,1,3,2)

print("A", A.shape, "B", B.shape)

is_broadcastible, size, err = validate_mutual_broadcastability(A,B)

print(is_broadcastible, size, err)

A torch.Size([3, 2, 1]) B torch.Size([3, 1, 1, 2])
True torch.Size([3, 3, 2, 2]) 
A torch.Size([3, 2, 1]) B torch.Size([3, 1, 3, 2])
False None Can't mutualy broadcast, if one size isn't singleton, one size can't be great, A_i: 2, B_i: 3


# Part C - Mutual broadcasting

In [None]:
def my_broadcast_tensors(A: tensor,B: tensor):
  is_broadcastible, size, err = validate_mutual_broadcastability(A,B)

  if not is_broadcastible:
    raise RuntimeError(f"Not mutualy broadcastible, err: {err}")

  temp_tensor = torch.zeros(size)

  return my_expand_as(A, temp_tensor),my_expand_as(B, temp_tensor)

## Sanity testing of my_broadcast_tensors

In [None]:
def test_broadcast_tensors(A: tensor,B: tensor):
  original_broadcast_tensors_result = (None, None)
  try:
    original_broadcast_tensors_result = broadcast_tensors(A, B)
  except Exception as e:
      print(e)
  
  my_broadcast_tensors_result = (None, None)

  try:
    my_broadcast_tensors_result = my_broadcast_tensors(A, B)
  except Exception as e:
    print(e)

  if original_broadcast_tensors_result == None and my_broadcast_tensors_result == None:
    print(f"Test passed, both results are None")
    return
  
  if (((original_broadcast_tensors_result[0] == None and my_broadcast_tensors_result[0] != None) or
      (original_broadcast_tensors_result[1] == None and my_broadcast_tensors_result[1] != None)) or
     ((original_broadcast_tensors_result[0] != None and my_broadcast_tensors_result[0] == None) or
      (original_broadcast_tensors_result[1] != None and my_broadcast_tensors_result[1] == None))):
    print(f"Test failed, original_expand_as_result: {original_broadcast_tensors_result}, my_expand_as_result: {my_broadcast_tensors_result}")
    return
  
  if ((original_broadcast_tensors_result[0] == None and my_broadcast_tensors_result[0] == None) and
      (original_broadcast_tensors_result[1] == None and my_broadcast_tensors_result[1] == None)):
    print("Test passed, both results are None")
    return
  if torch.equal(original_broadcast_tensors_result[0], my_broadcast_tensors_result[0]) and torch.equal(original_broadcast_tensors_result[1], my_broadcast_tensors_result[1]):
    print(f"Test passed with tensors {A.shape}, {B.shape}")
  else:
    print(f"Test Failed with tensors {A.shape}, {B.shape}")


A = torch.rand(3,2,1)
B = torch.rand(3,1,1,2)

test_broadcast_tensors(A,B)

len 3 4
A.shape torch.Size([3, 2, 1]) B.shape torch.Size([3, 3, 2, 2])
A_i:  1 B_i:  2
A_i:  2 B_i:  2
A_i:  3 B_i:  3
len 4 4
A.shape torch.Size([3, 1, 1, 2]) B.shape torch.Size([3, 3, 2, 2])
A_i:  2 B_i:  2
A_i:  1 B_i:  2
A_i:  1 B_i:  3
A_i:  3 B_i:  3
Test passed with tensors torch.Size([3, 2, 1]), torch.Size([3, 1, 1, 2])


# Part D - Testing all parts

## Test my_expand_as

In [None]:
A = torch.rand(1, 2, 3, 4, 5)
B = torch.rand(1, 1, 5)
print(A.shape, B.shape, sep='\n') 

test_expand_as(B,A)

# A is of size 5 and B is of size 3, we can't expand tensor to smaller size so it should fail 
test_expand_as(A,B)

A = torch.arange(18).reshape(3,6)
B = torch.arange(6).reshape(1,6)
print(A.shape, B.shape, sep='\n') 

test_expand_as(B,A)

# At the first dimension the size in A is 1, and in B is 3, so destination size is smaller than the source size so it should fail
test_expand_as(A,B)

A = torch.rand(2, 1, 5)
B = torch.rand(1, 1, 5)
print(A.shape, B.shape, sep='\n') 

# We're trying to make the first dimenstion smaller, so that should fail
test_expand_as(A,B)

torch.Size([1, 2, 3, 4, 5])
torch.Size([1, 1, 5])
len 3 5
A.shape torch.Size([1, 1, 5]) B.shape torch.Size([1, 2, 3, 4, 5])
A_i:  5 B_i:  5
A_i:  1 B_i:  4
A_i:  1 B_i:  3
Test passed with tensors tensor([[[0.1826, 0.8777, 0.4390, 0.9826, 0.8338]]]), tensor([[[[[0.8167, 0.7188, 0.1496, 0.5927, 0.9660],
           [0.5437, 0.2411, 0.9818, 0.3280, 0.4661],
           [0.1807, 0.4303, 0.2685, 0.5582, 0.2657],
           [0.8400, 0.8352, 0.4889, 0.3994, 0.9722]],

          [[0.6251, 0.6912, 0.2186, 0.4248, 0.0131],
           [0.1500, 0.8083, 0.8817, 0.5281, 0.7723],
           [0.3662, 0.0310, 0.3030, 0.8163, 0.6196],
           [0.2361, 0.9853, 0.3651, 0.5926, 0.9404]],

          [[0.5980, 0.1837, 0.3343, 0.7355, 0.6312],
           [0.8616, 0.6511, 0.5522, 0.1581, 0.6187],
           [0.9083, 0.2138, 0.0144, 0.0054, 0.0998],
           [0.7673, 0.4111, 0.6741, 0.8734, 0.8022]]],


         [[[0.3352, 0.0864, 0.3538, 0.2165, 0.0216],
           [0.4882, 0.7049, 0.9421, 0.6090, 0.1396],

## Test broadcast_tensors  

In [None]:
A = torch.zeros(3,2,1)
B = torch.zeros(3,1,1,2)

test_broadcast_tensors(B,A)

A = torch.zeros(3,2,1)
B = torch.zeros(3,4,1,2)

# At dimenstion 1, the size is non-singleton and 3 != 4 so they're not broadcastible and it shouldn't work
test_broadcast_tensors(B,A)

len 4 4
A.shape torch.Size([3, 1, 1, 2]) B.shape torch.Size([3, 3, 2, 2])
A_i:  2 B_i:  2
A_i:  1 B_i:  2
A_i:  1 B_i:  3
A_i:  3 B_i:  3
len 3 4
A.shape torch.Size([3, 2, 1]) B.shape torch.Size([3, 3, 2, 2])
A_i:  1 B_i:  2
A_i:  2 B_i:  2
A_i:  3 B_i:  3
Test passed with tensors torch.Size([3, 1, 1, 2]), torch.Size([3, 2, 1])
The size of tensor a (4) must match the size of tensor b (3) at non-singleton dimension 1
Not mutualy broadcastible, err: Can't mutualy broadcast, if one size isn't singleton, one size can't be great, A_i: 4, B_i: 3
Test passed, both results are None
