**Aeronautics Institute of Technology – ITA**

**Computer Vision – CM-203**

**Professors:** 

Marcos Ricardo Omena de Albuquerque Maximo

Gabriel Adriano de Melo


**Instructions:**

Before submitting your lab, be sure that everything is running correctly (in sequence): first, **restart the kernel** (`Runtime->Restart Runtime` in Colab or `Kernel->Restart` in Jupyter). Then, execute all cells (`Runtime->Run All` in Colab or `Cell->Run All` in Jupyter) and verifies that all cells run without any errors, expecially the automatic grading ones, i.e. the ones with `assert`s.

**Do not delete the answer cells**, i.e. the ones that contains `WRITE YOUR CODE HERE` or `WRITE YOUR ANSWER HERE`, because they contain metadata with the ids of the cells for the grading system. For the same reason, **do not delete the test cells**, i.e. the ones with `assert`s. The autograding system executes all the code sequentially, adding extra tests in the test cells. There is no problem in creating new cells, as long as you do not delete answer or test cells. Moreover, keep your solutions within the reserved spaces.

The notebooks are implemented to be compatible with Google Colab, and they install the dependencies and download the datasets automatically. The commands which start with ! (exclamation mark) are bash commands and can be executed in a Linux terminal.

---

# Redes Neurais Convolucionais (CNNs)

Neste lab já começaramos e entender o que o FastAI faz por debaixo dos panos, isto é, desvendar os segredos do classificador Deep do Lab 1. Isto é, vamos explorar essas '3 linhas' de funções para definir e instanciar o nosso conjunto de dados (1ª linha), definir a arquitetura de rede com transfer learning (2ª linha) e efetivamente realizar o treinamento (3ª linha).

```python
dados = DataBlock(
    blocks=(ImageBlock, CategoryBlock),
    get_items=get_image_files,
    splitter=RandomSplitter(valid_pct=0.2, seed=19),
    get_y=parent_label,
    item_tfms=[Resize(192, method='squish')],
    batch_tfms=aug_transforms()
).dataloaders(base_path/'train')
caixa_preta = vision_learner(dados, resnet18, metrics=error_rate)
caixa_preta.fine_tune(1)
```

A ideia do lab é implementarmos uma versão bem simplificada do que o FastAI realiza, para entendermos a essência das partes principais de seu funcionamento, sem ter que gastar tempo implementando outras funções auxiliares.

**CUIDADO!!!**

Neste lab você já vai adquirir conhecimento suficiente para se tornar perigoso, utilize o conhecimento com cuidado!

Brincadeiras a parte (mas é verdade mesmo), as funções implementadas aqui no lab são versões muito simplificadas das presentes nas bibliotecas, em produção opte por usar as funções já prontas da biblioteca do que implementar por si próprio.

Novamente revisitando os feedbacks dos labs anteriores, deixamos o lab mais passo a passo e mais rápido (com esperança). Então atente-se: é possível que você consiga completar o lab sem entender os conceitos da forma como gostaríamos, todo o código escrito pelos professores tem um propósito, e gostaríamos que você lesse e entendesse cada linha do que ele faz (a menos que especificado o contrário, por exemplo, códigos de plot).

**SpeedRunners** : primeiro tome um tempo (sem contar/antes do seu speedrun) para apreciar o lab.

## Imports and data downloading

Todos os nossos imports serão realizados automaticamente pela FastAI. Enquanto ma maior parte das bibliotecas pode ser uma má ideia utilizar um star import (`*`) pois irá poluir o seu ambiente, o FastAI toma cuidado de limitar o que é exportado (importado pelo `*`) por meio da definição da variável `__all__` dentro de um módulo. Assim, tudo que é exportado/importado por ele é intencional.

In [None]:
from fastai.vision.all import *

O nosso dataset é um superset do Lab 1, com mais imagens que não estão enviesadas pela luminosidade. Esses comandos de bash iniciados por ponto de exclamação você não precisa se preocupar.

Eu coloco essa boleana chamada `RETRAIN` porque na correção do lab eu seto o valor dela igual a Falso para não precisar executar o treinamento final do notebook e exibir algumas imagens.

In [None]:
# dataset_lab read_only

! [ ! -d "/content/cats_vs_dogs2" ] && gdown -O /content/cats_vs_dogs2.zip "1com0nmog6FJ2KVfaGvbSPuC1j1vewNnx" &&  unzip -q /content/cats_vs_dogs2.zip -d /content && rm /content/cats_vs_dogs2.zip
! [ ! -d "/content/cats_vs_dogs2" ] && wget -P /content/ "http://ia.gam.dev/cm203/23/lab05/cats_vs_dogs2.zip" &&  unzip -q /content/cats_vs_dogs2.zip -d /content  && rm /content/cats_vs_dogs2.zip
base_path = Path("/content/cats_vs_dogs2")
RETRAIN = True
%cd /content

O device padrão a ser utilizado no treinamento:

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

## Dataset (2 points)

**Explicação sobre o assunto**

A primeira parte do lab é definir como os dados devem ser carregados e pre-processados desde o momento de sua leitura (em disco ou memória) até estar no formato exato a ser entregue ao modelo neural.

O FastAI define uma classe básica chamada de `DataBlock` que funciona como uma Fábrica Abstrata para definir o `Dataset`. A ideia é que o `DataBlock` define uma *blueprint* de como um dataset deve ser, isto é, se ele é composto de imagens `ImageBlock`, de categorias `CategoryBlock`, de bounding boxes `BBoxBlock`, de pontos `PointBlock`, de texto `TextBlock`, de categorias não-exclusivas `Block`, de máscaras `MaskBlock`, entre outros.

Dessa forma, temos uma API de alto nível que consegue definir uma ampla gama de possíveis combinações de tipos de dados que você estiver trabalhando.

Contudo, para simplificar o lab, e por se tratar apenas de um problema de classificação de imagens, vamos trabalhar apenas com um `ImageBlock` e um `CategoryBlock` e por isso já iremos criar um `DataBlock` simplificado específico apenas para .

In [None]:
class ImageClassDataBlock:
    def __init__(self, get_items, splitter, get_x, get_y, item_tfms, batch_tfms):
        self.get_items = get_items
        self.splitter = splitter
        self.get_x = get_x
        self.get_y = get_y
        self.item_tfms = Pipeline(item_tfms)
        self.batch_tfms = Pipeline(batch_tfms)

Para evitar desgaste desnecessário, não vamos implementar as funções `get_items`, `splitter`, `get_x`, `get_y`, `item_tfms`, `batch_tfms`, mas vamos entender como elas funcionam (argumentos e valores de retorno) justamente para podermos construir o pipeline:

Em primeiro lugar, temos a função `get_items`, que em sua forma genérica, recebe um 'ponteiro' para os dados (pode ser para uma matriz de dados na memória ou um caminho no sistema de arquivos em disco) e retorna uma lista com os 'ponteiros' individuais para cada item do seu conjunto de dados.

Por exemplo, neste laboratório iremos ler os dados do disco, então passamos um caminho de uma pasta com as imagens e recebemos de volta uma lista com o caminho de cada imagem. Veja o exemplo abaixo da função `get_image_files` do FastAI que retorna todas os arquivos de imagem de uma pasta (e subpastas recursivamente).

In [None]:
get_image_files(base_path)

Um bizu que eu não me lembro se escrevi mas pelo menos eu havia tentado falar, nos notebooks você pode usar o `??` antes ou depois do nome de uma função para abrir o `help` dela

In [None]:
??get_image_files

Observe acima que até a capivara, que estava na raiz do diretório também foi encontrada, são todas os arquivos de imagens. Inclusive esse foi um erro que alguns alunos cometeram no Lab 1 que só percebi agora, em vez de passar apenas o `base_path/'train'` para o learner, alguns passaram o `base_path` todo e ele não faz a divisão das subpastas `train`, `val`, `fora_distribuicao` como eu havia suposto e apenas pega todas as imagens, o que acabaria pegando também uma suposta pasta de `test` que estaria dentro desse diretório e enviesando os resultados. A menos que você deixasse o splitter com None, nesse caso ele tentaria usar o `GrandParentSplitter` que verifica se há pastas com o nome de `train` e `valid`.

Agora para o próximo item do pipeline, temos a nossa função `splitter`, 

Veja que o random spliter cria uma função específica com os hiperparâmetros da semente aleatória e a fração de dados a ser utilizada na validação.

In [None]:
random_splitter = RandomSplitter(valid_pct=0.2, seed=19)
random_splitter

