In [24]:
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import os
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
import matplotlib.pyplot as plt
import math
import copy

In [25]:
print(torch.__version__)
print(torch.cuda.is_available())

2.4.1+cu124
True


In [26]:
# Train, Test 데이터셋을 불러오는 클래스
def split_dataset(label_file, test_size=0.2, shuffle=True):

    # CSV 파일에서 이미지와 레이블 데이터를 불러옴
    labels = pd.read_csv(label_file)

    # train, test 데이터를 나눔
    train_labels, test_labels = train_test_split(labels, test_size=test_size, shuffle=shuffle)

    return train_labels, test_labels

In [27]:
# # 자/모음 vocabulary와 이를 index로 반환하는 함수
# jamo_vocabulary = ['ㅎ', 'ㅔ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 
#              'ㅕ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ', 'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 
#              'ㅖ', 'ㄱ', 'ㅗ', 'ㅘ', 'ㅙ', 'ㅚ', 'ㅛ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ', 'ㅣ', 'ㅏ', 'ㅐ', 
#              'ㅃ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅝ', 'ㅞ', 'ㄸ', 'ㅜ', 'ㅉ', '<sos>', '<eos>', '<pad>']

# jamoindex = {word: i for i, word in enumerate(jamo_vocabulary)}

In [28]:
# 자/모음을 index로 변환하는 클래스
class jamo_to_index:
  def __init__(self):
    self.jamo_vocabulary = ['ㅎ', 'ㅔ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 
             'ㅕ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ', 'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 
             'ㅖ', 'ㄱ', 'ㅗ', 'ㅘ', 'ㅙ', 'ㅚ', 'ㅛ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ', 'ㅣ', 'ㅏ', 'ㅐ', 
             'ㅃ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅝ', 'ㅞ', 'ㄸ', 'ㅜ', 'ㅉ', '<sos>', '<eos>', '<pad>']
    self.jamo_to_index = {word: i for i, word in enumerate(self.jamo_vocabulary)}

  def process(self, label):
    label_indices = [self.jamo_to_index[char] for char in label]

    src_target = [self.jamo_to_index['<sos>']] + label_indices
    src_target_tensor = torch.tensor(src_target, dtype=torch.long)

    tgt_target = label_indices + [self.jamo_to_index['<eos>']]
    tgt_target_tensor = torch.tensor(tgt_target, dtype=torch.long)

    return src_target_tensor, tgt_target_tensor
      

In [29]:
# print(jamo_to_index().process('ㅎㅔㄹ'))

In [30]:
# pad_sequence를 추가하는 함수
def collate_fn(batch, pad_idx):
  images, src_labels, tgt_labels = zip(*batch)

  images = torch.stack(images)

  src_labels_padded = pad_sequence(src_labels, batch_first=True, padding_value=pad_idx)
  tgt_labels_padded = pad_sequence(tgt_labels, batch_first=True, padding_value=pad_idx)

  return images, src_labels_padded, tgt_labels_padded

In [31]:
class HangulOCRDataset(Dataset):
  def __init__(self, img_dir, labels, transform = None, max_seq_len = 16, vocabulary = None):
    self.img_dir = img_dir
    self.labels = labels
    self.transform = transform
    self.vocabulary = jamo_to_index()

  def __len__(self):
    return len(self.labels)
  
  def __getitem__(self, idx):
    # 데이터셋에서 인덱스에 해당하는 이미지와 레이블을 불러옴
    img_name = os.path.join(self.img_dir, self.labels.iloc[idx, 0])
    image = Image.open(img_name)
    label = self.labels.iloc[idx, 1]

    src_label, tgt_label = self.vocabulary.process(label)

    if self.transform:
      image = self.transform(image)
    
    return image, src_label, tgt_label
  


In [32]:
# def make_pad_mask(sequence, pad_idx = 53):
#   # embedding하기 전 token index가 pad_idx인 경우 mask를 생성하는 함수

#   mask = sequence.ne(pad_idx).unsqueeze(1).unsqueeze(2)
#   mask = mask.repeat(1, 1, sequence.size(1), 1)

#   mask.requires_grad = False
#   return mask # (batch_size, 1, tgt_len, tgt_len)



