# Load Dataset

# Forward Pass ViT Model with a Toy Image

In [1]:
import torch  
import torch.nn as nn  
  
# Create toy image with dim (batch x channel x width x height)  
toy_img = torch.rand(1, 3, 48, 48)  
# Define conv layer parameters  
num_channels = 3  
hidden_size = 768 #or emb_dimension  
patch_size = 16  
  
# Conv 2D layer  
projection = nn.Conv2d(num_channels, hidden_size, kernel_size=patch_size,  
stride=patch_size)  
  
# Forward pass toy img  
out_projection = projection(toy_img)  
  
print(f'Original image size: {toy_img.size()}')  
print(f'Size after projection: {out_projection.size()}')  

Original image size: torch.Size([1, 3, 48, 48])
Size after projection: torch.Size([1, 768, 3, 3])


In [2]:
patch_embeddings = out_projection.flatten(2).transpose(1, 2)  
print(f'Patch embedding size: {patch_embeddings.size()}')  

Patch embedding size: torch.Size([1, 9, 768])


In [3]:
import numpy as np

a = np.array([1, 2, 3])
b = np.array([2, 3, 4])
c = np.stack((a, b), axis=0)
c

array([[1, 2, 3],
       [2, 3, 4]])

In [30]:
# Define [CLS] token embedding with the same emb dimension as the patches
batch_size = 1
cls_token = nn.Parameter(torch.randn(1, 1, hidden_size))
cls_tokens = cls_token.expand(batch_size, -1, -1)

# Prepend [CLS] token in the beginning of patch embedding
patch_embeddings = torch.cat((cls_tokens, patch_embeddings), dim=1)
print(f'Patch embedding size: {patch_embeddings.size()}')

Patch embedding size: torch.Size([1, 153, 768])


In [31]:
# Define position embedding with the same dimension as the patch embedding
position_embeddings = nn.Parameter(torch.randn(batch_size, 10, hidden_size))

# Add position embedding into patch embedding
input_embeddings = patch_embeddings + position_embeddings
print(f'Input embedding size: {input_embeddings.size()}')

RuntimeError: The size of tensor a (153) must match the size of tensor b (151) at non-singleton dimension 1

In [28]:
# Define parameters for ViT-base (example)
num_heads = 12
num_layers = 12

# Define Transformer encoders' stack
transformer_encoder_layer = nn.TransformerEncoderLayer(
           d_model=hidden_size, nhead=num_heads,
           dim_feedforward=int(hidden_size * 4),
           dropout=0.1)
transformer_encoder = nn.TransformerEncoder(
           encoder_layer=transformer_encoder_layer,
           num_layers=num_layers)

# Forward pass
output_embeddings = transformer_encoder(input_embeddings)
print(f' Output embedding size: {output_embeddings.size()}')
out_layer = nn.Linear(hidden_size, 3)
print(out_layer(output_embeddings[0][0].unsqueeze(dim = 0)))

 Output embedding size: torch.Size([1, 151, 768])
tensor([[-0.6926, -0.3719,  0.6378]], grad_fn=<AddmmBackward0>)


In [20]:
!pip install transformers

from transformers import ViTModel

# Load pretrained model
model_checkpoint = '/work/csl/code/piece/model/vit-base-patch16-224-in21k'
model = ViTModel.from_pretrained(model_checkpoint, add_pooling_layer=False)

# Example input image
input_img = torch.rand(batch_size, num_channels, 224, 224)

# Forward pass input image
output_embedding = model(input_img)
print(output_embedding)
print(f"Ouput embedding size: {output_embedding['last_hidden_state'].size()}")

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


HFValidationError: Repo id must be in the form 'repo_name' or 'namespace/repo_name': '/work/csl/code/piece/model/vit-base-patch16-224-in21k'. Use `repo_type` argument if needed.

In [18]:
num_labels = 2

# Define linear classifier layer
classifier = nn.Linear(hidden_size, num_labels) 

# Forward pass on the output embedding of [CLS] token
output_classification = classifier(output_embedding['last_hidden_state'][:, 0, :])
print(f"Output embedding size: {output_classification.size()}")

NameError: name 'output_embedding' is not defined

# Fine Tuning ViT Impelementation

In [1]:
import numpy as np
import torch
import glob, os, sys
import cv2
import torch.nn as nn
import torch.nn.functional as F
from transformers import ViTModel, ViTConfig
from torchvision import transforms
from torch.optim import Adam
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


#Pretrained model checkpoint
model_checkpoint = '/work/csl/code/piece/model/vit-base-patch16-224-in21k'

In [2]:
def resizeImage(original_image_filename, width=224, height=224):
    img = cv2.imread(original_image_filename)
    resized_img = cv2.resize(img, (width, height))
    return resized_img

class CustomDataset(Dataset):
    def __init__(self, root_folder, transform=None):
        self.root_folder = root_folder
        self.transform = transform
        self.data = self.load_data()

    def load_data(self):
        data = []
        idx = 0
        max_l = 50000
        training_data_paths = glob.glob(os.path.join(self.root_folder, "*"))
        for path in tqdm(training_data_paths):
            if idx > max_l: break
            idx += 1
            label_path = os.path.join(path, 'target.txt')
            image_path = os.path.join(path, 'state.png')

            with open(label_path) as f:
                for line in f:
                    line = line.rstrip().split()
                    label = line[1]
            data.append((image_path, label))

        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        img = resizeImage(img_path)

        if self.transform:
            img = self.transform(img)

        return img, label