Veja o exemplo de sua aplicação abaixo, ele recebe uma lista de qualquer coisa e retorna os índices dos items repartidos entre treino e validação. Obs: `L` é uma lista melhorada do FastAI que dá para indexar os índices tal qual no NumPy.

In [None]:
lista_items = L('ola0', 'tudo1', 'bem2', 'com3', 'você4', 'eu5', 'sou6', 'uma7', 'lista8', 'de9', 'qualquer10', 'coisa11', 'imagine12', 'caminhos13', '/root/teste/img14.png')
idx_train, idx_val = random_splitter(lista_items)
idx_train, idx_val, lista_items[idx_val]

A função `get_x` vai receber um item (no formato de ponteiro) e retorná-la no formato de tensor. No FastAI ele já faz isso automaticamente com base nos `blocks` que você passou para ele, por isso o `get_x` dele default é `None`, na na realidade é substituído por uma função identidade `lambda x: x`.

No nosso caso, como não temos essa criação e inferência automática dos tipos pelos `blocks`, estamos apenas simplificando, vamos carregar manualmente a imagem do caminho no formato de uma imagem Pillow, que é o que o FastAI usa. O Jeremy gosta de redefinir os tipos das bibliotecas para acrescentar novas funcionalidades, por exemplo verificar o tipo durante as transformações que veremos a seguir (o `ToTensor()`).
Se for usar o tipo primitivo do Pillow, não esqueça o `.copy()` pois quando carrega uma Imagem no PIL ele guarda os metadados (`PIL.JpegImagePlugin.JpegImageFile`) mas o FastAI faz coerção dos tipos e espera que seja simplesmente (`PIL.Image.Image`).

In [None]:
def path_to_pil_img(path: Path):
    return PILImage.create(path) # Image.open(path).copy()

capivara = path_to_pil_img(base_path / 'capivara.jpg')
type(capivara)

A função `get_y` também faz um trabalho semelhante ao `get_x`, só que agora é o Label. Para não quebrar tanto a compatibilidade quanto no caso do `get_x`, vamos mantê-lo retornando uma string, que no caso é o nome da pasta.

In [None]:
caminho_item = base_path/'train'/'cat'/'482.jpg'
parent_label(caminho_item), caminho_item.parent.name

O `item_tfms` do FastAI realiza a transformação de um único item. O item já deve estar carregado no formato correto, no nosso caso, uma `PIL.Image`.

In [None]:
item_transform_simples = Resize(192, method='squish')
item_transform_simples

Veja ela operando sobre a imagem PIL (que deve ser do tipo `PIL.Image.Image`)

In [None]:
item_transform_simples(capivara)

Podemos também compor várias transformações por meio de um Pipeline que retorna a função resultante da aplicação sequencial dos seus componentes:

In [None]:
item_transform  = Pipeline([Resize(192, method='squish'), ToTensor()])
item_transform

Observe agora que o nosso resultado é um Tensor, pois colocamos a transformação `ToTensor()` no final. No nosso caso em que estamos 'reimplementando' o FastAI é necessário colocar essa transformação explicitamente, mas ele já faz isso automaticamente quando você o usa.

Perceba que pelo fato da nossa capivara ser uma imagem do tipo `fastai.vision.core.PILImage` ele já é esperto para usar RGB e colocar os canais primeiro, com shape `[3, 192, 192]`. Mas o dtype ainda continua `uint8`.

In [None]:
capivara_tensor = item_transform(capivara)
capivara_tensor, capivara_tensor.shape

Uma coisa que pode confundi-lo (não aqui no lab), mas na sua vida real quando estiver usando a biblioteca, é que o `item_tfms` é chamado para cada elemento do argumento `blocks`, ou seja, tanto para a imagem quanto para o label, e isso acontece de forma separada se for uma função. Por isso ele faz verificação e coerção de tipos, porque você pode passar qualquer besteira para ele e ele vai retorna o próprio valor de entrada. Quando você passa uma tupla, ela opera nos valores da tupla (atenção, deve ser tupla por causa da coerção de tipo e não lista). Agora se for um objeto subclasse do `ItemTransform` ele recebe a tupla inteira e aplica da forma como quiser.

In [None]:
item_transform('capimisc'), item_transform_simples((capivara, 'capimisc'))

Agora o `batch_tfms` é aplicada já em um batch de dados, diferentemente do item_transform que é aplicado sobre um item individualmente. Dessa forma ele é melhor quando você for alterar os dados na forma de tensor e não na forma arbitrária (de imagem pillow ou string). A aplicação dele é semelhante às tuplas e verificação e coerção de tipos, de tal forma que o exemplo abaixo só muda os valores dos tipos `TensorImage`, `TensorMask`, `TensorBBox`, `TensorPoint`. E para que ele mude simultaneamente deve-se passar uma tupla com esses tensores 

In [None]:
batch_transforms = Pipeline([IntToFloatTensor()] + aug_transforms())
[IntToFloatTensor()] + aug_transforms(), batch_transforms

Como é uma transformação de batches, a dimensão esperada é de `(Batches, Channels, Height, Width)` temos que usar a nossa indexação com `None` para criar um novo eixo unitário das dimensões de batch:

In [None]:
capi_aug = batch_transforms(capivara_tensor[None])
capi_aug, capi_aug.shape

O `.show()` dessa TensorImagem apenas realoca a imagem em CPU e transpoẽm os canais de volta para virar (H, W, C) e cria uma imagem do Pillow

In [None]:
if RETRAIN:
    capi_aug[0].show()

Para os labels categóricos, o FastAI tem um `CategoryBlock`, mas para simplificar e também entender como ele funciona, implementaremos essa funcionalidade manualmente por meio de um dicionário que relaciona o label (poder ser qualquer coisa, no caso é string) com um tensor. Para classificação simples com vários labels, em geral se utiliza o one-hot encodding, no qual é uma representação da distribuição de probabilidade entre os labels que é de 100% para apenas um.

In [None]:
TensorCategory(10)