In [33]:
# sequence = torch.randint(52, 54, (2, 16, ))
# print(sequence)
# print(make_pad_mask(sequence))


In [34]:
# def make_subsequent_mask(seq_len, pad_idx = 53):
#   # query는 target sequence, key는 source sequence

#   tril = np.tril(np.ones((seq_len, seq_len)), k=0).astype('uint8') # 0인 부분이 mask가 되어야 함
#   mask = torch.tensor(tril, dtype=torch.bool, requires_grad=False, device='cuda')
#   return mask

In [35]:
# print(make_subsequent_mask(16))

In [36]:
# def make_tgt_mask(self, tgt):
#   pad_mask = self.

In [37]:
class JamoEmbedding(nn.Module):
  def __init__(self, vocab_size = 54, embedding_dim = 512, pad_idx = None):
    super(JamoEmbedding, self).__init__()
    self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
    self.embedding_dim = embedding_dim
  def forward(self, x):
    return self.embedding(x) * math.sqrt(self.embedding_dim)

In [9]:
class CNNEncoder(nn.Module):
  def __init__(self):
    super(CNNEncoder, self).__init__()
    # input_size: [batch_size, 1, 360, 360]
    # output_size: [batch_size, 121, 512]

    self.conv1 = nn.Sequential(
      nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
      nn.BatchNorm2d(64),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2, stride=2) # [batch_size, 64, 180, 180]
    )

    self.conv2 = nn.Sequential(
      nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
      nn.BatchNorm2d(128),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2, stride=2) # [batch_size, 128, 90, 90]
    )

    self.conv3 = nn.Sequential(
      nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
      nn.BatchNorm2d(256),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2, stride=2) # [batch_size, 256, 45, 45]
    )

    self.conv4 = nn.Sequential(
      nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
      nn.BatchNorm2d(512),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=4, stride=4) # [batch_size, 512, 11, 11]
    )

  def flatten_feature_map(self, x):
    batch_size, channels, height, width = x.size()
    flattened_feature_map = x.view(batch_size, channels, height * width) # [batch_size, 512, 121]
    transposed_feature_map = flattened_feature_map.permute(0, 2, 1) # [batch_size, 121, 512]
    # 여기 learnable한 linear layer 추가 할지 말지는 나중에 결정

    return transposed_feature_map

  def forward(self, x):
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)
    x = self.conv4(x)

    return self.flatten_feature_map(x) # [batch_size, 121, 512]


In [38]:
class PositionalFeedForward(nn.Module):
  def __init__(self, embedding_dim = 512, d_ff = 2048, dropout = 0.1):
    super(PositionalFeedForward, self).__init__()
    self.fc1 = nn.Linear(embedding_dim, d_ff) # (d_embed, d_ff)
    self.relu = nn.ReLU()
    self.fc2 = nn.Linear(d_ff, embedding_dim) # (d_ff, d_embed)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    out = x
    out = self.fc1(out)
    out = self.relu(out)
    out = self.dropout(out)
    out = self.fc2(out)
    return out

In [39]:
class Residual(nn.Module):
  def __init__(self):
    super(Residual, self).__init__()

  def forward(self, x, sub_layer):
    out = x
    out = sub_layer(out)
    out = out + x
    return out

In [40]:
class DecoderBlock(nn.Module):

  def __init__(self, self_attention, cross_attention, position_ff):
    super(DecoderBlock, self).__init__()
    self.self_attention = self_attention
    self.cross_attention = cross_attention
    self.position_ff = position_ff
    self.residuals = [Residual() for _ in range(3)]

  def forward(self, tgt, encoder_out, pad_mask, look_ahead_mask):
     out = tgt # [batch_size, seq_len, d_embed = 512]
     out = self.residuals[0](out, lambda out: self.self_attention(query = out, key = out, value = out, mask = (pad_mask & look_ahead_mask))) # tgt_mask는 make_subesequent_mask로 생성
     out = self.residuals[1](out, lambda out: self.cross_attention(query = out, key = encoder_out, value = encoder_out, pad_mask = pad_mask)) # make_subesequent_mask로 생성
     out = self.residuals[2](out, self.position_ff)

     return out

