### **PyTorch로 GPT 구현**

#### data_utils.py
	•	텍스트 코퍼스를 읽어 학습용 입력 예시로 변환함
	•	토크나이저를 이용해 문장을 토큰화하고 BOS, EOS, PAD 토큰 추가
	•	토큰을 ID로 변환해 input_ids 생성
	•	사전학습 모드에서는 텍스트만, 미세조정 모드에서는 텍스트와 라벨 쌍 처리
	•	미세조정 모드의 학습 데이터 라벨은 딕셔너리로 매핑하여 ID 부여
	•	변환된 결과는 캐시 파일로 저장하거나 기존 캐시에서 불러옴
	•	최종적으로 PyTorch TensorDataset 형태로 변환해 반환함

In [5]:
from typing import Iterable, Union, List
from pathlib import Path
import json

import torch
import torch.distributed as dist
from torch.utils.data import TensorDataset

class PretrainInputExample:
    """Single pre-training example holding one piece of raw text."""

    def __init__(self, text: str):
        # Untokenized text; tokenization happens later in feature conversion.
        self.text: str = text

class ClsInputExample:
    """Single classification example: raw text plus its string label."""

    def __init__(self, text: str, label: str):
        # Both values are stored untouched; label-to-id mapping is done elsewhere.
        self.text: str = text
        self.label: str = label

class PretrainInputFeatures:
    """Model-ready pre-training features: a list of token ids."""

    def __init__(self, input_ids: List[int]):
        # Token ids for one example.
        self.input_ids: List[int] = input_ids

class ClsInputFeatures:
    """Model-ready classification features: token ids plus an integer label id."""

    def __init__(self, input_ids: List[int], label_id: int):
        # Token ids for one example and the id of its class label.
        self.input_ids: List[int] = input_ids
        self.label_id: int = label_id

def convert_examples_to_features(examples,
                                 tokenizer,
                                 args,
                                 mode):
    """Tokenize examples and convert them into fixed-length feature objects.

    Every example is tokenized, wrapped as ``BOS + tokens + EOS`` (the text is
    truncated so the wrapped sequence fits ``args.max_seq_len``), right-padded
    with the PAD token and mapped to ids.

    Args:
        examples: Objects with a ``text`` attribute (and ``label`` when
            fine-tuning).
        tokenizer: Object providing ``bos_token``/``eos_token``/``pad_token``,
            ``tokenize`` and ``convert_tokens_to_ids``.
        args: Namespace read for ``pretrain``, ``finetune``, ``max_seq_len``
            and ``cached_label_dict``.
        mode: ``'train'`` builds the label dictionary and writes it to
            ``args.cached_label_dict``; ``'test'`` reads it back.

    Returns:
        A list of ``PretrainInputFeatures`` or ``ClsInputFeatures``.

    Raises:
        ValueError: If a label dictionary is needed but ``mode`` is neither
            ``'train'`` nor ``'test'``, if an example carries a label missing
            from the dictionary, or if neither ``args.pretrain`` nor
            ``args.finetune`` is set.
    """
    bos_token = tokenizer.bos_token
    eos_token = tokenizer.eos_token
    pad_token = tokenizer.pad_token

    label_dict = None
    if args.finetune:
        if mode == 'train':
            # Sorted so the label -> id assignment is reproducible across runs.
            labels = sorted({example.label for example in examples})
            label_dict = {label: i for i, label in enumerate(labels)}
            with open(args.cached_label_dict, 'w') as file:
                json.dump(label_dict, file,  indent=4)
        elif mode == 'test':
            with open(args.cached_label_dict, 'r') as file:
                label_dict = json.load(file)

    features = []
    for example in examples:
        tokens = tokenizer.tokenize(example.text)
        # Reserve two positions for BOS and EOS, then pad up to max_seq_len.
        tokens = [bos_token] + tokens[:args.max_seq_len - 2] + [eos_token]
        tokens += [pad_token] * (args.max_seq_len - len(tokens))
        input_ids = tokenizer.convert_tokens_to_ids(tokens)

        if args.pretrain:
            feature = PretrainInputFeatures(input_ids)
        elif args.finetune:
            if label_dict is None:
                raise ValueError(
                    "mode must be 'train' or 'test' to resolve labels, got {!r}".format(mode))
            if example.label not in label_dict:
                # Fail here instead of storing None as a label id, which would
                # only surface later when the ids are turned into a tensor.
                raise ValueError(
                    'label {!r} is not present in the label dictionary'.format(example.label))
            feature = ClsInputFeatures(input_ids, label_dict[example.label])
        else:
            raise ValueError('either args.pretrain or args.finetune must be set')

        features.append(feature)

    return features

