# Clone config.py script
* 클론하는 리포지토리의 config.py script
* 즉, CFG로 작성된 부분은 아래 값들을 확인하여 수정할 것
* 작성자가 주피터가 아니라 파이썬 스크립트로 작성해서 그럼
```
import torch

debug = True
image_path = "C:/Moein/AI/Datasets/Flicker-8k/Images"
captions_path = "C:/Moein/AI/Datasets/Flicker-8k"
batch_size = 8
num_workers = 0
lr = 1e-3
weight_decay = 1e-3
patience = 2
factor = 0.5
epochs = 5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_name = 'resnet50'
image_embedding = 2048
text_encoder_model = "distilbert-base-uncased"
text_embedding = 768
text_tokenizer = "distilbert-base-uncased"
max_length = 200

pretrained = False # for both image encoder and text encoder
trainable = False # for both image encoder and text encoder
temperature = 1.0

# image size
size = 224

# for projection head; used for both image and text encoders
num_projection_layers = 1
projection_dim = 256 
dropout = 0.1
```

# Albumentations
* Image Augmentation 라이브러리, 초당 처리 속도가 torchvision보다 빠른 것으로 증명됨.

In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import os
import cv2
import numpy as np
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'

In [2]:
import albumentations as A

# To CLIP
* image와 text 둘 다 encoding을 해야한다.
* Text Encoding = DistilBERT (BERT의 작은 사이즈지만 성능은 그에 준함)
* * *
### 추가적으로 알게된 사항 (Python's Dictionary <code>.items()</code>)
* <code>.items()</code> -> 딕셔너리에 있는 key와 value를 리턴한다.
* Reference: https://wikidocs.net/16