In [41]:
class Decoder(nn.Module):
  def __init__(self, embedding_dim, num_heads, num_layers):
    super(Decoder, self).__init__()

    self.embedding_dim = embedding_dim
    self.num_heads = num_heads
    self.num_layers = num_layers

    self.decoder_layer = DecoderBlock(
      
      self_attention = nn.MultiheadAttention(embed_dim = embedding_dim, num_heads = num_heads),
      cross_attention = nn.MultiheadAttention(embed_dim = embedding_dim, num_heads = num_heads),
      position_ff = PositionalFeedForward(embedding_dim = embedding_dim)
    )
    self.layers = nn.ModuleList([copy.deepcopy(self.decoder_layer) for _ in range(num_layers)]) # DecoderBlock 객체를 num_layers만큼 생성
    
  def forward(self, src, encoder_out, pad_mask, look_ahead_mask):
    out = src
    for layer in self.layers:
      out = layer(out, encoder_out, pad_mask, look_ahead_mask)

    return out # decoder block을 num_layers만큼 통과한 결과
  

In [42]:
class MultiHeadAttentionBlock(nn.Module):
  def __init__(self, embedding_dim, num_heads, dropout = 0.1):
    super(MultiHeadAttentionBlock, self).__init__()
    self.embedding_dim = embedding_dim
    self.num_heads = num_heads

    assert embedding_dim % num_heads == 0 # embedding_dim이 num_heads로 나누어 떨어져야 함, 우리는 일단 head 수를 1로 할 것임.

    self.head_dim = embedding_dim // num_heads

    self.q_fc = nn.Linear(embedding_dim, embedding_dim)
    self.k_fc = nn.Linear(embedding_dim, embedding_dim)
    self.v_fc = nn.Linear(embedding_dim, embedding_dim)

    self.fc_out = nn.Linear(embedding_dim, embedding_dim)

    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(self, query, key, value, mask, dropout : nn.Dropout):

    d_k = key.size(-1)
    
    scores = torch.matmul(query, key.transpose(-2, -1))
    scores = scores / math.sqrt(d_k)

    if mask is not None:
      scores = scores.masked_fill(mask == 0, -1e9) # 0인 부분이 mask, -inf로 채움

    scores = scores.softmax(dim = -1)

    if dropout is not None:
      scores = dropout(scores)

    return torch.matmul(scores, value)
  
  def forward(self, query, key, value, mask):

    # cross attention 할 때는 query는 decoder의 output, key와 value는 encoder의 output
    # self attention 할 때는 query, key, value가 모두 같음
    
    batch_size = query.shape[0]

    # query, key, value: [batch_size, seq_len, embedding_dim]
    
    def transform(x, fc):
      out = fc(x)
      out = out.view(batch_size, -1, self.num_heads, self.head_dim)
      out = out.transpose(1, 2)
      return out

    query = transform(query, self.q_fc)
    key = transform(key, self.k_fc)
    value = transform(value, self.v_fc)


    # Q: [batch_size, num_heads, seq_len, head_dim]
    # K: [batch_size, num_heads, seq_len, head_dim]
    # V: [batch_size, num_heads, seq_len, head_dim]

    output = self.attention(query, key, value, mask, self.dropout)

    # output: [batch_size, num_heads, seq_len, head_dim]

    output = output.permute(0, 2, 1, 3).contiguous()

    # output: [batch_size, seq_len, num_heads, head_dim]

    output = output.view(batch_size, -1, self.embedding_dim)

    # energy: [batch_size, seq_len, embedding_dim]

    return self.fc_out(output)

In [23]:
# def make_pad_mask(sequence, pad_idx = 53):
#         # sequence: [batch_size, seq_len]
#         # embedding하기 전 token index가 pad_idx인 경우 mask를 생성하는 함수
#         seq_len = sequence.size(1)

#         mask = (sequence != pad_idx).unsqueeze(1).unsqueeze(2)
#         mask.requires_grad = False

#         return mask # (batch_size, 1, 1, seq_len)

# sequence = torch.randint(52, 54, (2, 16,))
# print(sequence)
# print(make_pad_mask(sequence))