train_dataset_path = "dataset/training_dataset/std"
# train_dataset = CustomDataset(train_dataset_path)
# train_dataloader = DataLoader(train_dataset, num_workers=10, batch_size=8, shuffle=True)
# for train_image, train_label in tqdm(train_dataloader):
#     print(train_label)

In [3]:
num_labels = 2
class ViT(nn.Module):

  def __init__(self, config=ViTConfig(), num_labels=2, 
               model_checkpoint=model_checkpoint):

        super(ViT, self).__init__()

        self.vit = ViTModel.from_pretrained(model_checkpoint, add_pooling_layer=False)
        self.classifier = (
            nn.Linear(config.hidden_size, num_labels) 
        )

  def forward(self, x):

    x = self.vit(x)['last_hidden_state']
    # Use the embedding of [CLS] token
    output = self.classifier(x[:, 0, :])

    return output

In [4]:
# Hyperparameters
EPOCHS = 10
LEARNING_RATE = 1e-4
BATCH_SIZE = 128
# Load batch image
train_dataset = CustomDataset(train_dataset_path)
train_dataloader = DataLoader(train_dataset, num_workers=32, batch_size=BATCH_SIZE, shuffle=True)
print("load train_dataset ")
# Train the model

 34%|██████████████████████▏                                          | 50001/146340 [00:01<00:02, 42518.78it/s]

load train_dataset 





In [5]:
from torch.utils.tensorboard import SummaryWriter  
import shutil
tensorboard_dir = "./log"
if os.path.exists(tensorboard_dir):
    # 删除目录及其内容
    try:
        shutil.rmtree(tensorboard_dir)
        print(f"目录 '{tensorboard_dir}' 已成功删除。")
    except OSError as e:
        print(f"删除目录时发生错误: {e}")
else:
    print(f"目录 '{tensorboard_dir}' 不存在。")
    
os.makedirs(tensorboard_dir, exist_ok=True)
save_path = "checkpoints/vit/"
def model_train(train_dataloader, epochs, learning_rate, bs):
    writer = SummaryWriter(tensorboard_dir)
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    print(f"using device {device}")
    # Load nodel, loss function, and optimizer
    model = ViT().to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = Adam(model.parameters(), lr=learning_rate)

    idx = 0
   
    # Fine tuning loop
    for i in range(epochs):
        total_acc_train = 0
        total_loss_train = 0.0
        
        for train_image, train_label in tqdm(train_dataloader):
            train_image_permuted = train_image.permute(0, 3, 1, 2)
            output = model(train_image_permuted.to(device))
            loss = criterion(output, train_label.to(device))
            acc = (output.argmax(dim=1) == train_label.to(device)).sum().item()
            total_acc_train += acc
            total_loss_train += loss.item()
            
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            writer.add_scalar('Loss', loss.item() / bs, idx)
            
            idx += 1

        print(f'Epochs: {i + 1} | Loss: {total_loss_train / len(train_dataset): .6f} | Accuracy: {total_acc_train / len(train_dataset): .5f}')
        writer.add_scalar('Accuracy', total_acc_train / len(train_dataset), i)
        if (i + 1) % 5 == 0:
            torch.save(model, f'{save_path}new_model_epochs{i+1}.pth')
    return model


trained_model = model_train(train_dataloader, EPOCHS, LEARNING_RATE, BATCH_SIZE)

目录 './log' 已成功删除。
using device cuda


Some weights of the model checkpoint at /work/csl/code/piece/model/vit-base-patch16-224-in21k were not used when initializing ViTModel: ['pooler.dense.bias', 'pooler.dense.weight']
- This IS expected if you are initializing ViTModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ViTModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:03<00:00,  2.13it/s]


Epochs: 1 | Loss:  0.002135 | Accuracy:  0.87446


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 2 | Loss:  0.001172 | Accuracy:  0.94130


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 3 | Loss:  0.000814 | Accuracy:  0.96186


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 4 | Loss:  0.000602 | Accuracy:  0.97308


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 5 | Loss:  0.000391 | Accuracy:  0.98252


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 6 | Loss:  0.000311 | Accuracy:  0.98596


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 7 | Loss:  0.000226 | Accuracy:  0.99040


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 8 | Loss:  0.000168 | Accuracy:  0.99258


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 9 | Loss:  0.000169 | Accuracy:  0.99264


100%|█████████████████████████████████████████████████████████████████████████| 391/391 [03:02<00:00,  2.14it/s]


Epochs: 10 | Loss:  0.000131 | Accuracy:  0.99420


# Model Prediction

In [13]:
import torch.nn.functional as F
def predict(img_path):

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], 
                             std=[0.5, 0.5, 0.5])
        ])
    
    img = resizeImage(img_path)
    torch_tensor = torch.from_numpy(img)
    output = trained_model(torch_tensor.unsqueeze(0).permute(0, 3, 1, 2).to(device))
    y = F.softmax(output)

    return y

In [15]:
input_img = torch.rand(1, 3, 224, 224)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
img_path = os.path.join(train_dataset_path, "59", "state.png")
predict(img_path)
    

tensor([[9.9959e-01, 4.1004e-04]], device='cuda:0', grad_fn=<SoftmaxBackward0>)

cookie