In [3]:
class CLIPDataset(Dataset):
    def __init__(self, image_filenames, captions, tokenizer, transforms):

        self.image_filenames = image_filenames
        self.captions = list(captions)
        # Dataset 선언을 통해 Tokenizer Object를 input으로 받는다.
        self.encoded_captions = tokenizer(
            list(captions), padding=True, truncation=True, max_length=200
        )
        self.transforms = transforms
        
    def __getitem__(self, idx):
        item = {
            key: torch.tensor(values[idx])
            for key, values in self.encoded_captions.items()
        }
        image = cv2.imread(f'F:/Doby/CLIP/Flickr8k/Images/{self.image_filenames[idx]}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transforms(image=image)['image']
        
        item['image'] = torch.tensor(image).permute(2, 0, 1).float()
        item['caption'] = self.captions[idx]

        return item
        
    def __len__(self):
        return len(self.captions)

In [4]:
def get_transforms(mode='train'):
    # Train이나 Test나 전처리 같음
    # 따로 Augmentation은 하지 않음
    if mode == 'train':
        return A.Compose(
            [
                A.Resize(224, 224, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )
    else:
        return A.Compose(
            [
                A.Resize(224, 224, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )

# Image Encoder
* image encoder는 ResNet50을 사용한다.
* 클론하는 페이지의 ResNet은 Pre-trained도 없고, 그렇다고 해서 학습하지도 않는다.
* 여기서는 ImageNet1K 사전학습한 것을 사용하도록 한다. 그리고, 모델 프리징을 시킨다.
* 또한, 페이지에서는 AdaptiveAvgPooling까지 써서 출력 vector의 사이즈가 2048이 되도록한다.

In [5]:
from torchvision.models import resnet50

class ImageEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = resnet50(weights='IMAGENET1K_V1')
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        self.flatten = nn.Flatten() # Image output Vector size = 2,048
        for param in self.model.parameters():
            param.requires_grad = False

    def forward(self, x):
        x = self.model(x)
        x = self.flatten(x)
        return x

In [6]:
from torchinfo import summary
# Attuibute Error 'int' has no attribute 'numpy' 때문에 torchinfo 사용함
# torchinfo가 상위 버전인가

ie = ImageEncoder()
summary(ie, (4, 3, 224, 224))

Layer (type:depth-idx)                        Output Shape              Param #
ImageEncoder                                  [4, 2048]                 --
├─Sequential: 1-1                             [4, 2048, 1, 1]           --
│    └─Conv2d: 2-1                            [4, 64, 112, 112]         (9,408)
│    └─BatchNorm2d: 2-2                       [4, 64, 112, 112]         (128)
│    └─ReLU: 2-3                              [4, 64, 112, 112]         --
│    └─MaxPool2d: 2-4                         [4, 64, 56, 56]           --
│    └─Sequential: 2-5                        [4, 256, 56, 56]          --
│    │    └─Bottleneck: 3-1                   [4, 256, 56, 56]          (75,008)
│    │    └─Bottleneck: 3-2                   [4, 256, 56, 56]          (70,400)
│    │    └─Bottleneck: 3-3                   [4, 256, 56, 56]          (70,400)
│    └─Sequential: 2-6                        [4, 512, 28, 28]          --
│    │    └─Bottleneck: 3-4                   [4, 512, 28, 28]       

# Text Encoder
* <a href="https://huggingface.co/transformers/v3.0.2/model_doc/distilbert.html"><code>DistilBERT</code></a>

In [7]:
from transformers import DistilBertModel, DistilBertConfig

class TextEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = DistilBertModel.from_pretrained('distilbert-base-uncased')

        for param in self.model.parameters():
            param.requires_grad = False

        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]

# Projection Head

In [8]:
class ProjectionHead(nn.Module):
    def __init__(self, embedding_dim):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, 256)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(256, 256)
        self.dropout = nn.Dropout(0.1)
        self.layer_norm = nn.LayerNorm(256)
    
    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x

# CLIP Model
* CrossEntropy를 굳이 구현해서 사용하는 이유는?
1. 매우 드물지만, 한 Batch 안에 동일한 이미지가 들어갈 수도 있기 때문이다.
2. 음... 직접 구현하면서 좀 배우길 바랬다고 한다.

In [9]:
import torch.nn.functional as F

In [10]:
class CLIPModel(nn.Module):
    def __init__(self, temperature, image_embedding, text_embedding):
        super().__init__()
        
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature

    def forward(self, batch):
        # Get Image, Text features through ResNet50, DistilBERT
        image_features = self.image_encoder(batch['image'])
        text_features = self.text_encoder(
            input_ids=batch['input_ids'],
            attention_mask=batch['attention_mask']
        )

        # Same DIMENSION EMBEDDING
        image_embeddings = self.image_projection(image_features)
        text_embeddings = self.text_projection(text_features)

        logits = (text_embeddings @ image_embeddings.T) / self.temperature
        images_similarity = image_embeddings @ image_embeddings.T
        texts_similarity = text_embeddings @ text_embeddings.T

        targets = F.softmax(
            (images_similarity + texts_similarity) / 2.0 * self.temperature, dim=-1
        )

        texts_loss = self.cross_entropy(logits, targets, reduction='none')
        images_loss = self.cross_entropy(logits.T, targets.T, reduction='none')
        loss = (images_loss + texts_loss) / 2.0
        
        return loss.mean()

    def cross_entropy(self, preds, targets, reduction='none'):
        log_softmax = nn.LogSoftmax(dim=-1)
        loss = (-targets * log_softmax(preds)).sum(1)
        if reduction == 'none':
            return loss
        elif reduction == 'mean':
            return loss.mean()

# Check How Embedding Works!
* Similarity 계산해서 어떻게 작동하는지 확인
* <code>F.softmax</code>의 결과가 1., 0.이라 해서 1, 0이 아니라 극단적이라 그렇게 보일 뿐, 소수점 아래 숫자 있음
* <code>b_out</code>의 결과를 보면 납득이 된다.
* <b>즉, <u>CLIP의 역할</u>은 <u>Image Embedding과 Text Embedding의 Similarity를 Identity Matrix에 가깝게 학습</u>하는 것!!</b>
* 그러면 여기서 궁금한 게 있다 왜 굳이 Image Embedding, Text Embedding의 Similarity를 계산해서 Identity Matrix를 만드는 거지?
* 완전한 Identity Matrix가 요구되는 것이 아니기 때문에 (위에서 1.0, 0.0이 아니었듯이) 그런 듯 하다.

In [11]:
batch_size = 4
dim = 256
a = torch.randn(batch_size, dim)
b = torch.randn(batch_size, dim)

a_out = a @ a.T
b_out = a @ b.T

print('===============a_out===============') #Best Case
print(a_out)
print(F.softmax(a_out, dim=-1))
print('===============b_out===============')
print(b_out)
print(F.softmax(b_out, dim=-1))

tensor([[262.2000,  -9.8797,  15.3062,   9.6773],
        [ -9.8797, 230.2375,  14.4797, -32.6931],
        [ 15.3062,  14.4797, 265.3638,  16.0985],
        [  9.6773, -32.6931,  16.0985, 231.0343]])
tensor([[1., 0., 0., 0.],
        [0., 1., 0., 0.],
        [0., 0., 1., 0.],
        [0., 0., 0., 1.]])
tensor([[ 18.9918,  22.6922,  11.8289, -25.1275],
        [ 10.1902,   5.1853, -14.3637,  10.6142],
        [-29.9872, -18.0869,  21.0406,   8.1824],
        [ -5.5496, -11.6104,   0.5162,  -2.8999]])
tensor([[2.4117e-02, 9.7586e-01, 1.8686e-05, 1.6655e-21],
        [3.9451e-01, 2.6451e-03, 8.5594e-12, 6.0284e-01],
        [6.9012e-23, 1.0167e-17, 1.0000e+00, 2.6047e-06],
        [2.2422e-03, 5.2297e-06, 9.6603e-01, 3.1725e-02]])


# Train

In [12]:
captions_path = 'F:\Doby\CLIP\Flickr8k\captions.txt'

In [13]:
import pandas as pd

captions = pd.read_csv(captions_path, sep=',')

In [14]:
captions = captions.reset_index()
captions.rename(columns={'index': 'id'}, inplace=True)
captions

Unnamed: 0,id,image,caption
0,0,1000268201_693b08cb0e.jpg,A child in a pink dress is climbing up a set o...
1,1,1000268201_693b08cb0e.jpg,A girl going into a wooden building .
2,2,1000268201_693b08cb0e.jpg,A little girl climbing into a wooden playhouse .
3,3,1000268201_693b08cb0e.jpg,A little girl climbing the stairs to her playh...
4,4,1000268201_693b08cb0e.jpg,A little girl in a pink dress going into a woo...
...,...,...,...
40450,40450,997722733_0cb5439472.jpg,A man in a pink shirt climbs a rock face
40451,40451,997722733_0cb5439472.jpg,A man is rock climbing high in the air .
40452,40452,997722733_0cb5439472.jpg,A person in a red shirt climbing up a rock fac...
40453,40453,997722733_0cb5439472.jpg,A rock climber in a red shirt .


In [15]:
def make_train_valid_dfs(df):
    max_id = df['id'].max() + 1
    image_ids = np.arange(0, max_id)
    np.random.seed(42)
    valid_ids = np.random.choice(
        image_ids, size=int(0.2 * len(image_ids)), replace=False
    )
    train_ids = [id_ for id_ in image_ids if id_ not in valid_ids]
    train_df = df[df['id'].isin(train_ids)]
    train_df = train_df.drop(columns=['id'])
    valid_df = df[df['id'].isin(valid_ids)]
    valid_df = valid_df.drop(columns=['id'])
    return train_df, valid_df

In [16]:
train_df, valid_df = make_train_valid_dfs(captions)

In [17]:
def build_loaders(df, tokenizer, mode):
    transforms = get_transforms(mode=mode)
    dataset = CLIPDataset(
        df['image'].values,
        df['caption'].values,
        tokenizer=tokenizer,
        transforms=transforms
    )
    dataloader = DataLoader(
        dataset,
        batch_size=8,
        num_workers=0,
        shuffle=True if mode == 'train' else False,
    )
    return dataloader

In [18]:
from transformers import DistilBertTokenizer

tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
train_loader = build_loaders(train_df, tokenizer, 'train')
valid_loader = build_loaders(valid_df, tokenizer, 'valid')

In [19]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = CLIPModel(temperature=1.0, image_embedding=2048, text_embedding=768).to(device)
params = [
    {'params': model.image_encoder.parameters(),
     'params': model.text_encoder.parameters(),
     'params': model.image_projection.parameters(), 
     'params': model.text_projection.parameters()
     }
]
optimizer = torch.optim.AdamW(params, weight_decay=0.)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_projector.bias', 'vocab_transform.bias', 'vocab_layer_norm.bias', 'vocab_transform.weight', 'vocab_layer_norm.weight']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [22]:
def train(model, data_loader, optimizer, epoch):
    n_data = 0
    running_loss = 0.
    for batch_idx, batch in enumerate(data_loader, start=1):
        batch = {k: v.to(device) for k, v in batch.items() if k != 'caption'}
        loss = model(batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_data += 8
        running_loss += loss
        
        print(f'\rTrain Epoch: {epoch} [{n_data}/{len(data_loader.dataset)} ({100 * batch_idx / len(data_loader):.2f}%)]  Loss: {running_loss/batch_idx:.4f}', end='')

In [23]:
for epoch in range(1, 3+1):
    train(model, train_loader, optimizer, epoch)
    print()



In [24]:
torch.save(model, './CLIPModel.pt')

In [25]:
torch.save(model.state_dict(), './CLIPModelDict.pt')