def create_examples(args, tokenizer, mode='train'):
    """Build (or load from cache) the dataset for pre-training or fine-tuning.

    Features are cached in the working directory under
    ``cached_features_{pretrain|finetune}_{mode}_{max_seq_len}``. When the
    cache is missing, the corpus is read, converted with
    ``convert_examples_to_features`` and the result is saved.

    Args:
        args: Namespace read for ``local_rank``, ``pretrain``, ``finetune``,
            ``max_seq_len``, ``train_corpus`` and ``test_corpus``.
        tokenizer: Tokenizer handed to ``convert_examples_to_features``.
        mode: ``'train'`` or ``'test'``; selects the corpus and cache file.

    Returns:
        ``TensorDataset(input_ids)`` when pre-training, or
        ``TensorDataset(input_ids, label_ids)`` when fine-tuning.
    """
    assert mode in ('train', 'test')

    # Ranks other than the first wait here, so the cache is built by one
    # process and merely loaded by the others once it is released below.
    if args.local_rank not in [-1, 0]:
        dist.barrier()
    cached_features_file = Path('cached_features_{}_{}_{}'.format('pretrain' if args.pretrain else 'finetune', mode, args.max_seq_len))

    if cached_features_file.exists():
        print('Loading features from cached file', cached_features_file)
        # The cache stores pickled feature objects rather than bare tensors,
        # which the weights-only unpickler (the default in recent PyTorch)
        # rejects. NOTE(review): this unpickles arbitrary objects - only load
        # cache files written by this code.
        features = torch.load(cached_features_file, weights_only=False)
    else:
        corpus_path = args.train_corpus if mode=='train' else args.test_corpus
        with open(corpus_path, 'r', encoding='utf-8') as reader:
            corpus = reader.readlines()

        if args.pretrain:
            texts = [line.strip() for line in corpus]
            examples = [PretrainInputExample(text) for text in texts if text]
        elif args.finetune:
            # Each line is "<label>\t<text>"; empty fields are dropped.
            rows = [[field.strip() for field in line.split('\t')] for line in corpus]
            rows = [[field for field in row if field] for row in rows]
            # Skip blank lines (as the pre-training branch does) instead of
            # failing to unpack them; malformed rows still raise ValueError.
            rows = [row for row in rows if row]
            examples = [ClsInputExample(text, label) for label, text in rows]

        features = convert_examples_to_features(examples, tokenizer, args, mode)

        print('Saving features into cached file', cached_features_file)
        torch.save(features, cached_features_file)

    # The first rank has produced the cache; release the waiting ranks.
    if args.local_rank == 0:
        dist.barrier()

    if args.pretrain:
        all_input_ids = torch.tensor([feature.input_ids for feature in features], dtype=torch.long)
        dataset = TensorDataset(all_input_ids)
    elif args.finetune:
        all_input_ids = torch.tensor([feature.input_ids for feature in features], dtype=torch.long)
        all_label_ids = torch.tensor([feature.label_id for feature in features], dtype=torch.long)
        dataset = TensorDataset(all_input_ids, all_label_ids)

    return dataset

#### model.py
	•	ScaledDotProductAttention: 쿼리와 키의 내적을 스케일링하고 마스킹 및 softmax를 적용해 가중치를 계산
	•	MultiHeadAttention: 입력을 여러 헤드로 나누어 병렬로 attention을 계산하고 출력 결합
	•	PositionWiseFeedForwardNetwork: 각 위치별로 독립적으로 feed-forward 연산을 수행
	•	DecoderLayer: multi-head attention과 feed-forward 네트워크로 구성된 디코더 블록
	•	TransformerDecoder: 임베딩과 positional embedding 후 여러 디코더 레이어를 순차적으로 적용
	•	GPT: transformer decoder로 구성된 GPT 모델 정의
	•	GPTLMHead: GPT 출력에 linear projection을 적용해 언어 모델링 logits 생성
	•	GPTClsHead: GPT 출력에서 특정 토큰 위치 벡터를 추출해 분류 logits 생성하고 language modeling과 함께 반환

In [1]:
import torch
import torch.nn as nn