tensor([[53, 52, 52, 52, 52, 52, 52, 52, 52, 52, 52, 52, 52, 53, 52, 52],
        [53, 53, 53, 53, 52, 53, 52, 52, 53, 52, 53, 52, 53, 53, 53, 53]])
tensor([[[[False,  True,  True,  True,  True,  True,  True,  True,  True,  True,
            True,  True,  True, False,  True,  True]]],


        [[[False, False, False, False,  True, False,  True,  True, False,  True,
           False,  True, False, False, False, False]]]])


In [47]:
class HangulViT(nn.Module):
    def __init__(self, encoder, decoder, emb_size, nhead, vocab_size, num_decoder, dim_feedforward):
        super(HangulViT, self).__init__()
        self.encoder = encoder # Vision Transformer Encoder (나중에 구현 예정)
        self.decoder = decoder # Transformer Decoder
        self.num_decoder = num_decoder
        self.emb_size = emb_size
        self.nhead = 1
        self.embedding = JamoEmbedding(vocab_size = vocab_size, embedding_dim = emb_size, pad_idx = 53) # 각 토큰 embedding
        self.generator = nn.Linear(emb_size, vocab_size) # 자/모음 generation

    def decode(self, src, encoder_out, pad_mask, look_ahead_mask):
        return self.decoder(self.embedding(src), encoder_out, pad_mask, look_ahead_mask)

    def make_pad_mask(sequence, pad_idx = 53):
        # sequence: [batch_size, seq_len]
        # embedding하기 전 token index가 pad_idx인 경우 mask를 생성하는 함수
        seq_len = sequence.size(1)

        mask = (sequence != pad_idx).unsqueeze(1).unsqueeze(2)
        mask.requires_grad = False

        return mask # (batch_size, 1, 1, seq_len)
    


    def make_subsequent_mask(seq_len, pad_idx = 53):
        # look_ahead masking

        tril = np.tril(np.ones((seq_len, seq_len)), k=0).astype('uint8') # 0인 부분이 mask가 되어야 함
        mask = torch.tensor(tril, dtype=torch.bool, requires_grad=False, device='cuda')
        return mask
    
    def make_tgt_mask(self, tgt):
        pad_mask = self.make_pad_mask(tgt, tgt)
        seq_mask = self.make_subsequent_mask(tgt, tgt)
        mask = pad_mask & seq_mask
        return mask
    
    def forward(self, image, src):

        max_seq = src.size(1)
        look_ahead_mask = self.make_subsequent_mask(max_seq) # Look-ahead mask
        pad_mask = self.make_pad_mask(src) # attention 할때 사용할 mask

        src = self.embedding(src)
        # tgt = self.embedding(tgt)

        feature_map = self.encoder(image) # [batch_size, 121, 512]

        # q, k, v = feature_map, feature_map, feature_map

        decoder_out = self.decode(src, encoder_out = feature_map, pad_mask = pad_mask, look_ahead_mask = look_ahead_mask)
        out = self.generator(decoder_out) # [batch_size, 16, 54] : 마지막 dimension이 각 자/모음에 대한 확률값
        out = F.log_softmax(out, dim = -1)
        
        return out, decoder_out

In [48]:
model = HangulViT(encoder = CNNEncoder(), decoder = Decoder(embedding_dim=512, num_heads=1, num_layers=6), 
                  emb_size = 512, nhead = 1, vocab_size = 54, 
                  num_decoder = 6, dim_feedforward = 2048).to('cuda')

In [49]:
random_image = torch.randn(2, 1, 360, 360).to('cuda')
random_src = torch.randint(0, 54, (2, 16)).to('cuda')

try:
  output = model(random_image, random_src)
  print("성공")
  print("출력 크기: ", output.size())
except Exception as e:
  print("실패")
  print(e)

실패
'HangulViT' object cannot be interpreted as an integer


In [45]:
def display_image_batch(image_batch, rows=4, cols=8):
  fig, axes = plt.subplots(rows, cols, figsize=(cols * 2, rows * 2))
  for i, ax in enumerate(axes.flat):
    if i < len(image_batch):
      ax.imshow(np.asarray(image_batch[i].squeeze(0)))
      # ax.set_title(image_labels[i])
      ax.axis("off")
    else:
      ax.axis('off')

  plt.tight_layout()
  plt.show()

