# Exercise 11 - Broadcasting
### Zvi Badash 214553034

As instructed, I recorded a video of myself explaining the exercise and the code I wrote for it.
The video unfortunately has a watermark, but I hope it won't be in the way of understanding the code.

[Exercise 11 - Broadcasting](https://youtu.be/KXYBFXhQpkE)

In [33]:
import torch
from typing import Tuple, Optional, List

In [34]:
Tensor = torch.Tensor
Size = torch.Size

In [35]:
## Copied from chatGPT ##

def print_with_color(color: int, msg: str) -> None:
    """
    Prints a message with a given color.
    :param color: 1 for red, 2 for green, 3 for yellow, 4 for blue, 5 for magenta, 6 for cyan, 7 for white
    :param msg: the message to print
    """
    print(f'\033[38;5;{color}m{msg}\033[0m')

## Question A - ```expand_as```

In [36]:
def expand_as(A: Tensor, B: Tensor, suppress_len_check: bool=False) -> Tensor:
    """
    Expands tensor A to the size of tensor B.
    :param A: The tensor to expand
    :param B: The tensor to expand to
    :param suppress_len_check: If True, the function will not check if A has more dimensions than B
    :return: The expanded tensor
    """

    # Define A's and B's sizes for later use
    A_size: Size = A.shape
    B_size: Size = B.shape

    # Check if A can be expanded to B's size
    ### Check if A has more dimensions than B
    if not suppress_len_check and len(A_size) > len(B_size):
        raise RuntimeError(f'The number of sizes provided for tensor B ({len(B_size)}) must be greater or equal to the number of dimensions in tensor A ({len(A_size)})')
   
    ### Check if some dimensions are incompatible
    reversed_sizes_zip = zip(A_size[::-1], B_size[::-1])
    for i, (a, b) in enumerate(reversed_sizes_zip): # Loop through the sizes in reverse
        if (a != 1 and b != 1 and a != b) or a > b:
            raise RuntimeError(f'The size of tensor A ({a}) must match the size of tensor B ({b}) at non-singleton dimension {i}')

    # Clone A and unsqueeze it along missing A dimensions (Append 1 sized dimensions)
    expanded_tensor = A.clone()
    for _ in range(len(B_size) - len(A_size)):
        expanded_tensor = torch.unsqueeze(expanded_tensor, 0)

    # Find dimensions that needs repeating along
    broadcast_along_dims = [i for i, (c, b) in enumerate(zip(expanded_tensor.shape, B_size)) if c == 1]

    # Repeat the tensor along each dimension
    for dim in broadcast_along_dims:
        expanded_tensor = torch.cat([expanded_tensor] * B_size[dim], dim)
    
    # Return the expanded tensor
    return expanded_tensor

## Question B - ```is_broadcastable```

In [37]:
def are_broadcastable(A: Tensor, B: Tensor) -> Tuple[bool, Optional[Size]]:
    """
    Checks if tensors A and B can be broadcasted together.
    :param A: The first tensor
    :param B: The second tensor
    :return: A tuple of a boolean and a torch.Size object.
             The boolean indicates if the tensors can be broadcasted together.
             The torch.Size object is the broadcasted size of the tensors.
    """

    # Define A's and B's sizes for later use
    A_size: List = list(A.shape)
    B_size: List = list(B.shape)

    # Check that A can be broadcasted to B's size
    ### Check if some dimensions are incompatible
    reversed_sizes_zip = list(zip(A_size[::-1], B_size[::-1]))
    for i, (a, b) in enumerate(reversed_sizes_zip): # Loop through the sizes in reverse
        if a != 1 and b != 1 and a != b:
            return False, Size() # Return an empty size and False

    # Determine the broadcasted tensor size
    ### Insert 0s at the beginning of the shorter dimension size
    if len(A_size) != len(B_size):
        shorter = A_size if len(A_size) < len(B_size) else B_size
        while len(A_size) != len(B_size):
            shorter.insert(0, 0)

    # At this point both tensors must have the same number of dimensions
    # (Either because they had the same length to begin with or we appended
    # enough 0s to the shorter tensor sizes)
    assert len(A_size) == len(B_size)

    # Return the broadcasted tensor size
    return True, torch.Size([max(a, b) for a, b in zip(A_size, B_size)])

## Question C - ```broadcast_tensors```

In [38]:
def broadcast_tensors(A: Tensor, B: Tensor) -> Tuple[Tensor, Tensor]:
    """
    Broadcasts tensors A and B together.
    :param A: The first tensor
    :param B: The second tensor
    :return: The broadcasted tensors
    """
    _, broadcasted_size = are_broadcastable(A, B) # Get the broadcasted size

    # Expand A and B to the broadcasted size and return them
    return expand_as(A, torch.empty(broadcasted_size), suppress_len_check=True), \
           expand_as(B, torch.empty(broadcasted_size), suppress_len_check=True)

## Question D - Testing

### In the following test I'm generating two random tensors (using `randn`) and I try to first expand them to each-others sizes and then broadcast them together

I also check if operations between the broadcasted tensors is the same as between the original tensors (in that case the operation has been automatically 
lifted by `torch`).


In [39]:
# Create tests list
As = []
Bs = []

### For the first simple test, I chose two random tensors `A` and `B` which have the same shape.


In [40]:
# First test
As.append(torch.randn(3, 2))
Bs.append(torch.randn(3, 2))

### For the second test, I chose two random tensors `A` and `B` which are broadcastable but `A` can't be expanded to `B`'s size (or vice versa).


In [41]:
# Second test
#                        |     |
#                        ⌄     ⌄
As.append(torch.randn(1, 2, 4, 1))
Bs.append(torch.randn(4, 1, 4, 6))

### For the third test, I chose two random tensors `A` and `B` with a different number of dimensions to make sure this capability works as well.


In [42]:
# Third test
#                              |     |
#                              ⌄     ⌄
As.append(torch.randn(         1, 7, 2))
Bs.append(torch.randn(4, 1, 5, 6, 7, 1))

### For the last test, I chose two random tensors `A` and `B` with a different number of dimensions, such that `A` can be expanded to `B`'s size but `B` can't be expanded to `A`'s size.


In [43]:
# Fourth test
As.append(torch.randn(         1, 7, 2))
Bs.append(torch.randn(4, 1, 5, 6, 7, 2))

In [44]:
for A, B in zip(As, Bs):
    print(f'*** Checking tensors of shape {A.shape} and {B.shape} ***')

    # Check if the tensors can be expanded to each-others sizes
    try:
        if torch.all(A.expand_as(B) == expand_as(A, B)).item():
            print_with_color(2, f'\t+ Expansion from tensor of shape {A.shape} to shape {B.shape} succeeded')
    except RuntimeError:
        print_with_color(1, f'\t- Expansion between tensor of shape {A.shape} to shape {B.shape} failed')

    try:
        if torch.all(B.expand_as(A) == expand_as(B, A)).item():
            print_with_color(2, f'\t+ Expansion from tensor of shape {B.shape} to shape {A.shape} succeeded')
    except RuntimeError:
        print_with_color(1, f'\t- Expansion between tensor of shape {B.shape} to shape {A.shape} failed')

    # Check if the broadcasted tensors are of correct size
    broadcastable, size = are_broadcastable(A, B)
    if broadcastable:
        assert (A + B).shape == size
    else:
        print_with_color(1, f'\t- Tensors of shapes {A.shape} and {B.shape} are non-compatible for broadcasting.')

    # Check that the broadcasting itself is correct
    if broadcastable:
        C, D = broadcast_tensors(A, B)
        assert torch.all(A + B == C + D).item()
        assert torch.all(A * B == C * D).item()
        assert torch.all(A / B == C / D).item()
        print_with_color(2, f'\t+ Broadcasting between tensors of shapes {A.shape} and {B.shape} succeeded')

*** Checking tensors of shape torch.Size([3, 2]) and torch.Size([3, 2]) ***
[38;5;2m	+ Expansion from tensor of shape torch.Size([3, 2]) to shape torch.Size([3, 2]) succeeded[0m
[38;5;2m	+ Expansion from tensor of shape torch.Size([3, 2]) to shape torch.Size([3, 2]) succeeded[0m
[38;5;2m	+ Broadcasting between tensors of shapes torch.Size([3, 2]) and torch.Size([3, 2]) succeeded[0m
*** Checking tensors of shape torch.Size([1, 2, 4, 1]) and torch.Size([4, 1, 4, 6]) ***
[38;5;1m	- Expansion between tensor of shape torch.Size([1, 2, 4, 1]) to shape torch.Size([4, 1, 4, 6]) failed[0m
[38;5;1m	- Expansion between tensor of shape torch.Size([4, 1, 4, 6]) to shape torch.Size([1, 2, 4, 1]) failed[0m
[38;5;2m	+ Broadcasting between tensors of shapes torch.Size([1, 2, 4, 1]) and torch.Size([4, 1, 4, 6]) succeeded[0m
*** Checking tensors of shape torch.Size([1, 7, 2]) and torch.Size([4, 1, 5, 6, 7, 1]) ***
[38;5;1m	- Expansion between tensor of shape torch.Size([1, 7, 2]) to shape to