class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention with masking and dropout on the weights."""

    def __init__(self, d_k, attn_pdrop):
        super().__init__()
        self.d_k = d_k

        self.dropout = nn.Dropout(attn_pdrop)

    def forward(self, q, k, v, attn_mask):
        """Attend from ``q`` over ``k``/``v``.

        Positions where ``attn_mask`` is true receive a score of -1e9 before
        the softmax over the last dimension.

        Returns:
            Tuple ``(output, attn_weights)``; the weights are the ones after
            dropout.
        """
        scale = self.d_k ** 0.5
        scores = q.matmul(k.transpose(-2, -1)).div(scale)
        scores = scores.masked_fill(attn_mask, -1e9)

        weights = self.dropout(scores.softmax(dim=-1))

        return weights.matmul(v), weights

class MultiHeadAttention(nn.Module):
    """Multi-head attention: project, split into heads, attend, merge, project."""

    def __init__(self, d_model, n_heads, attn_pdrop):
        super().__init__()
        head_dim = d_model // n_heads
        self.n_heads = n_heads
        self.d_k = head_dim
        self.d_v = head_dim

        self.WQ = nn.Linear(d_model, d_model)
        self.WK = nn.Linear(d_model, d_model)
        self.WV = nn.Linear(d_model, d_model)
        self.scaled_dot_product_attn = ScaledDotProductAttention(self.d_k, attn_pdrop)
        self.linear = nn.Linear(n_heads * self.d_v, d_model)

    def _split_heads(self, projected, batch_size, head_dim):
        # (batch, seq, n_heads * head_dim) -> (batch, n_heads, seq, head_dim)
        return projected.view(batch_size, -1, self.n_heads, head_dim).transpose(1, 2)

    def forward(self, Q, K, V, attn_mask):
        """Return ``(outputs, attn_weights)`` for the given query/key/value inputs.

        ``attn_mask`` gets a head dimension inserted at position 1 and is
        repeated for every head before being applied.
        """
        batch_size = Q.size(0)

        q_heads = self._split_heads(self.WQ(Q), batch_size, self.d_k)
        k_heads = self._split_heads(self.WK(K), batch_size, self.d_k)
        v_heads = self._split_heads(self.WV(V), batch_size, self.d_v)

        head_mask = attn_mask.unsqueeze(1).repeat(1, self.n_heads, 1, 1)
        context, attn_weights = self.scaled_dot_product_attn(q_heads, k_heads, v_heads, head_mask)

        # Undo the head split: (batch, n_heads, seq, d_v) -> (batch, seq, n_heads * d_v)
        merged = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_v)

        return self.linear(merged), attn_weights

class PositionWiseFeedForwardNetwork(nn.Module):
    """Two-layer feed-forward network with a GELU in between."""

    def __init__(self, d_model, d_ff):
        super().__init__()

        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.gelu = nn.GELU()

        # Both projection weights are re-initialised from N(0, 0.02^2).
        for projection in (self.linear1, self.linear2):
            nn.init.normal_(projection.weight, std=0.02)

    def forward(self, inputs):
        """Apply ``linear1 -> GELU -> linear2`` to ``inputs``."""
        hidden = self.linear1(inputs)
        return self.linear2(self.gelu(hidden))

class DecoderLayer(nn.Module):
    """Decoder block: self-attention then feed-forward, each followed by
    dropout, a residual connection and layer normalisation."""

    def __init__(self, d_model, n_heads, d_ff, attn_pdrop, resid_pdrop):
        super().__init__()

        # Self-attention sub-layer.
        self.mha = MultiHeadAttention(d_model, n_heads, attn_pdrop)
        self.dropout1 = nn.Dropout(resid_pdrop)
        self.layernorm1 = nn.LayerNorm(d_model, eps=1e-5)

        # Feed-forward sub-layer.
        self.ffn = PositionWiseFeedForwardNetwork(d_model, d_ff)
        self.dropout2 = nn.Dropout(resid_pdrop)
        self.layernorm2 = nn.LayerNorm(d_model, eps=1e-5)

    def forward(self, inputs, attn_mask):
        """Return ``(hidden_states, attn_weights)`` for ``inputs``."""
        attended, attn_weights = self.mha(inputs, inputs, inputs, attn_mask)
        hidden = self.layernorm1(inputs + self.dropout1(attended))

        transformed = self.dropout2(self.ffn(hidden))
        hidden = self.layernorm2(hidden + transformed)

        return hidden, attn_weights

class TransformerDecoder(nn.Module):
    """Stack of decoder layers on top of token and learned position embeddings.

    Position index 0 is used for padding tokens, real tokens are numbered from
    1, which is why the position table has ``seq_len + 1`` rows.
    """

    def __init__(self, vocab_size, seq_len, d_model, n_layers, n_heads, d_ff, embd_pdrop, attn_pdrop, resid_pdrop, pad_id):
        super(TransformerDecoder, self).__init__()
        self.pad_id = pad_id
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.dropout = nn.Dropout(embd_pdrop)
        self.pos_embedding = nn.Embedding(seq_len+1, d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, attn_pdrop, resid_pdrop) for _ in range(n_layers)])

        nn.init.normal_(self.embedding.weight, std=0.02)

    def forward(self, inputs):
        """Run the decoder over a batch of token ids.

        Args:
            inputs: Integer tensor of token ids with two dimensions
                (batch, sequence), as implied by ``inputs.size()`` unpacking
                in the mask helper.

        Returns:
            Tuple ``(outputs, attention_weights)`` where ``attention_weights``
            is a list with one entry per decoder layer.
        """
        # 1-based positions; padded slots are reset to position 0.
        positions = torch.arange(inputs.size(1), device=inputs.device, dtype=inputs.dtype).repeat(inputs.size(0), 1) + 1
        position_pad_mask = inputs.eq(self.pad_id)
        positions.masked_fill_(position_pad_mask, 0)

        outputs = self.dropout(self.embedding(inputs)) + self.pos_embedding(positions)

        # A key position is masked if it is padding OR lies in the future.
        attn_pad_mask = self.get_attention_padding_mask(inputs, inputs, self.pad_id)
        subsequent_mask = self.get_attention_subsequent_mask(inputs)
        attn_mask = torch.gt((attn_pad_mask.to(dtype=subsequent_mask.dtype) + subsequent_mask), 0)

        attention_weights = []
        for layer in self.layers:
            outputs, attn_weights = layer(outputs, attn_mask)
            attention_weights.append(attn_weights)

        return outputs, attention_weights

    def get_attention_padding_mask(self, q, k, pad_id):
        """Return a boolean (batch, q_len, k_len) mask, true where the key is ``pad_id``."""
        attn_pad_mask = k.eq(pad_id).unsqueeze(1).repeat(1, q.size(1), 1)

        return attn_pad_mask

    def get_attention_subsequent_mask(self, q):
        """Return a (batch, q_len, q_len) mask with ones strictly above the diagonal.

        The mask is allocated directly on ``q``'s device so that no
        host-to-device copy is needed on every forward pass.
        """
        bs, q_len = q.size()
        subsequent_mask = torch.ones(bs, q_len, q_len, device=q.device).triu(diagonal=1)

        return subsequent_mask

class GPT(nn.Module):
    """GPT backbone: a thin wrapper around ``TransformerDecoder``."""

    def __init__(self,
                 vocab_size,
                 seq_len=512,
                 d_model=768,
                 n_layers=12,
                 n_heads=12,
                 d_ff=3072,
                 embd_pdrop=0.1,
                 attn_pdrop=0.1,
                 resid_pdrop=0.1,
                 pad_id=0):
        super().__init__()

        # All hyper-parameters are handed to the decoder unchanged.
        self.decoder = TransformerDecoder(
            vocab_size, seq_len, d_model, n_layers, n_heads, d_ff,
            embd_pdrop, attn_pdrop, resid_pdrop, pad_id,
        )

    def forward(self, inputs):
        """Return ``(outputs, attention_weights)`` as produced by the decoder."""
        hidden_states, layer_attentions = self.decoder(inputs)

        return hidden_states, layer_attentions

class GPTLMHead(nn.Module):
    """Language-modelling head whose projection shares the token-embedding weight."""

    def __init__(self, gpt):
        super().__init__()
        embedding_weight = gpt.decoder.embedding.weight
        vocab_size, d_model = embedding_weight.size()

        self.gpt = gpt
        self.linear = nn.Linear(d_model, vocab_size, bias=False)
        # Tie the output projection to the input embedding matrix.
        self.linear.weight = embedding_weight

    def forward(self, inputs):
        """Return vocabulary logits for every position of ``inputs``."""
        hidden_states, _ = self.gpt(inputs)

        return self.linear(hidden_states)

class GPTClsHead(nn.Module):
    """GPT with a weight-tied language-modelling head and a classification head.

    The classification logits are computed from the hidden states at the
    positions whose input id equals ``cls_token_id``.
    """

    def __init__(self, gpt, n_class, cls_token_id, cls_pdrop=0.1):
        super(GPTClsHead, self).__init__()
        vocab_size, d_model = gpt.decoder.embedding.weight.size()
        self.cls_token_id = cls_token_id

        self.gpt = gpt

        # LM head, tied to the token-embedding matrix.
        self.linear1 = nn.Linear(d_model, vocab_size, bias=False)
        self.linear1.weight = gpt.decoder.embedding.weight

        # Classification head.
        self.linear2 = nn.Linear(d_model, n_class)
        self.dropout = nn.Dropout(cls_pdrop)

        nn.init.normal_(self.linear2.weight, std=0.02)
        # The previous `normal_(bias, 0)` passed 0 as the *mean* and left the
        # default std of 1, i.e. it drew the bias from N(0, 1). Start the bias
        # at zero instead.
        nn.init.zeros_(self.linear2.bias)

    def forward(self, inputs):
        """Return ``(lm_logits, cls_logits)`` for a batch of token ids.

        ``cls_logits`` has one row per occurrence of ``cls_token_id`` in
        ``inputs``. NOTE(review): this lines up with per-example labels only
        if every sequence contains that token exactly once - confirm against
        the data pipeline.
        """
        outputs, attention_weights = self.gpt(inputs)

        lm_logits = self.linear1(outputs)

        # Boolean indexing flattens the selected positions to (n_selected, d_model).
        outputs = outputs[inputs.eq(self.cls_token_id)]
        cls_logits = self.linear2(self.dropout(outputs))

        return lm_logits, cls_logits

#### tokenization.py
	•	Tokenizer: 문자열을 토큰으로 분할하고 토큰-ID 변환 기능을 제공
	•	생성 시 vocab 파일을 읽어 토큰과 ID 간 매핑 딕셔너리 생성
	•	tokenize: 입력 문자열을 토큰 목록으로 분할
	•	convert_token_to_id: 토큰을 ID로 변환
	•	convert_id_to_token: ID를 토큰으로 변환
	•	convert_tokens_to_ids: 토큰 리스트를 ID 리스트로 변환
	•	convert_ids_to_tokens: ID 리스트를 토큰 리스트로 변환
	•	여러 특수 토큰의 ID 속성 제공
	•	PretrainedTokenizer: 사전학습된 SentencePiece 모델을 기반으로 Tokenizer 초기화
	•	detokenize: 토큰 리스트를 문자열로 복원

In [6]:
! pip install prenlp

Collecting prenlp
  Downloading prenlp-0.0.13-py3-none-any.whl.metadata (6.7 kB)
Collecting nltk==3.2.5 (from prenlp)
  Downloading nltk-3.2.5.tar.gz (1.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.2/1.2 MB 15.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting konlpy (from prenlp)
  Downloading konlpy-0.6.0-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting ijson (from prenlp)
  Downloading ijson-3.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (21 kB)
Collecting py7zr==0.5b5 (from prenlp)
  Downloading py7zr-0.5b5-py3-none-any.whl.metadata (15 kB)
Collecting texttable (from py7zr==0.5b5->prenlp)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting JPype1>=0.7.0 (from konlpy->prenlp)
  Downloading jpype1-1.5.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading prenlp-0.0.13-py3-none-any.whl (30 kB)
Downloading py

In [7]:
from typing import List
from collections import OrderedDict

from prenlp.tokenizer import SentencePiece

class Tokenizer:
    """Wrap a tokenizing callable together with a token <-> id vocabulary.

    The vocabulary file is read line by line; the first whitespace-separated
    field of each line is the token and the zero-based line number is its id.

    Args:
        tokenizer: Callable mapping a string to a list of tokens.
        vocab_file: Path to the UTF-8 vocabulary file.
        pad_token, unk_token, bos_token, eos_token, sep_token, cls_token,
        mask_token: Surface forms of the special tokens.
    """

    def __init__(self, tokenizer, vocab_file: str,
                 pad_token: str = '[PAD]',
                 unk_token: str = '[UNK]',
                 bos_token: str = '[BOS]',
                 eos_token: str = '[EOS]',
                 sep_token: str = '[SEP]',
                 cls_token: str = '[CLS]',
                 mask_token: str = '[MASK]'):
        self.tokenizer = tokenizer
        self.pad_token = pad_token
        self.unk_token = unk_token
        self.bos_token = bos_token
        self.eos_token = eos_token
        self.sep_token = sep_token
        self.cls_token = cls_token
        self.mask_token = mask_token
        self.vocab = OrderedDict()
        self.ids_to_tokens = OrderedDict()

        with open(vocab_file, 'r', encoding='utf-8') as reader:
            for i, line in enumerate(reader):
                fields = line.split()
                if not fields:
                    # Blank line (e.g. a trailing newline): nothing to register.
                    # The line number is still consumed so other ids are unchanged.
                    continue
                self.vocab[fields[0]] = i
        for token, index in self.vocab.items():
            self.ids_to_tokens[index] = token

    def tokenize(self, text: str) -> List[str]:
        """Split ``text`` into tokens using the wrapped callable."""
        return self.tokenizer(text)

    def convert_token_to_id(self, token: str) -> int:
        """Return the id of ``token``, falling back to the unknown-token id."""
        return self.vocab.get(token, self.vocab.get(self.unk_token))

    def convert_id_to_token(self, id: int) -> str:
        """Return the token for ``id``, falling back to the unknown token."""
        return self.ids_to_tokens.get(id, self.unk_token)

    def convert_tokens_to_ids(self, tokens: List[str]) -> List[int]:
        """Map every token in ``tokens`` to its id."""
        return [self.convert_token_to_id(token) for token in tokens]

    def convert_ids_to_tokens(self, ids: List[int]) -> List[str]:
        """Map every id in ``ids`` to its token."""
        return [self.convert_id_to_token(index) for index in ids]

    @property
    def vocab_size(self) -> int:
        """Number of distinct tokens read from the vocabulary file."""
        return len(self.vocab)

    # The *_token_id properties resolve through convert_token_to_id, so a
    # special token missing from the vocabulary yields the unknown-token id.

    @property
    def pad_token_id(self) -> int:
        return self.convert_token_to_id(self.pad_token)

    @property
    def unk_token_id(self) -> int:
        return self.convert_token_to_id(self.unk_token)

    @property
    def bos_token_id(self) -> int:
        return self.convert_token_to_id(self.bos_token)

    @property
    def eos_token_id(self) -> int:
        return self.convert_token_to_id(self.eos_token)

    @property
    def sep_token_id(self) -> int:
        return self.convert_token_to_id(self.sep_token)

    @property
    def cls_token_id(self) -> int:
        return self.convert_token_to_id(self.cls_token)

    @property
    def mask_token_id(self) -> int:
        return self.convert_token_to_id(self.mask_token)

class PretrainedTokenizer(Tokenizer):
    """``Tokenizer`` backed by a SentencePiece model loaded from disk.

    Args:
        pretrained_model: Path handed to ``SentencePiece.load``.
        vocab_file: Vocabulary file read by the base ``Tokenizer``.
        pad_token, unk_token, bos_token, eos_token, sep_token, cls_token,
        mask_token: Surface forms of the special tokens.
    """

    def __init__(self, pretrained_model: str, vocab_file: str,
                 pad_token: str = '[PAD]',
                 unk_token: str = '[UNK]',
                 bos_token: str = '[BOS]',
                 eos_token: str = '[EOS]',
                 sep_token: str = '[SEP]',
                 cls_token: str = '[CLS]',
                 mask_token: str = '[MASK]'):
        tokenizer = SentencePiece.load(pretrained_model)

        # Forward every special token. Previously sep/cls/mask were accepted
        # here but dropped, so custom values silently reverted to the defaults.
        super(PretrainedTokenizer, self).__init__(tokenizer, vocab_file,
                                                  pad_token=pad_token,
                                                  unk_token=unk_token,
                                                  bos_token=bos_token,
                                                  eos_token=eos_token,
                                                  sep_token=sep_token,
                                                  cls_token=cls_token,
                                                  mask_token=mask_token)

    def detokenize(self, tokens: List[str]) -> str:
        """Join ``tokens`` back into a string using the wrapped tokenizer's ``detokenize``."""
        return self.tokenizer.detokenize(tokens)