In [46]:
transform = transforms.Compose([
    # transforms.Resize((360, 360)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 텐서로 변환
    transforms.Normalize((0.5,), (0.5,))  # 정규화
])

# CSV 파일 경로 및 이미지 폴더 설정
img_dir = 'D:/dataset/13.한국어글자체/01.손글씨/image'
label_file = 'D:/dataset/13.한국어글자체/01.손글씨/new_labels.csv'

# 데이터셋 분할
train_labels, test_labels = split_dataset(label_file, test_size=0.2, shuffle=True)

# train과 test 데이터셋 초기화
train_dataset = HangulOCRDataset(img_dir=img_dir, labels=train_labels, transform=transform)
test_dataset = HangulOCRDataset(img_dir=img_dir, labels=test_labels, transform=transform)

# DataLoader로 셔플 및 배치 설정
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=lambda batch: collate_fn(batch, pad_idx = 53))
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=lambda batch: collate_fn(batch, pad_idx = 53))

# 예시: 첫 번째 배치 확인
images, src_labels, tgt_labels = next(iter(train_loader))
# display_image_batch(images)

# images, _ = next(iter(test_loader))
# display_image_batch(images)

print(images.shape)
# print(labels[0]) # label은 string 타입
# print(src_labels)
print(tgt_labels.shape)
print(src_labels.shape)
print(src_labels)
print(tgt_labels)


torch.Size([32, 1, 360, 360])
torch.Size([32, 10])
torch.Size([32, 10])
tensor([[51, 12, 33, 29, 37, 17, 53, 53, 53, 53],
        [51,  6, 36,  2, 53, 53, 53, 53, 53, 53],
        [51, 12,  1,  2, 53, 53, 53, 53, 53, 53],
        [51,  4, 40, 29, 20, 40, 53, 53, 53, 53],
        [51, 17, 47, 18, 53, 53, 53, 53, 53, 53],
        [51,  9, 30,  3, 39, 21, 34, 53, 53, 53],
        [51, 10, 34, 16, 53, 53, 53, 53, 53, 53],
        [51,  8, 38, 10, 53, 53, 53, 53, 53, 53],
        [51,  3, 38, 12, 53, 53, 53, 53, 53, 53],
        [51,  6, 40,  3, 30, 17, 39,  3, 53, 53],
        [51,  8, 40,  3, 49, 12, 37, 17, 53, 53],
        [51,  3, 40, 10, 39,  3, 17, 41, 53, 53],
        [51, 48, 44, 15, 53, 53, 53, 53, 53, 53],
        [51, 11, 31, 10, 53, 53, 53, 53, 53, 53],
        [51,  9,  1, 21, 53, 53, 53, 53, 53, 53],
        [51, 12, 43, 20, 53, 53, 53, 53, 53, 53],
        [51, 15, 40,  5, 53, 53, 53, 53, 53, 53],
        [51, 11, 33, 26, 53, 53, 53, 53, 53, 53],
        [51, 29, 39, 23, 53,

In [43]:
def train(model, data_loader, optimizer, criterion, epoch, checkpoint_dir):
  model.train()
  epoch_loss = 0

  for i, (images, src_labels, tgt_labels) in enumerate(data_loader):
    images, src_labels, tgt_labels = images.to(device), src_labels.to(device), tgt_labels.to(device)

    optimizer.zero_grad()

    output = model(images, src_labels, tgt_labels)

    output_dim = output.shape[-1]

    output = output.contiguous().view(-1, output_dim) # model의 예측 output
    tgt_labels = tgt_labels.contiguous().view(-1) # 실제 정답

    loss = criterion(output, tgt_labels)

    loss.backward()
    nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
    optimizer.step()

    epoch_loss += loss.item()

  num_samples = idx + 1

  if checkpoint_dir:
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_file = os.path.join(checkpoint_dir, f'epoch_{epoch}.pt')
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': epoch_loss / num_samples
    }, checkpoint_file)

  return epoch_loss / num_samples

In [44]:
model = Transformer

NameError: name 'mod' is not defined