Para este lab não é necessário, mas para a vida, se você for usar a biblioteca FastAI e precisar criar a suas próprias funções de transformações, seja de itens ou de batches, dê uma olhada na documentação: [Data block tutorial](https://docs.fast.ai/tutorial.datablock.html), [Custom transforms](https://docs.fast.ai/tutorial.albumentations.html)

Pode ver o código (e copiar) do FastAI sem problemas, embora a implementação completa deles é muito mais geral do que a requerida para esse lab.

**Enunciado da Questão**

Conforme descrito acima, implemente a função `transform` abaixo. Ela representa o pipeline da transformação de um único item do seu formato de ponteiro para o seu formato final de tensor já transformado. No nosso caso concreto ele recebe um caminho `Path` de uma imagem e retorna essa imagem já carregada e transformada, no formato de `torch.tensor` e conjunto com o seu label também já no formato de tensor.

**NÃO** use LLMs (ChatGPT) para pegar a resposta pronta.

**Pode** olhar a documentação das bibliotecas (PyTorch mas todas as funções que você precisa está nas **dicas**) e **pode** (aconselhado) olhar o material de aula (slides e referências).

<details><summary><b>Dica para a resposta</b></summary>
<p>
Basta chamar as funções com os argumentos corretos, coletando os seus valores de retorno e atribuindo para o próximo estágio do pipeline segundo a sua lógica de implementação explicada na descrição. Atente-se que é apenas um item. Acesse os atributos pertinentes do objeto, o `self.item_tfms` por exemplo.

Eu consegui implementar em uma linha, e se você for muito esperto vai ver que também a solução dessa questão está dentro de um dos testes de uma questão posterior.
</p>
</details>

In [None]:
# questao_dataset autograded_answer

@patch_to(ImageClassDataBlock)
def transform(self: ImageClassDataBlock, item, label_to_tensor: dict[any, torch.tensor]) -> tuple[torch.tensor, torch.tensor]:
    """ Realiza a transformação de apenas um item e retorna tensores (se as transformações retornarem tensores)
    Dado o item, você primeiro precisa extrair os valores iniciais de x e y.
    Depois para o y que é o label, você precisa mapeá-lo a um tensor utilizado o seu dicionário
    Por fim você deve realizar a transformação conjunta e retorna o resultado final.
    
    Args:
        self: É uma referência ao próprio objeto, imagina que este método está dentro da classe
        item: É o item a ser processado, pode ser de qualquer tipo desde que seja aceito por get_x e get_y
        label_to_tensor: É um mapeamento que associa o label retornado pelo get_y a um tensor
    
    Returns:
        Tupla com os tensores x e y. Mas quem realmente garante que é tensor são as transformações.
    """
    # WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
    raise NotImplementedError()

Se você usou uma LLM, escreva a sua conversa com ela aqui nesta própria célula de texto (copie a conversa inteira) ou exporte o link da conversa:

**Escreva aqui**

Lembre-se que os testes também são documentação. Agradeço ao feedback de um aluno que sugeriu que os testes fossem quebrados e explicados passo-a-passo. (Se tiverem sugestões não esqueçam de escrevê-las).

Nesse primeiro teste iremos usar operações mais simples possível: função identidade (`noop`) que retorna exatamente o mesmo valor de entrada e uma função `cte` que retorna sempre o mesmo tensor constante não importa o argumento de entrada.

Nós passamos um item definido por uma string 'ok' e um dicionário que mapeia o retorno do `get_y` a um tensor e esperamos obter justamente o tensor constante para x e para y, pois o item_tfms é aplicado na tupla que contém (x, y)

OBS: lambda é uma notação de Python (one-liner) para funções anônimas, a sintaxe é `lambda valores_de_argumentos: valores_de_retorno` que é exatamente igual a definir uma função com `def nome_funcao(valores_de_argumentos: return valores_de_retorno;`, só que no primeiro caso do lambda a função não tem nome.

In [None]:
# testa_1_questao_dataset autograder_tests 0.5

noop = lambda obj: obj
cte = lambda xy: tensor(7)

dblock0 = ImageClassDataBlock(None, None, noop, noop, cte, noop)

xres0, yres0 = dblock0.transform('ok', {'ok': tensor([19])})

assert xres0 == tensor(7)
assert yres0 == tensor(7)


Agora vamos ver se você está passando certo os argumentos para o item_transform, vou pegar uma que espera receber a tupla com o (x, y) pois ele modifica ambos de forma diferente. Como a nossa transformação já não é uma função comum, ela espera receber uma tupla com o x, y e retorna uma tupla com os valores de x e y transformados.

In [None]:
# testa_2_questao_dataset autograder_tests 0.5

class TransformaDoido(ItemTransform):
    def encodes(self, xy):
        x, y = xy
        return tensor(14), y * 2

dblock0 = ImageClassDataBlock(None, None, noop, noop, TransformaDoido(), noop)

xres0, yres0 = dblock0.transform('ok', {'ok': tensor([19])})

assert xres0 == tensor(14)
assert yres0 == tensor(19 * 2)


E finalmente aplicamos em um exemplo mais próximo da realidade, lendo imagens do sistema de arquivos, pegando o label do nome da pasta, fazendo o resize e transformando em tensores:

In [None]:
# testa_3_questao_dataset autograder_tests 1

dblock1 = ImageClassDataBlock(None, None,
    path_to_pil_img, 
    parent_label, 
    [Resize(192, method='squish'), ToTensor()],
    None)

xres, yres = dblock1.transform(base_path/'train'/'cat'/'00000.jpg', {'cat':tensor([1.,0]), 'dog': tensor([0.,1])})

assert xres.dtype == torch.uint8
assert xres.shape == (3, 192, 192)
assert torch.all(xres[0, 0, :3] == tensor([203, 206, 209]))
assert torch.all(yres == tensor([1., 0]))

if RETRAIN:
    xres.show()
yres

OBS: o nosso simples bloco de dados (`ImageClassDataBlock`) é uma fábrica concreta e apenas para Imagem com Classificação. O do FastAI é *abstrato* no sentido de que essa lógica que implementamos nele de `transform` não está presente e na realidade ele cria um outro objeto chamado `Dataset` que aí sim tem essa lógica que implementamos.

## Dataloader (2 points)

**Explicação sobre o assunto**

Agora que nós já conseguimos transformar os nossos itens em tensores, nós temos que agrupá-los em batch, e também fazer isso de uma forma inteligente para não ocupar muito espaço na memória, isto é, só ir carregando à medida que forem pedindo.

É para isso que definimos o DataLoader, ele redebe uma lista de items `list_items` (obs essa lista pode ser indexada por outra lista), a transformação de itens implementada na questão anterior `transform`, a transformação a ser aplicada sobre um batch `batch_tfms`, o tamanho do batch `bs` e para onde deve ser enviado esse batch `device`.

In [None]:
class SimpleDataLoader:
    def __init__(self, list_items, transform, batch_tfms, batch_size, device):
        self.list_items = list_items
        self.transform = transform
        self.batch_tfms = batch_tfms
        self.batch_size = batch_size
        self.device = device
        self.iteracao_atual = 0

Uma das etapas importantes que o DataLoader realiza é enviar os tensores do batch para o device especificado que no caso de ser GPU (cuda), esse comando envia o tensor para a GPU. No caso de ser CPU ele continua na memória RAM, que é o default.

In [None]:
tensor([4, 3, 2]).to(device)

**Enunciado da Questão**

Implemente a função `get_batch_at` da classe `SimpleDataLoader` abaixo. Essencialmente esse método utiliza o `transform` implementado na questão anterior para gerar os tensores `x` e `y` de cada item, sendo que o essencial aqui é formar um batch, ou seja, aglutinar esses tensores em apenas um grande tensor `X`e outro `Y`, levando-os para o `device` especificado e posteriormente aplicando a transformação do batch. Para mais instruções detalhadas estão na docstring do método e dicas no dropdown abaixo.

**NÃO** use LLMs (ChatGPT) para pegar a resposta pronta.

Atenção, nessa questão se você esquecer de colocar os tensores do `device` correto, serão descontados 0,5 pontos da resposta. Basta você fazer `.to(my_device)` em cada tensor. Se esquecer de acessar o atributo do objeto e acessar a variável global diretamente perde 0,1 pontos.

**Pode** olhar a documentação das biblitocas (PyTorch mas todas as funções que você precisa está nas **dicas**) e **pode** (aconselhado) olhar o material de aula (slides e referências).

<details><summary><b>Dica para a resposta</b></summary>
<p>
Use a função `torch.stack` para concatenar vários tensores ao longo de uma nova dimensão. Use o `.to` para enviar o tensor para o dispositivo especificado. Utilize slices para indexar. Acesse os atributos pertinentes do objeto.
</p>
</details>

In [None]:
# questao_dataloader autograded_answer

@patch_to(SimpleDataLoader)
def get_batch_at(self: SimpleDataLoader, idx: int) -> tuple[torch.tensor, torch.tensor]:
    """ Aglutina vários items em um batch cujo primeiro elemento é indexado por idx.
    Realiza todas as tranformações pertinentes, tanto para obter o tensor a partir de
    um item quanto sobre o batch como um todo.
    
    Args:
        self: Referência ao próprio objeto, esse também é um método da classe SimpleDataLoader
        idx: Inteiro positivo que define o índice do primeiro item, entre 0 e len(list_items)-1
    
    Returns:
        Tupla com os tensores X, Y já agrupados por batch na dimensão 0
    """
    # WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
    raise NotImplementedError()

Se você usou uma LLM, escreva a sua conversa com ela aqui nesta própria célula de texto (copie a conversa inteira) ou exporte o link da conversa:

**Escreva aqui**

Novamente vamos iniciar os nossos testes pelo exemplo mais simples, que são funções identidades. Veja que nesse primeiro teste também já estamos forçando a barra, pois estamos passando tensores como a lista de itens.

In [None]:
# testa_1_questao_dataloader autograder_tests 0.25

noop = lambda t: t
noop2 = lambda t: (t, t)
data0 = [tensor(8), tensor(7), tensor(3), tensor(19)]

dl0 = SimpleDataLoader(data0, noop2, noop, 4, 'cpu')

batch_x0, batch_y0 = dl0.get_batch_at(0)

assert batch_x0.device.type == 'cpu'
assert batch_y0.device.type == 'cpu'
assert torch.all(batch_x0 == tensor([ 8,  7,  3, 19]))
assert torch.all(batch_y0 == tensor([ 8,  7,  3, 19]))


Aqui vamos verificar se de fato você colocou no device correto (só testa de verdade se você tiver uma GPU para o device ficar diferente da CPU).

In [None]:
# testa_2_questao_dataloader autograder_tests 0.25

dl1 = SimpleDataLoader(data0, noop2, noop, 4, device)

batch_x1, batch_y1 = dl1.get_batch_at(0)

assert batch_x1.device.type == device.type
assert batch_y1.device.type == device.type
assert torch.all(batch_x1 == tensor([ 8,  7,  3, 19], device=device))
assert torch.all(batch_y1 == tensor([ 8,  7,  3, 19], device=device))


Vamos ver se você está realizando a transformação do batch corretamente. Perceba que agora que estamos bypassando a chamada do `Pipeline` a nossa função de `batch_tfms` é normal, mas isso só nesse teste mesmo, na vida real com o FastAI você teria que fazer igual lá em cima na questão anterior e usar uma subclasse de `Transform`.

In [None]:
# testa_3_questao_dataloader autograder_tests 0.25

divide_y_por_2 = lambda t: (t, t//2)
data2 = [tensor([8, 3]), tensor([7, 9]), tensor([3, 2]), tensor([19, 23])]

dl2 = SimpleDataLoader(data2, divide_y_por_2, noop, 2, 'cpu')

batch_x2, batch_y2 = dl2.get_batch_at(2)

assert torch.all(batch_x2 == tensor([[ 3,  2], [19, 23]]))
assert torch.all(batch_y2 == tensor([[ 1,  1], [9, 11]]))


Vamos testar com tensores de dimensão maiores, para ver se você está fazendo o stacking corretamente.

In [None]:
# testa_4_questao_dataloader autograder_tests 0.25

soma_no_batch = lambda t: (t[0] + 1, t[1] + 2)

dl3 = SimpleDataLoader(data2, noop2, soma_no_batch, 1, 'cpu')

batch_x3, batch_y3 = dl3.get_batch_at(3)

assert torch.all(batch_x3 == tensor([[ 20,  24]]))
assert torch.all(batch_y3 == tensor([[ 21,  25]]))


E finalmente um teste que se aproxima mais da realidade, carregando um batch de imagens a partir da lista de itens que na realidade é uma lista dos caminhos para os arquivos de imagens.

In [None]:
# testa_5_questao_dataloader autograder_tests 1

itemtran  = Pipeline([Resize(192, method='squish'), ToTensor()])
dic = {'cat':tensor([1,0]), 'dog': tensor([0,1])}
transform = lambda p: (itemtran(PILImage.create(p)), dic[p.parent.name])
bstran = Pipeline([IntToFloatTensor()] + aug_transforms())
data4= [base_path/'train'/'cat'/'00004.jpg', base_path/'train'/'dog'/'00001.jpg']

dl4 = SimpleDataLoader(data4, transform, bstran, 2, 'cpu')

batch_x4, batch_y4 = dl4.get_batch_at(0)
assert batch_x4.dtype == torch.float32
assert batch_x4.shape == (2, 3, 192, 192)
assert torch.all(batch_y4 == torch.tensor([[1, 0],
                                           [0, 1]]))
if RETRAIN:
    batch_x4[0].show(), batch_x4[1].show()
batch_y4

Você deve ter percebido que os valores de inicialização do nosso `SimpleDataLoader` são diferentes do que o nosso `ImageClassDataBlock`, está faltando a lista de dados e os outros parâmetros. Além disso, a ideia é que nós queremos percorrer os valores do dataloader em um loop.

Antes disso, vamos definir aquele nossos mapeamento (dicionário) que relaciona o label com um tensor. Entenda que esse tensor representa na realidade uma distribuição de probabilidade entre as classes. No caso de ser uma classificação mutuamente exclusiva, então o somatório delas tem que dar igual a 1. Por exemplo, se temos 4 classes $(capivara, papagaio, gato, cachorro)$ podemos ter uma representação de probabilidade $(0.1, 0.7, 0.05, 0.15)$ que dá 10% de probabilidade para capivara, 70% para papagaio, 5% para gato e 15% para chachorro.

Assim vamos definir uma função que retorna um tensor no formato one-hot encodding, que é justamente colocar 100% de probabilidade em apenas uma classe (representada pelo índice), que é a classe que alguém anotou manualmente.

Veja o exemplo abaixo que apenas o valor na posição de índice 3 é unitário.

In [None]:
def cria_one_hot(idx: int, tamanho: int):
    x = torch.zeros(tamanho)
    x[idx] = 1
    return x

cria_one_hot(3, 10)

Agora assim podemos carregar todos os items por meio do `get_items` e separá-los em treino e validação por meio do `splitter`, além de criarmos nosso dicionário. É imprescindível fazer o shuffle dos train_idx, pois se o splitter não fizer, o seu dataset ficará comprometido, isto é, 2400 gatos e depois 2400 cachorros nessa ordem. A rede vai aprender só o viés.

O `DataLoaders` cria dois `DataLoader` por padrão, um para treino e outro para validação. Essa funcionalidade (de separar em treino e validação) no FastAI está encapsulada na classe `Dataset`, aqui ficou com esse nome apenas por simplificação. E só para deixar claro, o FastAI tem sim o `.train` e o `.valid` nos `DataLoaders`.

In [None]:
class SimpleDataLoaders:
    def __init__(self, icdb: ImageClassDataBlock, data_pointer, batch_size: int, device):
        self.icdb = icdb
        self.items = icdb.get_items(data_pointer)
        self.train_idx, self.val_idx = icdb.splitter(self.items)
        np.random.shuffle(self.train_idx)
        self.labels = sorted(list(set((icdb.get_y(d) for d in self.items[self.train_idx]))))
        labels_len = len(self.labels)
        self.label_to_tensor = {label: cria_one_hot(idx, labels_len) for idx, label in enumerate(self.labels)}
        self.train = SimpleDataLoader(self.items[self.train_idx], self.transform, icdb.batch_tfms, batch_size, device)
        self.valid = SimpleDataLoader(self.items[self.val_idx], self.transform, icdb.batch_tfms, batch_size, device)

    def transform(self, item):
        return self.icdb.transform(item, self.label_to_tensor)

Vamos acrescentar um novo método na classe `ImageClassDataBlock` para que ela instancie um `DataLoader` para nós. Observe que o DataLoader recebe já a lista de dados que vem do `self.get_items(self.data_pointer)`.

In [None]:
@patch_to(ImageClassDataBlock)
def dataloaders(self: ImageClassDataBlock, data_pointer, bs=64, device=device) -> DataLoader:
    return SimpleDataLoaders(self, data_pointer, bs, device)

Cada `DataLoader` é um objeto iterável `iterable`, é por isso que usamos `for x, y in dataloder:` . Para podermos deixar iterável, temos que implementar um método `__iter__`, além disso, ele precisa retornar um objeto iterador. Para simplificar vamos deixá-lo como sendo um próprio iterador (ele cria uma nova cópia de si mesmo com o contador zerado).

In [None]:
@patch_to(SimpleDataLoader)
def __iter__(self: SimpleDataLoader):
    return SimpleDataLoader(self.list_items, self.transform, self.batch_tfms, self.batch_size, self.device)

Só por comodidade também vamos definir a função `len` para o nosso dataloader, que dá a quantidade de iterações totais que ele tem. Esteja ciente de que a última iteração pode conter uma quantidade menor de batch, justamente se o número do batch_size não for múltiplo da quantidade de elementos.

In [None]:
@patch_to(SimpleDataLoader)
def __len__(self: SimpleDataLoader):
    return math.ceil(len(self.list_items) / self.batch_size)

E finalmente tornamos ele um iterador, que é simplementente incrementar o contador e retorna o batch:

In [None]:
@patch_to(SimpleDataLoader)
def __next__(self: SimpleDataLoader):
    if self.iteracao_atual >= len(self):
        raise StopIteration
    batch = self.get_batch_at(self.iteracao_atual * self.batch_size)
    self.iteracao_atual += 1
    return batch

Assim a implementação final fica similar ao que o FastAI faz (mas é uma apenas uma simplificação para o nosso problema):

In [None]:
simples_data_block = ImageClassDataBlock(
    get_image_files,
    RandomSplitter(valid_pct=0.2, seed=19), 
    path_to_pil_img, 
    parent_label, 
    [Resize(192, method='squish'), ToTensor()],
    [IntToFloatTensor()] + aug_transforms())

simples_dataloaders = simples_data_block.dataloaders(base_path/'valid')

if RETRAIN:
    for x, y in simples_dataloaders.valid:
        print(x.shape, y.shape, x.device)

Além do FastAI, o próprio PyTorch já tem classes de Dataset e de DataLoader, mas eu recomendo usar a do FastAI pela interface de mais alto nível.

## Architecture (2 points)

**Explicação sobre o assunto**

Vamos implementar manualmente a arquitetura da ResNet-18, que é pequena e relativamente simples, e tem uma performance 'razoável'.

<a href="https://www.researchgate.net/figure/Original-ResNet-18-Architecture_fig1_336642248"><img src="https://www.researchgate.net/profile/Sajid-Iqbal-13/publication/336642248/figure/fig1/AS:839151377203201@1577080687133/Original-ResNet-18-Architecture.png" alt="Original ResNet-18 Architecture"/></a>

Em vez de utilizarmos um average pooling local de tamanho fixo 7x7, vamos utilizar uma average pooling global, isto é, reduzindo todas as dimensões espaciais. Essa é o bizu (*trick*) para que nós possamos classificar uma imagem de qualquer tamanho. No exemplo abaixo ele utilizou imagens com tamanho de entrada 224.

<a href="https://www.researchgate.net/figure/ResNet-18-Architecture_tbl1_322476121"><img src="https://www.researchgate.net/profile/Paolo-Napoletano/publication/322476121/figure/tbl1/AS:668726449946625@1536448218498/ResNet-18-Architecture.png" alt="ResNet-18 Architecture."/></a>

Em vez de implementar toda a arquitetura vocês irão implementar apenas o módulo residual. Mas é importante que vocês entendam a arquitetura como um todo, que é a aplicação desses vários blocos residuais.

Vejamos aqui em baixo o `state_dict` que são apenas os pesos da rede já treinada. Ele é um `dict` normal de python, onde as chaves são strings que definem o nome do parâmetro e os valores são tensores que contém os pesos neurais. Veja os nomes de todos os parâmetros, um slice e o shape do bias da última camada. Você conseguê saber porque a dimensão é de 1000 elementos (perceba o dataset que ele foi treinado `IMAGENET1K_V1`).

In [None]:
weights = ResNet18_Weights.verify(ResNet18_Weights.IMAGENET1K_V1).get_state_dict("pretrained")
weights.keys(), weights['fc.bias'][:10], weights['fc.bias'].shape

Observe no `layer2` que temos uma mudança de 64 canais para 128 canais da `conv1` para a `conv2`.

In [None]:
weights['layer2.0.conv1.weight'].shape, weights['layer2.0.conv2.weight'].shape

Entenda que se fôssemos fazer a conexão residual de forma ingênua, teríamos um descasamento das nossas dimensões, pois a dimensão de saída é menor do que a de entrada, por isso precisamos de um layer de casamento (chamado de `downsample`) que é um simples kernel 1x1, ou seja, atua apenas nas dimesões dos canais e não interfere nas dimensões espaciais.

O bizu (trick) na inicialização dessa camada (não é o caso pois não estamos treinando do zero) é utilizar uma matriz identidade, para que a entrada possa ser transferida para a saída. Ainda é necessário somar um pouco de ruído para quebrar a simetria dos pesos (pois os outros canais zerados acabariam continuando com valores iguais durante o treino).

In [None]:
weights['layer2.0.downsample.0.weight'].shape

A última camada ainda é uma completamente conectada só pela herança do state_dict que não quis alterar aí em cima, é o default do próprio PyTorch. Mas lá embaixo, depois da última questão mostro como transformá-la em uma camada convolucional (neste caso teríamos uma rede completamente convolucional).

O nome que o PyTorch pega é justamente o nome que nós damos para as nossas variáveis, perceba o `state_dict` aí em cima. Isso acontece quando atribuímos um `nn.Module` (`nn.Conv2d`, `nn.Linear`, `nn.Sequential` entre outros `nn.`) a algum atributo do nosso próprio objeto. Assim quando vamos carregar ou salvar o nosso objeto ele verifica todos os atributos e verifica se é um `nn.Module`, se for, ele aplica recursivamente, e vai registrando todos os parâmetros (que são os tensores onde `requires_grad=True` e que foram devidamente adicionados ao módulo).

O `nn.Sequential` é um módulo do PyTorch que simplesmente faz a aplicação sequencial dos módulos que você para como argumento para ele (lembra o Pipeline, mas agora são módulos da rede neural). Então no caso abaixo, o `self.layer1` na realidade é composto por dois `BlocoResidual`, sendo que cada `BlocoResidual` é composto por duas camadas convolucionais.

In [None]:
class ResNet18(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = nn.Sequential(BlocoResidual(64)         , BlocoResidual(64))
        self.layer2 = nn.Sequential(BlocoResidual(128, 64, 2) , BlocoResidual(128))
        self.layer3 = nn.Sequential(BlocoResidual(256, 128, 2), BlocoResidual(256))
        self.layer4 = nn.Sequential(BlocoResidual(512, 256, 2), BlocoResidual(512))
        self.fc = nn.Linear(512, 1000)

A propagação direta da rede é definida pela aplicação sequencial das suas camadas, primeiro a convolução `conv1` seguida da normalização `bn1`, da função de ativação ReLU e de um Max Pooling que diminui a dimensão espacial pela metade.

Assim, temos a aplicação sequencial dos módulos `layer1`,  `layer2`, `layer3` e `layer4`, sendo que cada módulo desses é composto por dois Blocos Residuais. A cada layer desses a quantidade de canais vai dobrando e as dimensões espaciais vão diminuindo pela metade.

Por final temos um pooling global no resto das dimensões espaciais, reduzindo-as para valores unitários, isto é (1, 1). Essa redução ocorre por Average Pooling (tirar a média de todas as dimensões espaciais). Aqui no PyTorch ele não tem uma função com o nome de GlobalPooling, mas tem outra mais poderosa: `adaptive_x_poolNd` no qual você pode especificar exatamente as dimensões de saída espaciais.

Perceba que esse pooling permite que passemos imagens de quaiquer tamanhos para ela (só cuidado para ser múltiplo de $2**5 = 32$ que foi a quantidade de pooling que fizemos senão a rede pode desprezar (truncar) um pouco das bordas direita e do fundo da imagem.

O final dela na realidade é uma camada densa (fully-connected) linear, que poderia ser substituída por uma convolução com kernel 1x1 de mesmos pesos se fôssemos tentar fazer uma detecção. Nesse caso, a fineza das janelas de detecção dependeriam justamente do tamanho do kernel do average pooling que agora não seria global mas com um stride bem definido, mas isso será assunto do próximo lab.

In [None]:
@patch_to(ResNet18)
def forward(self: ResNet18, x: Tensor) -> Tensor:
    x = F.relu(self.bn1(self.conv1(x)))
    x = F.max_pool2d(x, kernel_size=3, stride=2, padding=1)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = F.adaptive_avg_pool2d(x, (1, 1))
    x = torch.flatten(x, 1)
    return self.fc(x)

**Enunciado da Questão**

Implemente a função `forward` do `BlocoResidual` abaixo que realiza a propagação direta desse módulo. A primeira camada convolucional deve ser aplicada em conjunto com a normalização `bn1` e a função de ativação ReLU. Na segunda camada convolucional, aplique apenas a normalização, e em seguida some com a entrada inicial do bloco (corrigida as dimensões). A função de ativação ReLU deve ser aplicada apenas após a conexão residual.

<a href="https://www.researchgate.net/figure/Illustration-of-the-residual-learning-between-a-plain-network-and-b-a-residual_fig1_330750910"><img src="https://www.researchgate.net/profile/Olarik-Surinta/publication/330750910/figure/fig1/AS:720960386781185@1548901759330/Illustration-of-the-residual-learning-between-a-plain-network-and-b-a-residual.ppm" alt="Illustration of the residual learning between (a) plain network and (b) a residual network."/></a>

Nessa implementação, a camada de `downsample` realiza uma transformação identidade se a entrada tiver as mesmas dimensões da saída, se não ela realiza uma convolução para fazer as dimensões tanto do canal quando as espaciais serem iguais.

**NÃO** use LLMs (ChatGPT) para pegar a resposta pronta.

**Pode** olhar a documentação das biblitocas (PyTorch mas todas as funções que você precisa está nas **dicas**) e **pode** (aconselhado) olhar o material de aula (slides e referências).

<details><summary><b>Dica para a resposta</b></summary>
<p>
Use F.relu para a função de ativação ReLU e chame as camadas adequadamente.
</p>
</details>

In [None]:
# questao_arquitetura autograded_answer

class BlocoResidual(nn.Module):
    def __init__(self, canais:int, entrada=-1, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(canais if entrada==-1 else entrada, canais, 3, stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(canais)
        self.conv2 = nn.Conv2d(canais, canais, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(canais)
        self.downsample = (lambda x: x) if entrada==-1 else \
          nn.Sequential(nn.Conv2d(entrada, canais, 1, stride, bias=False), nn.BatchNorm2d(canais))

    def forward(self, x: Tensor) -> Tensor:
        """ Realiza a propagação direta do bloco residual de acordo com as equações acima.
        Aplique sequencialmente as transformações, e utiliza função de ativação relu após bn
        No final, faça a conexão residual antes da aplicação da função de ativação.
        
        Args:
            x: Tensor de entrada do bloco residual
        
        Returns:
            Tensor resultado das operações
        """
        # WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
        raise NotImplementedError()
    

Se você usou uma LLM, escreva a sua conversa com ela aqui nesta própria célula de texto (copie a conversa inteira) ou exporte o link da conversa:

**Escreva aqui**

Vamos testar se as dimensões sem o `downsample` estão casando, vamos apenas instanciar e propagar uma imagem aleatória:

In [None]:
# testa_1_questao_arquitetura autograder_tests 0.2

bloco1 = BlocoResidual(32)
saída1 = bloco1(torch.rand((3, 32, 7, 7)))
assert saída1.shape == (3, 32, 7, 7)


Agora vamos testar quando a dimensão da saída muda:

In [None]:
# testa_2_questao_arquitetura autograder_tests 0.2

bloco2 = BlocoResidual(64, entrada = 32, stride = 2)
saída2 = bloco2(torch.rand((3, 32, 8, 8)))
assert saída2.shape == (3, 64, 4, 4)


Agora vamos ver se além das dimensões, você realmente implementou corretamente as operações (e sua ordem, por exemplo). Para isso vamos carregar pesos iguais.

Preste atenção na sua arquitetura, pode ser que o teste passe mas mesmo assim esteja errado, aqui o bloco é de apenas um canal, bem pequeno só para você olhar os parâmetros dele e entender.

In [None]:
# testa_3_questao_arquitetura autograder_tests 0.6

bloco3 = BlocoResidual(1)

bloco3.load_state_dict(OrderedDict([
             ('conv1.weight',
              tensor([[[[ 0.1038,  0.1669, -0.0433],
                        [ 0.2729,  0.2088,  0.0134],
                        [-0.0115, -0.1043, -0.2675]]]])),
             ('bn1.weight', tensor([1.])),
             ('bn1.bias', tensor([0.])),
             ('bn1.running_mean', tensor([0.])),
             ('bn1.running_var', tensor([1.])),
             ('bn1.num_batches_tracked', tensor(0)),
             ('conv2.weight',
              tensor([[[[ 0.3080, -0.1464,  0.1728],
                        [-0.0989,  0.2278,  0.0424],
                        [-0.0991, -0.0931,  0.0043]]]])),
             ('bn2.weight', tensor([1.])),
             ('bn2.bias', tensor([0.])),
             ('bn2.running_mean', tensor([0.])),
             ('bn2.running_var', tensor([1.])),
             ('bn2.num_batches_tracked', tensor(0))]))
entrada = tensor([[[[-1.3950,  0.8202,  0.1999, -0.0679,  0.4585],
                    [ 1.7895,  0.8287, -2.3500,  0.0324, -1.1553],
                    [ 1.1633,  1.4155,  0.0830, -0.7297,  0.9657],
                    [-0.8818,  0.2879, -0.2637, -0.0935,  0.2247],
                    [-1.0390, -0.9874, -0.2633, -0.3821,  0.6705]]]])

saída3 = bloco3(entrada)

assert saída3.shape == (1, 1, 5, 5)
assert torch.linalg.norm(saída3 - tensor(
      [[[[0.        , 0.72686297, 0.11289545, 0.        , 0.        ],
         [1.4164188 , 0.        , 0.        , 0.6236053 , 0.        ],
         [2.8595443 , 1.6284161 , 0.        , 0.        , 0.26629412],
         [0.6389845 , 0.6245868 , 3.0763602 , 0.        , 0.        ],
         [0.        , 0.        , 0.        , 0.23913431, 0.0933522 ]]]])) < 1e-6


Agora vamos testar com a rede inteira que nós mesmos definimos ali em cima, com o BlocoResidual que você implementou, vamos carregar os pesos já pretreinados com o Imagenet de 1000 classes e ver se o resultado é condizente.

Perceba que a rede espera uma imagem normalizada com média e desvio padrão do conjunto de treinamento, que está especificado aí embaixo.

Vamos compara a saída da nossa própria implementação da `ResNet18` com a implementação padrão do PyTorch `resnet18`. Agora sim se o resultado for semelhante (diferença menor que $10^{-6}$) podemos ter alta confiança de que a nossa implementação está correta.

In [None]:
# testa_4_questao_arquitetura autograder_tests 1

nossa_resnet18 = ResNet18()

weights = ResNet18_Weights.verify(ResNet18_Weights.IMAGENET1K_V1).get_state_dict("pretrained")
nossa_resnet18.load_state_dict(weights)

media = tensor([0.485, 0.456, 0.406]).reshape(1, 3, 1, 1)
sigma = tensor([0.229, 0.224, 0.225]).reshape(1, 3, 1, 1)
img0 = IntToFloatTensor()(ToTensor()(Resize(192, method='squish')(PILImage.create(base_path/'train'/'cat'/'00003.jpg'))))
img = ((img0 - media) / sigma)

res18_pytorch = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

nossa_resnet18.eval()
res18_pytorch.eval()

with torch.no_grad():
    nossa_saída1000 = nossa_resnet18(img)
    pytorch_saída1000 = res18_pytorch(img)

assert torch.linalg.norm(nossa_saída1000 - pytorch_saída1000) < 1e-6


Além do `no_grad` para inferência, você deve estar se perguntando o que serve o `.eval()` chamada nos próprios modelos. É justamente por causa das camadas de normalização por batch `BatchNorm` que durante o treinamento (modo default), elas atualizam as estimativas das médias e dos desvios padrão das ativações em um batch.

Já no modo deinferência (`model.eval()`), isso seta a flag `training` de todos os submódulos recursivamente, e principalmente dos `bn` que agora não irão atualizar mais os valores de média e desvio padrão, que se tornarão fixos.

In [None]:
nossa_resnet18.training

Vejamos a saída da rede, observe qual o índice  do valor (*logit*) máximo de saída da rede:

In [None]:
torch.argmax(nossa_saída1000).item()

O que quer dizer esse número? Temos que saber qual o mapeamento entre o índice da classe e o label textual que nós possamos entender. Se o modelo tiver acertado, deve ser algum gato. Eu poderia colocar um dicionário aqui que mapeasse o índice (int) em um label (string), mas vou deixar para você ver isso manualmente.

Acesse o link [https://gist.github.com/yrevar/942d3a0ac09ec9e5eb3a](https://gist.github.com/yrevar/942d3a0ac09ec9e5eb3a) e veja ao que realmente corresponde esse índice.

Veja também a saída abaixo da rede:

In [None]:
nossa_saída1000.shape, nossa_saída1000

Você deve estar se perguntado: Uai, valores negativos? Não deveriam se probabilidades? 😱

Então, nós *esquecemos* a softmax! Mas tem um motivo para termos deixado-a de fora e veremos adiante no treinamento.

Veja uma representação textual da rede (tem outra por desenho de grafos que mostrarei no próximo lab e outra por tabela também)

In [None]:
nossa_resnet18

Se você realmente quiser uma distribuição de probabilidades na saída terá que aplicar a softmax, está faltando! O que temos nas saídas são apenas os `logits`, isto é, os valores imediatamentes antes de aplicar a softmax. São chamados de logits pois são justamentes os expoentes da softmax e se você fosse aplicar o log nela voltaria nos valores (a menos da normalização).

Mas porque então a implementação padrão já não é com a softmax? Pois já podemos incluí-la na própria loss function! Então em vez de termos que fazer uma operação de exponenciação para depois aplicarmos o log, podemos já diretamente usar os nossos valores. Na última seção nós iremos detalhar isso.

## Transfer Learning (2 points)

**Explicação sobre o assunto**

Nos testes da questão anterior, nós carregamos os pesos de uma rede neural com a mesma arquitetura (e mesmos nomes de parâmetros)

Perceba que os pesos são apenas um dicionário com o nome do parâmetro que é uma string e o seu valor que é um tensor, se você criasse uma arquitetura diferente (outras operações) mas cujos nomes dos parâmetros fosse idênticos o PyTorch iria carregar alegremente os pesos, pois ele só faz o match dos nomes. Isso dá mais flexibilidade mas também pode ser uma fonte silenciosa de erros, pois dependendo do que você mudar nas operações, pode ser que os pesos pretreinados percam completamente o seu sentido.

A ideia do transfer learning é justamente essa, mas nós só iremos mudar as camadas finais, enquanto aproveitamos os pesos de todas as camadas anteriores.

Essa é a grande sacada: enquanto as camadas iniciais aprendem o que é uma imagem e quais são as features mais interessantes a serem extraídas, as camadas finais apenas escolhem essas features para definir as classes.

Por exemplo, a última camada da nossa ResNet18 é uma uma camada densa (completamente conectada, linear, *MLP*):

In [None]:
list(res18_pytorch.modules())[-1]

**Enunciado da Questão**

Implemente a função `transfer_weights_new_dim` abaixo de acordo com a sua documentação. A ideia geral é que você carregue os pesos pré-treinado e substitua a última camada densa por uma nova com as dimenões de saída especificadas.

**NÃO** use LLMs (ChatGPT) para pegar a resposta pronta. **NÃO** pesquise a resposta pronta na internet.

**Pode** olhar a documentação das biblitocas (PyTorch mas todas as funções que você precisa está nas **dicas**) e **pode** (aconselhado) olhar o material de aula (slides e referências).

<details><summary><b>Dica para a resposta</b></summary>
<p>
Use o `.load_state_dict` para carregar os pesos já treinados na rede original e substitua a última camada (a completamente conectada) por outra que tenha a dimensão correta. Use `nn.Linear` para criar uma nova camada.
</p>
</details>

In [None]:
# questao_transfer_learning autograded_answer

def transfer_weights_new_dim(model: ResNet18, pretrained_weights: dict[str, tensor], output_dims: int):
    """ Substitui a última camada de uma ResNet18 cujos pesos devem ser os pré-treinados
    A nova camada de saída deve ter a dimensão especificada pelo parâmetro output_dims
    
    Args:
        model: Instância de uma ResNet18, pode ser tanto nossa quanto do PyTorch
        pretrained_weights: Dicionário ordenado com os pesos pré-treinados
        output_dims: Tamanho da nova dimensão de saída da rede
    
    Returns:
        None : Não há valor de retorno, mas altera partes do model.
    """
    # WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
    raise NotImplementedError()

Se você usou uma LLM, escreva a sua conversa com ela aqui nesta própria célula de texto (copie a conversa inteira) ou exporte o link da conversa:

**Escreva aqui**

Primeiro vamos verificar se as dimensões que você implementou fazem sentido:

In [None]:
# testa_1_questao_transfer_learning autograder_tests 0.5

res18_pytorch = resnet18()
weights = ResNet18_Weights.verify(ResNet18_Weights.IMAGENET1K_V1).get_state_dict("pretrained")

transfer_weights_new_dim(res18_pytorch, weights, 2023)

assert res18_pytorch.fc.out_features == 2023
assert res18_pytorch.fc.weight.data.shape == (2023, 512)
assert res18_pytorch.fc.bias.data.shape == (2023, )


Vejamos se a rede ainda funciona como o esperado:

In [None]:
# testa_2_questao_transfer_learning autograder_tests 1

img_rand = torch.randn((8, 3, 127, 127))

saida_nova = res18_pytorch(img_rand)

assert saida_nova.shape == (8, 2023)


E por fim se os pesos realmente são os pré-treinados

In [None]:
# testa_3_questao_transfer_learning autograder_tests 0.5

assert torch.linalg.norm(weights['conv1.weight'] - res18_pytorch.conv1.weight) < 1e-6
assert torch.linalg.norm(weights['bn1.weight'] - res18_pytorch.bn1.weight) < 1e-6
assert torch.linalg.norm(weights['layer2.0.conv2.weight'] - res18_pytorch.layer2[0].conv2.weight) < 1e-6
assert torch.linalg.norm(weights['layer3.1.conv2.weight'] - res18_pytorch.layer3[1].conv2.weight) < 1e-6


## Fine Tunning (2 points)

**Explicação sobre o assunto**

A etapa de fine-tunning é apenas o treinamento do modelo já com transfer learning. Mas existe outro pulo do gato! 

Se nós formos treinar já o modelo diretamente, a nossa última camada que foi inicializada aleatoriamente vai introduzir ruído no gradiente que será retropropagado para as camadas já treinada, e acaba bagunçando os pesos que já estavam bons.

Dessa forma, a solução é 'congelar' os pesos já treinados, isto é, impedir que eles sejam atualizados, pois essa atualização (pelo menos no início) potencialmente vai *piorar* os pesos.

Contudo, depois que a sua última camada já tiver se estabilizado, então se torna seguro treinar todos os parâmetros, pois os gradiente já se tornaram 'estáveis' novamente e vai ter menos risco de bagunçar os pesos pre-treinados.

No PyTorch, tal qual no lab 2, podemos pegar todos os parâmetros de um `nn.Module` (a rede inteira, um modelo, uma camada) por meio do `.parameters()`. Veja aqui abaixo os shapes e o `.requires_grad` de cada parâmetro. Ele inclusive retorna na ordem topológica.

In [None]:
[(param.shape, param.requires_grad) for param in res18_pytorch.parameters()]

**Enunciado da Questão**

Implemente a função abaixo `freeze_all_but_last_layer` que congela todos os parâmetros do modelo, menos aqueles associados à última camada completamente conectada.

**NÃO** use LLMs (ChatGPT) para pegar a resposta pronta. **NÃO** pesquise a resposta pronta na internet.

**Pode** olhar a documentação das biblitocas (PyTorch mas todas as funções que você precisa está nas **dicas**) e **pode** (aconselhado) olhar o material de aula (slides e referências).

<details><summary><b>Dica para a resposta</b></summary>
<p>
Use `model.parameters()` para obter um objeto iterável. Acesse a última camada `.fc` do modelo e permita que ela seja treinável.
</p>
</details>

In [None]:
# questao_fine_tunning autograded_answer

def freeze_all_but_last_layer(model: ResNet18):
    """ Congela todos os parâmetros da rede a menos os da última camada
    
    Args:
        model: Modelo da ResNet18 a ser modificado
    
    Return:
        None não há valor a ser retornado
    """
    # WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
    raise NotImplementedError()

Se você usou uma LLM, escreva a sua conversa com ela aqui nesta própria célula de texto (copie a conversa inteira) ou exporte o link da conversa:

**Escreva aqui**

O teste é bem simples, basta ver se os parâmetros realmente foram impedidos de treinar.

In [None]:
# testa_1_questao_fine_tunning autograder_tests 1

res18_pytorch = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

freeze_all_but_last_layer(res18_pytorch)

assert res18_pytorch.conv1.weight.requires_grad == False
assert res18_pytorch.layer3[0].conv2.weight.requires_grad == False


Além disso, **todos** os parâmetros da última camada devem ser treináveis, não se esqueça de nenhum!

In [None]:
# testa_2_questao_fine_tunning autograder_tests 1

assert res18_pytorch.fc.weight.requires_grad == True


## Treino de Verdade!

**É recomendado que você habilite a GPU pelo menos para essa etapa**

Mas se não habilitar não tem problema, só vai demorar cerca de uns 20 minutos, frente a uns 2 minutos com GPU (estimativas de tempo variam).

Habilite a GPU gratuitamente no Colab ([aula 1 slide 137](https://docs.google.com/presentation/d/1PYRArTIBh9vQ5r7DvEwGgbZp8FCkrwZnkQuFZ-z-snc)) ou utilize a imagem docker com GPU ([jupytergpu.yml](https://github.com/Gabrui/cm203)).

Agora vamos efetivamente treinar a rede, aqui temos a definição do nosso loop de otimização, algo que vocês já implementaram no lab 3.

Temos aqui a nossa função que realiza o treinamento por apenas uma época (ver por uma única vez todos os dados do conjunto de treinamento). A estrutura dela é exatamente a mesma que implementada no lab 3: zera os gradientes, calcula a saída estimada pelo modelo, calcula a loss, realiza a retropropagação e depois a heurística do gradiente descendente (optimizer). Perceba também que para o conjunto de validação basta calcular a loss, com `no_grad` e isso é realizado apenas no final da época do treinamento.

In [None]:
def one_epoch(model, criterion, dataloaders, optimizer, scheduler):
    train_losses, val_losses = [], []
    
    model.train()
    for x, y in dataloaders.train:
        optimizer.zero_grad() # Zera os gradientes, por causa do += do lab2
        ŷ = model(x)
        loss = criterion(ŷ, y)
        loss.backward()
        optimizer.step() # Aqui que os parâmetros da rede são atualizados
        scheduler.step()
        train_losses.append(loss.item())
        print('%.2e' % loss.item(), end=', ')
    print(f'\nMean loss during training: {np.mean(train_losses)}')
    
    model.eval()
    with torch.no_grad():
        for x, y in dataloaders.valid:
            val_losses.append(criterion(model(x), y).item())
    print(f'Validation loss: {np.mean(val_losses)}\n')
    
    return train_losses, val_losses

Outro pulo do gato no PyTorch, é que a função de entropia cruzada deles, `F.cross_entropy` (utilizada na loss) já tem uma `softmax` dentro dela mesma! É por isso que não precisamos colocar uma softmax na saída da nossa rede. Eu acredito que isso é por questão de otimização numérica e computacional, por que na softmax nós fazemos a exponenciação e no entropia cruzada nós calculamos o log, então (a menos da normalização), acaba se cancelando e dá para implementar como uma operação só!

Veja aqui o resultado da entropia cruzada, observe que os logits não tem restrição, podem ser maiores que 1 ou ainda negativos. ATENÇÃO, a ordem importa! O primeiro argumento é o tensor de logits estimados pela rede e o segundo é a probabilidade de verdade.

In [None]:
F.cross_entropy(tensor([2., -1, -4, -1]), tensor([1., 0, 0, 0]))

Mas agora se quiséssemos calcular as probabilidades nós teríamos que aplicar a softmax sobre os logits:

In [None]:
probabilidades = F.softmax(tensor([2., -1, -4, -1]), dim=-1)
probabilidades

Se calcularmos a entropia cruzada pela fórmula matemática que conhecemos $-\Sigma_c p_c \log \hat{p_c}$ , vemos que o resultado é exatamente o mesmo:

In [None]:
-torch.sum(tensor([1., 0, 0, 0]) * torch.log(probabilidades))

Agora sim vamos definir a nossão função de fine tunning!

In [None]:
def fine_tune(model, dataloaders: SimpleDataLoaders, epochs = 1):
    freeze_all_but_last_layer(model)
    history = []
    
    optim = torch.optim.Adam(model.parameters())
    sched = torch.optim.lr_scheduler.OneCycleLR(optim, 3e-3, epochs=1, steps_per_epoch=len(dataloaders.train), pct_start=0.05)
    loss = one_epoch(model, F.cross_entropy, dataloaders, optim, sched)
    history.append(loss)
    
    for param in model.parameters():
        param.requires_grad = True
    
    optim = torch.optim.Adam(model.parameters())
    sched = torch.optim.lr_scheduler.OneCycleLR(optim, 3e-4, epochs=epochs, steps_per_epoch=len(dataloaders.train), pct_start=0.2)
    
    for e in range(epochs):
        print(f"Epoch {e+1}/{epochs}")
        losses = one_epoch(model, F.cross_entropy, dataloaders, optim, sched)
        history.append(losses)
    return history

O nosso datablock simplificado com o nosso dataloader, com as duas funções de transformação de um item e de gerar um batch que você implementou. Aqui vamos usar o `GrandparentSplitter` para separa em treino ou validação com base no nome da pasta que está dois níveis acima da imagem (avós) que tem justamente os nomes de `train` e de `valid`. Novamente temos que ter cuidado com a normalização das imagens, pois o modelo pré-treinado precisa da imagem já normalizada com base nas médias e desvios padrões do Imagenet.

Muita atenção que o `GrandparentSplitter` não faz o shuffle do dataset, então os itens ficam ordenados por ordem alfabética do `get_image_files`. O nosso `SimplesDataLoaders` faz o shuffle, mas no FastAI você precisa especificar isso. É uma fonte de erro onde irá aparecer uma descontinuidade no seu gráfico da loss (quando muda de uma classe para outra) porque a rede só vai aprender o viés. Experiência pessoal ao fazer este lab!

Se você quiser sentir, é só apagar a linha do `np.random.shuffle(self.train_idx)` lá no `SimplesDataLoaders` (rodar a célula lá novamente) e executar a célula abaixo.

In [None]:
normalize = Normalize.from_stats(
    mean = tensor([0.485, 0.456, 0.406]).reshape(1, 3, 1, 1),
    std = tensor([0.229, 0.224, 0.225]).reshape(1, 3, 1, 1), cuda=torch.cuda.is_available()
)

if RETRAIN:
    nosso_datablock = ImageClassDataBlock(
        get_image_files,
        GrandparentSplitter(train_name='train', valid_name='valid'),
        path_to_pil_img, 
        parent_label, 
        [Resize(192, method='squish'), ToTensor()],
        [IntToFloatTensor(), normalize] + aug_transforms())

    nosso_dataloaders = nosso_datablock.dataloaders(base_path, device=device)

Vamos definir a nossa rede neural e fazer a transferência dos pesos com a nova dimensão da saída desejada com a função que você implementou:

In [None]:
if RETRAIN:
    nossa_res18 = ResNet18()
    pesos = ResNet18_Weights.verify(ResNet18_Weights.IMAGENET1K_V1).get_state_dict("pretrained")

    transfer_weights_new_dim(nossa_res18, pesos, 2)

    nossa_res18.to(device)

E agora é só rodar! Perceba que o treino é bem 'roubado' porque no Imagenet já tem gato e cachorro.

In [None]:
history = []

if RETRAIN:
    
    history = fine_tune(nossa_res18, nosso_dataloaders, epochs=1)
    
    plt.plot([loss for train_history in history for loss in train_history[0]])
    plt.xlabel('Batches')
    plt.ylabel('Entropia Cruzada (nats)')

Perceba que parece que ficou lento, é verdade! E sabe onde ele está gastando mais tempo? É para carregar as imagens do disco. É por isso que as implementações padrões tanto no FastAI ou no PyTorch utilizam `multiprocessing`, processamento paralelo por meio de subprocessos (pois em Python as Threads não são paralelas por causa do GIL). Assim, enquanto o processo principal está treinando, há outros processos secundários lendo as imagens do disco.

Sinta-se a vontade para rodar mais ou ainda para avaliar a performance no dataset de test dos alunos, que está na pasta `test`.<sub><sup>Reza a lenda que os professores tem um outro dataset de teste escondido</sup></sub> Como acabou que não fiz nenhuma funcionalidade nova para teste, vamos fazer um dataloader normal como se fosse de treino só que o splitter vai ser 100% pra *treino*

In [None]:
if RETRAIN:
    teste_dataloader = ImageClassDataBlock(
        get_image_files,
        lambda items: (L(range(len(items))), L()),
        path_to_pil_img, 
        parent_label, 
        [Resize(192, method='squish'), ToTensor()],
        [IntToFloatTensor(), normalize] + aug_transforms()).dataloaders(base_path/'test').train

E aqui, um loop simples no nosso dataloader de teste para calcularmos os erros e os acertos. Nós escolhemos a classe por meio do `torch.argmax` que faz o contrário do one-hot encoding (ele encontra o argumento, o índice, que aponta para o maior valor).

In [None]:
acertos = 0
erros = 0
if RETRAIN:
    nossa_res18.eval()
    with torch.no_grad():
        for x, y in teste_dataloader:
            ŷ = nossa_res18(x)
            ŷ_igual_a_y = (torch.argmax(ŷ, dim=-1) == torch.argmax(y, dim=-1))
            acertos += torch.sum(ŷ_igual_a_y).item()
            erros += torch.sum(~ŷ_igual_a_y).item()
acertos, erros

Veja que é o elemento 0.7 o máximo abaixo, e ele tem índice 2 (os tensores são indexados do 0).

In [None]:
torch.argmax(tensor([0, 0.2, 0.7, 0.1]))

# Your data and feedback:

Write a feedback for the lab so we can make it better for the next years.

In the following variables, write the number of hours spent on this lab, the perceived difficulty, and the expected grade (you may delete the `raise` and the comments):

In [None]:
# meta_eval manual_graded_answer 0

horas_gastas = None    # 1.5   - Float number with the number of hours spent 
dificuldade_lab = None # 0     - Float number from 0.0 to 10.0 (inclusive)
nota_esperada = None   # 10    - Float number from 0.0 to 10.0 (inclusive)

# WRITE YOUR CODE HERE! (you can delete this comment, but do not delete this cell so the ID is not lost)
raise NotImplementedError()

Write below other comments or feedbacks about the lab. If you did not understand anything about the lab, please also comment here.

If you find any typo or bug in the lab, please comment below so we can fix it.

WRITE YOUR SOLUTION HERE! (do not change this first line):

**ATTENTION**

**ATTENTION**

**ATTENTION**

**ATTENTION**

**DISCURSIVE QUESTION**

WRITE YOUR ANSWER HERE (do not delete this cell so the ID is not lost)

**ATTENTION**

**ATTENTION**


**End of the lab!**