#### vocab.py
	•	--corpus: 한 줄에 하나의 문장이 있는 입력 텍스트 파일 경로 지정
	•	--prefix: 생성될 vocab 및 모델 파일 이름의 접두어 지정 ({prefix}.model, {prefix}.vocab)
	•	--vocab_size: 생성할 서브워드 vocab의 크기 설정
	•	--character_coverage: 문자 커버리지 비율 설정
	•	--model_type: 사용할 SentencePiece 모델 타입 선택
	•	--max_sentence_length: 학습에 사용할 최대 문장 길이 제한
	•	--pad_token, --unk_token, --bos_token, --eos_token: 특수 토큰 정의
	•	build(args): 위 인자들을 기반으로 SentencePiece.train()을 호출하여 모델 학습 수행
	•	실행 시 python vocab.py --corpus my.txt --prefix mytokenizer와 같이 사용 가능

In [10]:
import argparse
from collections import Counter, OrderedDict

from prenlp.tokenizer import SentencePiece

def build(args):
    """Train a SentencePiece model from the parsed command-line arguments.

    Every option is passed to ``SentencePiece.train`` by keyword; nothing is
    returned.
    """
    train_options = dict(
        input=args.corpus,
        model_prefix=args.prefix,
        vocab_size=args.vocab_size,
        model_type=args.model_type,
        character_coverage=args.character_coverage,
        max_sentence_length=args.max_sentence_length,
        pad_token=args.pad_token,
        unk_token=args.unk_token,
        bos_token=args.bos_token,
        eos_token=args.eos_token,
    )
    SentencePiece.train(**train_options)

