In [None]:
# Only execute on Colab
# Detect Google Colab by probing for its runtime package. Only ImportError is
# caught: a bare `except:` would also swallow KeyboardInterrupt, SystemExit
# and unrelated failures, silently reporting "not on Colab".
try:
    import google.colab  # noqa: F401  (imported only to test availability)
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True

if IN_COLAB:
    # Install packages
    # (einops is the only package installed here.)
    %pip install einops

    # Code to make sure output widgets display
    from google.colab import output

    output.enable_custom_widget_manager()

    # Fetch the course repository archive, extract only this workshop's
    # folder, move its files into the working directory without overwriting
    # files that already exist, then delete the extracted tree.
    # NOTE(review): unzip reads /content/master.zip, so this assumes the
    # working directory is /content when wget runs. master.zip is not removed.
    !wget -q https://github.com/EffiSciencesResearch/ML4G-2.0/archive/refs/heads/master.zip
    !unzip -o /content/master.zip 'ML4G-2.0-master/workshops/image_memory_network/*'
    !mv --no-clobber ML4G-2.0-master/workshops/image_memory_network/* .
    !rm -r ML4G-2.0-master

    print("Imports & installations complete!")

else:
    # Outside Colab: load IPython's autoreload extension and set it to mode
    # "2", so edits to imported local modules are picked up without
    # restarting the kernel.
    from IPython import get_ipython

    ipython = get_ipython()
    ipython.run_line_magic("load_ext", "autoreload")
    ipython.run_line_magic("autoreload", "2")

<a href="https://colab.research.google.com/github/EffiSciencesResearch/ML4G-2.0/blob/master/workshops/image_memory_network/image_memory_network_normal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
# Learning To Reproduce a Picture

In this exercise you will train a neural network to memorize a picture of your choice! Your network will implement a function from the $(x, y)$ coordinates of a pixel to three numbers $(R, G, B)$ representing the color of that pixel. Implement the `ImageMemorizer` network with three Linear layers and two ReLUs (generally, you don't want a ReLU after the last Linear layer). Test that your model matches the reference.

In [None]:
import torch
from torch import nn
from torch.functional import F
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import einops
from tqdm.notebook import tqdm

import image_memoriser_tests as tests

In [None]:
class ImageMemorizer(nn.Module):
    """A simple MLP that takes the coordinates (x, y) of a pixel and outputs the pixel's RGB color.

    Exercise skeleton: both method bodies below are placeholders to fill in.
    """

    def __init__(self, in_dim: int, hidden_dim: int, out_dim: int):
        # in_dim / hidden_dim / out_dim are the sizes of the input, hidden and
        # output layers (the notebook later builds ImageMemorizer(2, 400, 3)).
        # TODO: define the network's architecture.
        # Feel free to go back to the PyTorch basics tutorial: https://pytorch.org/tutorials/beginner/basics/intro.html
        ...  # TODO: ~22 words

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # TODO: implement the forward pass of the network
        ...  # TODO: ~4 words


# Check the implementation with the helper from `image_memoriser_tests`.
tests.test_mlp(ImageMemorizer)

<details>
<summary>Show solution</summary>

```python
class ImageMemorizer(nn.Module):
    """Multi-layer perceptron mapping a pixel's (x, y) coordinates to its (R, G, B) color.

    Architecture: Linear -> ReLU -> Linear -> ReLU -> Linear, stored as an
    `nn.Sequential` under the attribute `layers`.
    """

    def __init__(self, in_dim: int, hidden_dim: int, out_dim: int):
        """Build the network.

        in_dim: number of input features
        hidden_dim: width of both hidden layers
        out_dim: number of output features
        """
        super().__init__()
        # Modules are created in forward order, so the Linear layers are
        # initialised in the same sequence as they are applied.
        stack = [
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            # No activation after the last layer: a ReLU here would prevent
            # the network from producing negative outputs.
            nn.Linear(hidden_dim, out_dim),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the layer stack to `x` and return the result."""
        return self.layers(x)


# Check the solution with the helper from `image_memoriser_tests`.
tests.test_mlp(ImageMemorizer)
```

</details>



Choose a picture and save it on the filesystem, or use the provided image. If your chosen image is much larger than 1 million pixels, crop it with `img.crop((left, top, right, bottom))` and/or resize it with `img.resize((width, height))`.



In [None]:
# Path of the picture to memorize, relative to the working directory.
image_path = Path(".") / "w1d4_vangogh.jpg"
img = Image.open(image_path)
# PIL reports `size` as (width, height).
print(f"Image size in pixels: {img.size[0]} x {img.size[1]} = {img.size[0] * img.size[1]}")
# The trailing semicolon keeps the notebook from echoing imshow's return value.
plt.imshow(img);


## Data Preprocessing

Most of the work in training a neural network is getting the data in top condition first. The relevant saying is "garbage in, garbage out".

The `preprocess_image` function does the following:

- Use `transforms.ToTensor()(img)` to obtain a tensor of shape `(channels, height, width)`.
- Remove the fourth (alpha) channel if present and just use the first three channels which are R, G, B values.
- Build a tensor of all `(x, y)` pixel coordinates, from `(0, 0)` up to (but not including) `(width, height)`. Then, scale these coordinates down to the range `[-1, 1)`: each axis starts at -1 and stops one pixel short of 1. These will be the inputs to your model. Without scaling them down, the training would either be very slow or not work at all.
- Build a tensor of the corresponding RGB values and scale each color to the range `[-1, 1]`. These will be the labels.
- Return the inputs and labels wrapped in a `TensorDataset`. 

### TensorDataset
The class `torch.utils.data.dataset.TensorDataset` is a convenient wrapper for passing around multiple tensors that have the same size in the first dimension. The most common example of this is in supervised learning, where you have one tensor of inputs and a second tensor with corresponding labels. Often these tensors will have different `dtype`s, so it doesn't make sense to `torch.stack` them into one big tensor, and it would be cumbersome to pass them around as separate variables or as a tuple.

`TensorDataset` accepts and stores any number of tensors in the constructor along with implementing `__getitem__` so that `my_dataset[n]` returns a tuple containing element `n` from each stored `Tensor`. Similarly, `my_dataset[:5]` returns a tuple containing the first five elements from each stored `Tensor`.

There's a bonus exercise at the end of this notebook to re-implement TensorDataset from scratch.

Execute the following cell, and make sure you understand the input and output of those functions.

In [None]:
def all_coordinates_scaled(height: int, width: int) -> torch.Tensor:
    """Enumerate every pixel position of a height x width image as scaled (x, y) rows.

    Rows are in row-major order: x cycles fastest and y changes once per
    image row. Pixel column i maps to 2 * i / width - 1 and pixel row j to
    2 * j / height - 1, so each axis starts at -1 and stops one pixel step
    short of 1.

    Return: float32 tensor of shape (height * width, 2).
    """
    col_fractions = torch.arange(width, dtype=torch.float32) / width
    row_fractions = torch.arange(height, dtype=torch.float32) / height
    # Tile the column fractions once per image row; hold each row fraction
    # constant across a whole image row.
    xs = einops.repeat(col_fractions, "w -> (h w)", h=height)
    ys = einops.repeat(row_fractions, "h -> (h w)", w=width)
    unit_coords = torch.stack((xs, ys), dim=1)
    return unit_coords * 2.0 - 1.0


def preprocess_image(img: Image.Image) -> TensorDataset:
    """Turn an image into a supervised dataset predicting a pixel's color from its position.

    Return: TensorDataset wrapping input and label tensors.
    input: shape (num_pixels, 2), as produced by `all_coordinates_scaled`
    label: shape (num_pixels, channels) with at most 3 channels, rescaled with
        `* 2 - 1` (presumably to [-1, 1], assuming ToTensor yields values in
        [0, 1] as it does for ordinary 8-bit images)
    """
    to_tensor = transforms.ToTensor()
    # Keep at most the first three channels; this drops an alpha channel if present.
    rgb = to_tensor(img)[:3]
    height, width = rgb.shape[-2:]
    coords = all_coordinates_scaled(height, width)
    # Flatten the pixels in the same row-major order as the coordinates.
    colors = einops.rearrange(rgb, "c h w -> (h w) c") * 2.0 - 1.0
    return TensorDataset(coords, colors)


# Build the full dataset from the image loaded earlier.
all_data = preprocess_image(img)

# Small 3 x 4 example showing the layout of the coordinate tensor.
print("All coordinates:", all_coordinates_scaled(3, 4))
# `tensors` is the (inputs, labels) tuple stored by the TensorDataset.
print("TensorDataset:", all_data, all_data.tensors)


### Train-Test Split

Next, we will randomly split the data into 
1. a training set that the model will use for computing gradients,
2. a validation set that will be used later for choosing hyperparameters, and 
3. a held-out test set that will tell us how well the model is generalizing. For validation and test statistics to be a reliable measure of generalization, it is necessary for the training set to not overlap with the validation or test sets.

This was relatively straightforward in the era of small datasets that could be thoroughly inspected by humans, but is increasingly an issue as models are trained on massive piles of haphazardly cleaned Internet data. When reading ML papers, it's important to evaluate the potential for "leakage" between sets.

You'll see rules of thumb online about how much of your data to use for training/validation/test sets, such as an "80%/10%/10% split". In deep learning, these are generally wrong. The validation and test sets only need to be big enough that sampling error doesn't introduce too much noise into the resulting estimate.

For example, ImageNet has around 1.3 million training images and only 50K validation images. The percentage (under 4%) is irrelevant; what matters is that 50K is large enough in absolute terms to make the standard error of the mean acceptably small. Implement `train_test_split` below to split the dataset as described.

Hint: use [`torch.randperm`](https://pytorch.org/docs/stable/generated/torch.randperm.html).


In [None]:
def train_test_split(
    all_data: TensorDataset, train_frac=0.8, val_frac=0.01, test_frac=0.01
) -> list[TensorDataset]:
    """Return [train, val, test] datasets containing the specified fraction of examples.

    Exercise skeleton: the body below is a placeholder to fill in.
    The three datasets must not share any example.
    If the fractions add up to less than 1, some of the data is not used.
    """

    ...  # TODO: ~34 words


# Rebuild the dataset and split it with the default fractions.
all_data = preprocess_image(img)
train_data, val_data, test_data = train_test_split(all_data)
# If you used the default image, this should print
# 106396, 1329 and 1329 (±1 depending on how you rounded the fractions)
print(f"Dataset sizes: train {len(train_data)}, val {len(val_data)} test {len(test_data)}")

<details>
<summary>Show solution</summary>

```python
def train_test_split(
    all_data: TensorDataset, train_frac=0.8, val_frac=0.01, test_frac=0.01
) -> list[TensorDataset]:
    """Randomly partition a dataset into disjoint [train, val, test] datasets.

    all_data: dataset to split; it is indexed with a tensor of indices
    train_frac, val_frac, test_frac: fraction of the examples for each split;
        each split gets int(len(all_data) * frac) examples

    If the fractions add up to less than 1, some of the data is not used.

    Raises:
        ValueError: if a fraction is negative or the fractions sum to more
            than 1. Such values would otherwise yield silently truncated
            splits, or overlapping ones for negative fractions.
    """
    fractions = (train_frac, val_frac, test_frac)
    # The small tolerance absorbs floating-point rounding in the sum.
    if min(fractions) < 0 or sum(fractions) > 1 + 1e-9:
        raise ValueError(
            f"Fractions must be non-negative and sum to at most 1, got {fractions}"
        )

    n = len(all_data)
    # One permutation sliced into consecutive chunks guarantees that no
    # example lands in two splits.
    perm = torch.randperm(n)
    start = 0
    out = []
    for frac in fractions:
        split_size = int(n * frac)
        idx = perm[start : start + split_size]
        out.append(TensorDataset(*all_data[idx]))
        start += split_size
    return out


# Rebuild the dataset and split it with the default fractions.
all_data = preprocess_image(img)
train_data, val_data, test_data = train_test_split(all_data)
# If you used the default image, this should print
# 106396, 1329 and 1329 (±1 depending on how you rounded the fractions)
print(f"Dataset sizes: train {len(train_data)}, val {len(val_data)} test {len(test_data)}")
```

</details>




### Visualizing the Training Data

Many times, I've made errors in the preprocessing step and not noticed because my model still trains and learns anyway, just at a lower accuracy than was possible. One way to reduce the chance of this happening is to inspect the preprocessed data carefully to see if it still makes sense.

We make a zero tensor of shape `(height, width, 3)` representing the grid of pixels, fill in the colors of the pixels that are in the dataset, and display the result with `plt.imshow`.

Just execute the following cell:


In [None]:
def to_grid(X: torch.Tensor, Y: torch.Tensor, width: int, height: int) -> torch.Tensor:
    """Paint dataset-format pixels back onto an image-shaped tensor.

    X: shape (n_pixels, dim=2), scaled (x, y) coordinates
    Y: shape (n_pixels, channel=3), colors rescaled here with (Y + 1) / 2

    Return: shape (height, width, channels=3). Positions that do not appear
    in X stay at zero (black).
    """
    extent = torch.tensor([width, height])
    # Undo the coordinate scaling. Adding 0.5 before truncating rounds to the
    # nearest pixel index instead of always rounding down.
    pixel_index = ((X + 1.0) / 2.0 * extent + 0.5).long()
    cols, rows = pixel_index.unbind(dim=1)
    canvas = torch.zeros((height, width, 3))
    canvas[rows, cols] = (Y + 1.0) / 2.0
    return canvas


# PIL reports `size` as (width, height).
width, height = img.size
# Inputs (scaled coordinates) and labels (scaled colors) of the training split.
X, Y = train_data.tensors
plt.figure()
# The trailing semicolon keeps the notebook from echoing imshow's return value.
plt.imshow(to_grid(X, Y, width, height));

<details>
<summary> What are the black pixels in the image? </summary>

The black pixels are the pixels that are not in the train dataset. They are the pixels that will be used for validation and test... or won't be used at all because our fractions did not add up to 1.0.

You can also visualize the validation and test pixels.
</details>


## DataLoaders

Today, our `Dataset` is small enough to fit in memory, so we could just use `torch.randperm` on our training set to fetch random batches from it.

In general, we only want to load parts of our dataset as they're needed because our dataset may be too large to fit in memory, it may take too long to preprocess the entire dataset, or we may just want the GPU to be active as much as possible instead of waiting for data to be ready.

This is where `torch.utils.data.DataLoader` comes in. When created with `num_workers > 0`, a `DataLoader` instance is responsible for spawning multiple worker processes which load data in parallel and communicate back to the `DataLoader`; with the default `num_workers=0`, which is what we use below, batches are loaded in the main process. Ideally, the `DataLoader` can prepare the next batch while the GPU is processing the current one, eliminating GPU downtime.

We'll implement our own version of this another day when we're dealing with parallelism, and just use the PyTorch implementation today. We've provided DataLoaders with `shuffle=True` for the train loader. What would happen if you didn't shuffle the training data?

<details>
<summary>Answer - Shuffling Training Data</summary>

If our training data was sorted and we didn't shuffle it at least once, then the learning process could oscillate instead of converging. Suppose that the top half of the image was mostly blue sky and the bottom half was mostly green grass. The model would get gradients that first suggest "everything is mostly blue" and later "everything is mostly green" successively. In this case, we already used `randperm` above so our training data has been shuffled regardless.

In practice, SGD is relatively insensitive to whether you shuffle on every epoch, just once, or even sample each minibatch with replacement from the full dataset. For some theory behind this, see [this paper](https://arxiv.org/pdf/2106.06880.pdf).

</details>


In [None]:
# Batches of 256 examples. Only the training loader shuffles; the validation
# and test loaders iterate in dataset order.
train_loader = DataLoader(train_data, batch_size=256, shuffle=True)
val_loader = DataLoader(val_data, batch_size=256)
test_loader = DataLoader(test_data, batch_size=256)


Implement the `train_one_epoch` function below.

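- Use the `to()` method of a `Tensor` to send the data to the device indicated by the global variable `device`. Note that this notebook does not define `device` itself: if you want to train on a GPU, define it first (for example `device = torch.device("cuda" if torch.cuda.is_available() else "cpu")`) and move the model to it as well. The provided solution stays on the CPU and skips this step.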
- You can convert a one-element tensor to a regular Python number using the `item` method.

<details>

<summary>It's not working and I'm confused!</summary>

- Did you remember to call `optimizer.zero_grad()` before each forward pass?
- Does `model.parameters()` return what you expect?
- Are you calling `backward()` on the mean loss over the batch items? Note that if you don't use the mean, the magnitude of the gradients scales up linearly with the batch size, which is not what you want.

</details>


In [None]:
def train_one_epoch(model: ImageMemorizer, dataloader: DataLoader) -> float:
    """Show each example in the dataloader to the model once.

    Exercise skeleton: the body below is a placeholder to fill in.

    Use `torch.optim.Adam` for the optimizer.
    Use `F.l1_loss(prediction, actual)` for the loss function. This just puts less weight on very bright or dark pixels, which seems to produce nicer images.

    Return: the average loss per example seen, i.e. sum of losses of each batch weighted by the size of the batch, divided by the total number of examples seen
    """

    ...  # TODO: ~56 words


# Check the implementation with the helper from `image_memoriser_tests`.
tests.test_train(train_one_epoch)


def evaluate(model: ImageMemorizer, dataloader: DataLoader) -> float:
    """Return the total L1 loss over the provided data divided by the number of examples.

    Exercise skeleton: the body below is a placeholder to fill in.
    """
    ...  # TODO: ~42 words


# Check the implementation with the helper from `image_memoriser_tests`.
tests.test_evaluate(evaluate)

<details>
<summary>Show solution</summary>

```python
def train_one_epoch(model: ImageMemorizer, dataloader: DataLoader) -> float:
    """Run one optimisation pass over the dataloader and report the mean training loss.

    A new `torch.optim.Adam` with default hyperparameters is created on every
    call, so optimizer state does not carry over from one call to the next.
    The loss is `F.l1_loss(prediction, actual)`.

    Return: the average loss per example seen, i.e. each batch's mean loss
    weighted by the batch size, divided by the total number of examples seen
    """
    model.train()  # Has no effect on this particular model, but is good practice
    optimizer = torch.optim.Adam(model.parameters())
    weighted_loss_total = 0.0
    examples_seen = 0
    for inputs, targets in tqdm(dataloader):
        batch_size = len(inputs)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        batch_loss = F.l1_loss(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()
        # l1_loss returns the batch mean; weighting by the batch size keeps a
        # smaller final batch from being over-represented.
        weighted_loss_total += batch_loss.item() * batch_size
        examples_seen += batch_size
    return weighted_loss_total / examples_seen


# Check the solution with the helper from `image_memoriser_tests`.
tests.test_train(train_one_epoch)


def evaluate(model: ImageMemorizer, dataloader: DataLoader) -> float:
    """Compute the mean L1 loss per example over the provided data.

    Runs under `torch.inference_mode()`, so no gradients are tracked. Each
    batch's mean loss is weighted by the batch size before averaging.
    """
    model.eval()  # Has no effect on this particular model, but is good practice
    total_loss = 0.0
    n_examples = 0
    with torch.inference_mode():
        for inputs, targets in dataloader:
            batch_size = len(inputs)
            total_loss += F.l1_loss(model(inputs), targets).item() * batch_size
            n_examples += batch_size
    return total_loss / n_examples


# Check the solution with the helper from `image_memoriser_tests`.
tests.test_evaluate(evaluate)
```

</details>




The following cell creates a model with 400 neurons in each hidden layer and trains it for an epoch.

If no errors appeared, do a few more epochs and plot the training loss and validation loss over time as a function of number of epochs. Compute the validation loss using your `evaluate` function. I was able to reach a validation loss below 0.2 after 40 epochs. Your image might be easier or harder to learn.


In [None]:
# Two inputs (x, y), 400 units per hidden layer, three outputs (R, G, B).
model = ImageMemorizer(2, 400, 3)
# Per-epoch loss history, plotted below.
train_losses = []
val_losses = []

num_epochs = 1  # Increase this a bit if/once it works
bar = tqdm(range(num_epochs))
for epoch in bar:
    train_losses.append(train_one_epoch(model, train_loader))
    val_loss = evaluate(model, val_loader)
    # Show the latest validation loss on the progress bar.
    bar.set_description(f"val loss: {val_loss:.3f}")
    bar.refresh()
    val_losses.append(val_loss)

# Plot both loss curves against the epoch index.
fig, ax = plt.subplots()
ax.plot(train_losses, label="Training loss")
ax.plot(val_losses, label="Validation loss")
ax.set(xlabel="Epochs", ylabel="L1 Loss")
fig.legend();



Finally, execute this cell to display the image your network has memorized:


In [None]:
# Query the model at every pixel position of the image, not just the training pixels.
X = all_coordinates_scaled(height, width)
with torch.inference_mode():
    Y = model(X).cpu()
grid = to_grid(X, Y, width, height)
# Clamp in place to [0, 1]: the model's outputs are unbounded, so rescaled
# colors can fall outside the displayable range.
grid.clip_(0, 1)
fig, ax = plt.subplots(figsize=(12, 12))
ax.imshow(grid)
# Hide the axis ticks so that only the picture is shown.
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)

# ax.set_position([0, 0, 1, 1])
# fig.savefig("w1d4_vangogh_solution.jpg")


Share your image with your friends if you like it! Here's the one my network learned:

![Reproduction of the example image learned by the network](https://github.com/EffiSciencesResearch/ML4G/blob/main/mlab/w1d4_vangogh_solution.jpg?raw=true "Image memorized by the network")

# Bonus: Re-implement more tools from the pytorch library

## Build Your Own TensorDataset

The class `torch.utils.data.dataset.TensorDataset` is a convenient wrapper for passing around multiple tensors that have the same size in the first dimension. The most common example of this is in supervised learning, where you have one tensor of inputs and a second tensor with corresponding labels. Often these tensors will have different `dtype`s, so it doesn't make sense to `torch.stack` them into one big tensor, and it would be cumbersome to pass them around as separate variables or as a tuple.

`TensorDataset` accepts and stores any number of tensors in the constructor along with implementing `__getitem__` so that `my_dataset[n]` returns a tuple containing element `n` from each stored `Tensor`. Similarly, `my_dataset[:5]` returns a tuple containing the first five elements from each stored `Tensor`.

### Slice Objects in Python

`slice` is a built-in type containing `start`, `stop`, and `step` fields which can be integers or `None`. Given `x=[1,2,3,4,5,6,7]`, writing `x[1:5:2]` is syntactic sugar for `x[slice(1, 5, 2)]`.

### Dunder (Magic) Methods in Python

`__getitem__` is an example of a "dunder" or "magic" method in Python. These are methods that are called implicitly by Python in certain situations. For example, `x + y` is syntactic sugar for `x.__add__(y)`. `__getitem__` is called when you write `x[y]` and `__len__` is called when you write `len(x)`.

In [None]:
# NOTE: defining this class rebinds the name `TensorDataset`, which was
# imported from `torch.utils.data` at the top of the notebook.
class TensorDataset:
    """Exercise skeleton for a re-implementation of `TensorDataset`; the method bodies are placeholders."""

    def __init__(self, *tensors: torch.Tensor):
        """Validate the sizes and store the tensors in a field named `tensors`."""
        ...  # TODO: ~24 words

    def __getitem__(self, index: int | slice) -> tuple[torch.Tensor, ...]:
        """Return a tuple of length len(self.tensors) with the index applied to each."""
        ...  # TODO: ~9 words

    def __len__(self):
        """Return the size in the first dimension, common to all the tensors."""
        ...  # TODO: ~6 words


# if MAIN:
# w1d4_part1_test.test_tensor_dataset(TensorDataset)

<details>
<summary>Show solution</summary>

```python
class TensorDataset:
    def __init__(self, *tensors: torch.Tensor):
        """Validate the sizes and store the tensors in a field named `tensors`."""
        if tensors:
            size = tensors[0].shape[0]
            assert all(
                tensor.shape[0] == size for tensor in tensors
            ), "Size mismatch between tensors"
        self.tensors = tensors

    def __getitem__(self, index: int | slice) -> tuple[torch.Tensor, ...]:
        """Return a tuple of length len(self.tensors) with the index applied to each."""
        return tuple(tensor[index] for tensor in self.tensors)

    def __len__(self):
        """Return the size in the first dimension, common to all the tensors."""
        return self.tensors[0].shape[0]


# if MAIN:
# w1d4_part1_test.test_tensor_dataset(TensorDataset)
```

</details>