if __name__ == '__main__':
    # Command-line entry point: parse the options and train the tokenizer.
    parser = argparse.ArgumentParser()

    parser.add_argument('--corpus',      required=True,           type=str, help='one-sentence-per-line corpus file')
    parser.add_argument('--prefix',      required=True,           type=str, help='output vocab(or sentencepiece model) name prefix')

    parser.add_argument('--vocab_size',          default=16000,   type=int, help='the maximum size of the vocabulary')
    # Adjacent literals instead of a backslash continuation, which embedded the
    # next line's indentation into the help text.
    parser.add_argument('--character_coverage',  default=1.0,     type=float,
                        help='amount of characters covered by the model, good defaults are: '
                             '0.9995 for languages with rich character set like Japanese or Chinese '
                             'and 1.0 for other languages with small character set')
    # Reject unsupported model types at parse time rather than inside training.
    parser.add_argument('--model_type',          default='bpe',   type=str,
                        choices=['unigram', 'bpe', 'char', 'word'],
                        help='sentencepiece model type. Choose from unigram, bpe, char, or word')
    parser.add_argument('--max_sentence_length', default=100000,  type=int, help='The maximum input sequence length')
    parser.add_argument('--pad_token',           default='[PAD]', type=str, help='token that indicates padding')
    parser.add_argument('--unk_token',           default='[UNK]', type=str, help='token that indicates unknown word')
    parser.add_argument('--bos_token',           default='[BOS]', type=str, help='token that indicates beginning of sentence')
    parser.add_argument('--eos_token',           default='[EOS]', type=str, help='token that indicates end of sentence')

    args = parser.parse_args()

    build(args)

usage: colab_kernel_launcher.py [-h] --corpus CORPUS --prefix PREFIX
                                [--vocab_size VOCAB_SIZE]
                                [--character_coverage CHARACTER_COVERAGE]
                                [--model_type MODEL_TYPE]
                                [--max_sentence_length MAX_SENTENCE_LENGTH]
                                [--pad_token PAD_TOKEN]
                                [--unk_token UNK_TOKEN]
                                [--bos_token BOS_TOKEN]
                                [--eos_token EOS_TOKEN]
colab_kernel_launcher.py: error: the following arguments are required: --corpus, --prefix


SystemExit: 2

#### trainer.py

In [3]:
! pip install pytorch-optimizer

Collecting pytorch-optimizer
  Downloading pytorch_optimizer-3.5.1-py3-none-any.whl.metadata (71 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/71.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.9/71.9 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.10->pytorch-optimizer)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.10->pytorch-optimizer)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.10->pytorch-optimizer)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.10->pytorch-optimizer)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3

In [3]:
!pip install torch_optimizer

Collecting torch_optimizer
  Downloading torch_optimizer-0.3.0-py3-none-any.whl.metadata (55 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/55.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.9/55.9 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
Collecting pytorch-ranger>=0.1.1 (from torch_optimizer)
  Downloading pytorch_ranger-0.1.1-py3-none-any.whl.metadata (509 bytes)
Downloading torch_optimizer-0.3.0-py3-none-any.whl (61 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 61.9/61.9 kB 3.5 MB/s eta 0:00:00
Downloading pytorch_ranger-0.1.1-py3-none-any.whl (14 kB)
Installing collected packages: pytorch-ranger, torch_optimizer
Successfully installed pytorch-ranger-0.1.1 torch_optimizer-0.3.0


	•	Trainer: GPT 모델의 학습, 평가, 저장을 담당하는 클래스 정의
	•	__init__: 학습 인자, 데이터 로더, 토크나이저를 받아 모델과 옵티마이저 초기화
	•	사전학습 모드에서는 GPTLMHead, 미세조정 모드에서는 GPTClsHead 모델 사용
	•	분산 학습(distributed training)을 지원하며, 해당 시 DistributedDataParallel로 모델 감쌈
	•	train: 한 epoch 동안 학습 수행 (모드에 따라 pretrain 또는 finetune 호출)
	•	pretrain: 입력 문장에 대해 다음 토큰 예측을 위한 GPT 언어 모델 학습
	•	finetune: 입력 문장과 라벨을 이용해 언어 모델링 + 분류 모델 학습
	•	evaluate: 학습된 모델을 검증 데이터셋에 대해 평가, loss 및 accuracy 기록
	•	save: 현재 모델 상태를 지정된 경로에 저장
	•	@timeit: 함수 실행 시간을 측정하고 출력하는 데코레이터 정의

In [4]:
import time
import json
from pathlib import Path

import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.tensorboard import SummaryWriter
from torch_optimizer import RAdam

def timeit(method):
    """Decorator that prints how long a method call took.

    The first positional argument of the wrapped call must expose an ``args``
    attribute with ``distributed`` and ``local_rank``: when running
    distributed, only local rank 0 prints.

    Returns:
        The wrapped method; its return value is passed through unchanged.
    """
    # Local import so the decorator does not depend on a module-level import.
    from functools import wraps

    @wraps(method)  # keep the wrapped method's __name__ / __doc__
    def timed(*args, **kw):
        _args = args[0].args

        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments.
        ts = time.perf_counter()
        result = method(*args, **kw)
        elapsed = time.perf_counter() - ts

        if not _args.distributed or _args.local_rank == 0:
            print('Function Time: {}\t>\t{:.0f} min {:.0f} sec'.format(method.__name__, elapsed//60, elapsed%60))

        return result
    return timed

class Trainer:
    """Drives pre-training or fine-tuning, evaluation and checkpointing of a GPT model.

    Exactly one of ``args.pretrain`` / ``args.finetune`` must be set. In
    pre-training mode the model is a ``GPTLMHead``; in fine-tuning mode it is a
    ``GPTClsHead`` whose number of classes comes from the JSON label dictionary
    at ``args.cached_label_dict`` and whose classification token is the
    tokenizer's EOS token.

    Args:
        args: Namespace with the training options (device, model and
            optimisation settings) read throughout this class.
        train_loader: Iterable of training batches exposing ``dataset``.
        test_loader: Iterable of evaluation batches, used by ``evaluate``.
        tokenizer: Object providing ``vocab_size``, ``pad_token_id`` and
            ``eos_token_id``.
    """

    def __init__(self, args, train_loader, test_loader, tokenizer):
        self.args = args
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.tokenizer = tokenizer
        self.vocab_size = tokenizer.vocab_size
        self.pad_id = tokenizer.pad_token_id
        self.eos_id = tokenizer.eos_token_id

        # Only pass an explicit device index when there is a real local rank:
        # a negative index (the non-distributed default of -1) is rejected by
        # torch.device in recent PyTorch releases.
        use_cuda = torch.cuda.is_available() and not args.no_cuda
        if use_cuda and args.local_rank >= 0:
            self.device = torch.device('cuda', args.local_rank)
        else:
            self.device = torch.device('cuda' if use_cuda else 'cpu')
        self.writer = SummaryWriter() if args.local_rank in [-1, 0] else None
        self.n_gpus = torch.distributed.get_world_size() if args.distributed else torch.cuda.device_count()
        assert args.pretrain != args.finetune

        if args.pretrained_model:
            # save() pickles the whole GPT module, which the weights-only
            # unpickler (the default in recent PyTorch) refuses to load.
            # NOTE(review): this executes pickle code - only load checkpoints
            # from a trusted source. The model is loaded on CPU first and moved
            # to the target device below.
            self.gpt = torch.load(args.pretrained_model, map_location='cpu', weights_only=False)
        else:
            self.gpt = GPT(vocab_size=self.vocab_size,
                           seq_len=args.max_seq_len,
                           d_model=args.hidden,
                           n_layers=args.n_layers,
                           n_heads=args.n_attn_heads,
                           d_ff=args.ffn_hidden,
                           embd_pdrop=args.embd_dropout,
                           attn_pdrop=args.attn_dropout,
                           resid_pdrop=args.resid_dropout,
                           pad_id=self.pad_id)

        if args.pretrain:
            self.model = GPTLMHead(self.gpt)
            self.model.to(self.device)
        if args.finetune:
            with open(args.cached_label_dict, 'r') as file:
                label_dict = json.load(file)
            self.model = GPTClsHead(self.gpt, n_class=len(label_dict), cls_token_id=self.eos_id)
            self.model.to(self.device)

        if args.distributed:
            self.model = DistributedDataParallel(self.model, device_ids=[args.local_rank], output_device=args.local_rank)

        self.optimizer = RAdam(self.model.parameters(), args.lr)
        # Padding positions do not contribute to the language-modelling loss.
        self.criterion = nn.CrossEntropyLoss(ignore_index = self.pad_id).to(self.device)
        self.cls_criterion = nn.CrossEntropyLoss().to(self.device)

    def _lm_loss(self, lm_logits, inputs):
        # Next-token loss: logits at position t are scored against token t+1.
        shifted_logits = lm_logits[:, :-1].contiguous()
        targets = inputs[:, 1:].contiguous()
        return self.criterion(shifted_logits.view(-1, self.vocab_size), targets.view(-1))

    @staticmethod
    def _log_interval(n_batches):
        # Print roughly five progress lines per epoch; never 0, which would
        # make the modulo below raise ZeroDivisionError for tiny loaders.
        return max(1, n_batches // 5)

    @timeit
    def train(self, epoch):
        """Run one training epoch in the mode selected by ``args``."""
        if self.args.pretrain:
            self.pretrain(epoch)
        if self.args.finetune:
            self.finetune(epoch)

    def pretrain(self, epoch):
        """Run one epoch of language-model pre-training and log the loss."""
        losses = 0
        n_batches = len(self.train_loader)
        log_interval = self._log_interval(n_batches)

        self.model.train()
        for i, batch in enumerate(self.train_loader):
            inputs = batch[0].to(self.device)
            lm_logits = self.model(inputs)
            loss = self._lm_loss(lm_logits, inputs)
            losses += loss.item()
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            if self.args.local_rank in [-1, 0]:
                self.writer.add_scalar('Loss/pre-train', loss.item(), ((epoch-1)*n_batches)+i)
                if i % log_interval == 0 and i != 0:
                    print('Iteration {} ({}/{})\tLoss: {:.4f}'.format(i, i, n_batches, losses/i))

        print('Train Epoch {} [rank: {}]\t>\tLoss: {:.4f}'.format(epoch, self.args.local_rank, losses/max(n_batches, 1)))

    def finetune(self, epoch):
        """Run one fine-tuning epoch.

        The loss is the classification loss plus ``args.auxiliary_ratio``
        times the language-modelling loss.
        """
        losses, accs = 0, 0
        n_batches = len(self.train_loader)
        log_interval = self._log_interval(n_batches)

        self.model.train()
        for i, batch in enumerate(self.train_loader):
            inputs, labels = map(lambda x: x.to(self.device), batch)
            lm_logits, cls_logits = self.model(inputs)
            lm_loss = self._lm_loss(lm_logits, inputs)
            cls_loss = self.cls_criterion(cls_logits, labels)
            loss = cls_loss + (self.args.auxiliary_ratio * lm_loss)

            losses += loss.item()
            # .item() keeps the running accuracy a plain float instead of
            # accumulating tensors on the training device.
            acc = (cls_logits.argmax(dim=-1) == labels).to(dtype=cls_logits.dtype).mean().item()
            accs += acc

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            if self.args.local_rank in [-1, 0]:
                self.writer.add_scalar('Loss/fine-tune', loss.item(), ((epoch-1)*n_batches)+i)
                self.writer.add_scalar('Accuracy/fine-tune', acc, ((epoch-1)*n_batches)+i)
                if i % log_interval == 0 and i != 0:
                    print('Iteration {} ({}/{})\tLoss: {:.4f} Acc: {:.1f}%'.format(i, i, n_batches, losses/i, accs/i*100.))

        print('Train Epoch {} [rank: {}]\t>\tLoss: {:.4f} / Acc: {:.1f}%'.format(epoch, self.args.local_rank, losses/max(n_batches, 1), accs/max(n_batches, 1)*100.))

    def evaluate(self, epoch):
        """Evaluate on ``test_loader`` without gradients and log loss (and accuracy when fine-tuning)."""
        losses, accs = 0, 0
        n_batches = len(self.test_loader)

        self.model.eval()
        with torch.no_grad():
            for i, batch in enumerate(self.test_loader):
                if self.args.pretrain:
                    # Index the batch the same way pretrain() does; calling
                    # .to() on the batch container itself fails.
                    inputs = batch[0].to(self.device)

                    lm_logits = self.model(inputs)
                    loss = self._lm_loss(lm_logits, inputs)
                    losses += loss.item()

                    if self.args.local_rank in [-1, 0]:
                        self.writer.add_scalar('Loss/pre-train(eval)', loss.item(), ((epoch-1)*n_batches)+i)

                elif self.args.finetune:
                    inputs, labels = map(lambda x: x.to(self.device), batch)

                    lm_logits, cls_logits = self.model(inputs)
                    lm_loss = self._lm_loss(lm_logits, inputs)
                    cls_loss = self.cls_criterion(cls_logits, labels)
                    loss = cls_loss + (self.args.auxiliary_ratio * lm_loss)

                    losses += loss.item()
                    acc = (cls_logits.argmax(dim=-1) == labels).to(dtype=cls_logits.dtype).mean().item()
                    accs += acc

                    if self.args.local_rank in [-1, 0]:
                        self.writer.add_scalar('Loss/fine-tune(eval)', loss.item(), ((epoch-1)*n_batches)+i)
                        self.writer.add_scalar('Accuracy/fine-tune(eval)', acc, ((epoch-1)*n_batches)+i)

        mean_loss = losses/max(n_batches, 1)
        if self.args.pretrain:
            # No accuracy is computed in pre-training mode, so do not report one.
            print('Eval Epoch {} [rank: {}]\t>\tLoss: {:.4f}'.format(epoch, self.args.local_rank, mean_loss))
        else:
            print('Eval Epoch {} [rank: {}]\t>\tLoss: {:.4f} / Acc: {:.1f}%'.format(epoch, self.args.local_rank, mean_loss, accs/max(n_batches, 1)*100.))

    def save(self, epoch, model_prefix='model', root='.model'):
        """Save the GPT backbone to ``{root}/{model_prefix}.ep{epoch}``.

        When running distributed, only local rank 0 writes the file.
        """
        if self.args.distributed and self.args.local_rank != 0:
            return

        path = Path(root) / (model_prefix + '.ep%d' % epoch)
        # parents/exist_ok: works for nested roots and when the directory exists.
        path.parent.mkdir(parents=True, exist_ok=True)
        torch.save(self.gpt, path)

#### main.py
	•	argparse를 통해 학습 설정, 파일 경로, 모델 파라미터 등을 커맨드라인 인자로 받아 처리
	•	--pretrain, --finetune 플래그로 실행 모드를 선택
	•	분산 학습(distributed training) 설정 시 torch.distributed.init_process_group()으로 초기화
	•	SentencePiece 기반의 PretrainedTokenizer 로드 및 학습용 데이터셋 생성
	•	create_examples() 함수를 통해 입력 텍스트를 PyTorch Dataset으로 변환
	•	DataLoader와 RandomSampler 또는 DistributedSampler로 배치 구성
	•	Trainer 객체를 생성하여 학습/저장/평가 루프 실행
	•	trainer.train(), trainer.save(), trainer.evaluate()를 epoch마다 반복 수행
	•	평가 모드 활성화(--do_eval) 시 테스트셋도 로딩하여 성능 측정

In [None]:
import argparse
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, RandomSampler, DistributedSampler

from data_utils import create_examples
from tokenization import PretrainedTokenizer
from trainer import Trainer

def main(args):
    """Build the tokenizer, data loaders and trainer, then run the epoch loop.

    Each epoch trains, saves a checkpoint and, when ``args.do_eval`` is set,
    evaluates on the test corpus.
    """
    print(args)
    if args.distributed:
        torch.cuda.set_device(args.local_rank)
        dist.init_process_group(backend='nccl')

    tokenizer = PretrainedTokenizer(pretrained_model=args.pretrained_sp_model, vocab_file=args.vocab_file)
    train_dataset = create_examples(args, tokenizer, mode='train')
    train_sampler = RandomSampler(train_dataset) if args.local_rank == -1 else DistributedSampler(train_dataset)
    train_loader = DataLoader(train_dataset, sampler=train_sampler, batch_size=args.batch_size, num_workers=args.n_workers)
    if args.do_eval:
        test_dataset = create_examples(args, tokenizer, mode='test')
        test_sampler = RandomSampler(test_dataset) if args.local_rank == -1 else DistributedSampler(test_dataset)
        test_loader = DataLoader(test_dataset, sampler=test_sampler, batch_size=args.batch_size, num_workers=args.n_workers)

    trainer = Trainer(args=args,
                      train_loader=train_loader,
                      test_loader=test_loader if args.do_eval else None,
                      tokenizer=tokenizer)

    for epoch in range(1, args.epochs+1):
        if isinstance(train_sampler, DistributedSampler):
            # DistributedSampler seeds its shuffle from the epoch number;
            # without this call every epoch iterates in the same order.
            train_sampler.set_epoch(epoch)
        trainer.train(epoch)
        trainer.save(epoch, args.output_model_prefix)
        if args.do_eval:
            trainer.evaluate(epoch)

if __name__ == '__main__':
    # Command-line entry point: parse the options, validate them, run main().
    import os

    parser = argparse.ArgumentParser()

    parser.add_argument('--train_corpus',           required=True,     type=str, help='corpus for either pre-train or fine-tune')
    parser.add_argument('--vocab_file',             required=True,     type=str, help='pretrained vocabulary')
    parser.add_argument('--pretrained_sp_model',    required=True,     type=str, help='pretrained sentencepiece model')
    # Exactly one mode must be chosen; enforce it at parse time instead of
    # failing on an assert after the dataset has already been built.
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument('--pretrain',           action='store_true')
    mode_group.add_argument('--finetune',           action='store_true')
    parser.add_argument('--do_eval',                action='store_true')

    parser.add_argument('--test_corpus',            default=None,     type=str, help='corpus for either pre-train or fine-tune evaluation')
    parser.add_argument('--pretrained_model',       default=None,     type=str, help='pretrained GPT model path')
    parser.add_argument('--output_model_prefix',    default='model',  type=str, help='output model name prefix')
    # Input parameters
    parser.add_argument('--batch_size',     default=64,    type=int,   help='batch size')
    parser.add_argument('--max_seq_len',    default=512,   type=int,   help='the maximum size of the input sequence')
    parser.add_argument('--n_workers',      default=4,     type=int,   help='the number of workers')
    # Train parameters
    parser.add_argument('--epochs',         default=100,       type=int,   help='the number of epochs')
    parser.add_argument('--lr',             default=1.5e-4,    type=float, help='initial learning rate')
    parser.add_argument('--auxiliary_ratio',default=.25,       type=float, help='weight of auxiliary objective')
    # Accept both spellings a launcher may pass, and fall back to the
    # LOCAL_RANK environment variable (set by torchrun) before defaulting to -1.
    parser.add_argument('--local_rank', '--local-rank',
                        default=int(os.environ.get('LOCAL_RANK', -1)),
                        type=int,   help='node rank for distributed training')
    parser.add_argument('--no_cuda',        action='store_true')
    parser.add_argument('--distributed',    action='store_true')
    # Model parameters
    parser.add_argument('--hidden',         default=768,  type=int,   help='the number of expected features in the transformer decoder')
    parser.add_argument('--n_layers',       default=12,   type=int,   help='the number of decoder layers')
    parser.add_argument('--n_attn_heads',   default=12,   type=int,   help='the number of multi-head attention heads')
    parser.add_argument('--embd_dropout',   default=0.1,  type=float, help='embedding dropout value')
    parser.add_argument('--resid_dropout',  default=0.1,  type=float, help='residual dropout value')
    parser.add_argument('--attn_dropout',   default=0.1,  type=float, help='attention dropout value')
    parser.add_argument('--ffn_hidden',     default=3072, type=int,   help='dimension of the feedforward network')
    # Others
    parser.add_argument('--cached_label_dict', default='cached_label_dict.json', type=str)

    args = parser.parse_args()

    # Evaluation needs a corpus to read; report it as a usage error up front.
    if args.do_eval and not args.test_corpus:
        parser.error('--do_eval requires --test_corpus')

    main(args)