# 토크나이저

In [13]:
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")

text = "  which Harry watched fly around the room wishing he still had his fulls"

tokens = tokenizer.encode(text)

print("글자수 :", len(text))  # 글자수: 26
print("토큰수 :", len(tokens))  # 토큰수: 6
print(tokens)  # [15496, 2159, 257, 281, 3453, 13]
print(tokenizer.decode(tokens))  # Harry Potter was a wizard.
for token in tokens:
    print(token, ":", tokenizer.decode([token]))

글자수 : 72
토큰수 : 15
[220, 543, 5850, 7342, 6129, 1088, 262, 2119, 24433, 339, 991, 550, 465, 1336, 82]
  which Harry watched fly around the room wishing he still had his fulls
220 :  
543 :  which
5850 :  Harry
7342 :  watched
6129 :  fly
1088 :  around
262 :  the
2119 :  room
24433 :  wishing
339 :  he
991 :  still
550 :  had
465 :  his
1336 :  full
82 : s


In [14]:
from transformers import AutoTokenizer

# 1. GPT-2 토크나이저 불러오기
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# 2. GPT-2는 기본 pad_token이 없어서 직접 설정
# if tokenizer.pad_token is None:
#     special_tokens_dict = {'pad_token': '<|pad|>'}
#     num_added = tokenizer.add_special_tokens(special_tokens_dict)

# pad를 따로 추가하지 말고, eos를 그대로 pad로 사용
tokenizer.pad_token = tokenizer.eos_token
pad_id = tokenizer.pad_token_id  # == eos_id

print("vocab_size:", tokenizer.vocab_size)
print("pad_token:", tokenizer.pad_token, "id:", tokenizer.pad_token_id)
print("eos_token:", tokenizer.eos_token, "id:", tokenizer.eos_token_id)

  from .autonotebook import tqdm as notebook_tqdm


vocab_size: 50257
pad_token: <|endoftext|> id: 50256
eos_token: <|endoftext|> id: 50256


In [15]:
def load_text_samples(path: str):
    with open(path, "r", encoding="utf-8") as f:
        data = f.read().strip()
    # 샘플 사이에 빈 줄 하나 넣어놨으니까 "\n\n" 기준으로 자름
    samples = [s.strip() for s in data.split("\n\n") if s.strip()]
    return samples

train_samples = load_text_samples("lotto_train.txt")
print("샘플 개수:", len(train_samples))

example = train_samples[0]
print("원본 샘플:")
print(example)

샘플 개수: 150000
원본 샘플:
money=4000
winning=1,11,15,18,29,38
bonus=33
###
티켓수=4
구매번호:
[2,7,9,15,16,18]
[3,6,28,35,38,44]
[2,6,14,15,33,39]
[2,13,27,35,36,42]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 0
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=0.0%


In [16]:
max_len = 512  # 임시

enc = tokenizer(
    example,
    max_length=max_len,
    padding="max_length",
    truncation=True,
    return_tensors="pt",  # PyTorch 텐서로
)

input_ids = enc["input_ids"]        # shape: (1, max_len)
attention_mask = enc["attention_mask"]  # shape: (1, max_len)

print("input_ids shape:", input_ids.shape)
print("attention_mask shape:", attention_mask.shape)

# 디코딩해서 잘 복원되는지 확인
decoded = tokenizer.decode(input_ids[0], skip_special_tokens=True)
print("디코딩된 텍스트:")
print(decoded)

input_ids shape: torch.Size([1, 512])
attention_mask shape: torch.Size([1, 512])
디코딩된 텍스트:
money=4000
winning=1,11,15,18,29,38
bonus=33
###
티켓수=4
구매번호:
[2,7,9,15,16,18]
[3,6,28,35,38,44]
[2,6,14,15,33,39]
[2,13,27,35,36,42]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 0
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=0.0%


In [17]:
from torch.utils.data import Dataset
import torch

class LottoDataset(Dataset):
    def __init__(self, texts, tokenizer, max_len: int, focus_len: int = 118):
        """
        focus_len: 시퀀스 뒤에서부터 몇 개 토큰을
                   '결과/수익률 구간'이라고 보고 가중치를 줄지
        """
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.eos = tokenizer.eos_token
        self.focus_len = focus_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        row_txt = self.texts[idx]

        txt = row_txt + self.eos

        # max_len+1 길이로 토큰화 → x:[:-1], y:[1:] 사용
        enc = self.tokenizer(
            txt,
            max_length=self.max_len + 1,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        ids = enc["input_ids"][0]           # (max_len+1,)
        attn_mask = enc["attention_mask"][0]  # (max_len+1,)

        # 언어모델용 input/target
        x = ids[:-1].clone()       # (max_len,)
        y = ids[1:].clone()        # (max_len,)
        x_mask = attn_mask[:-1].clone()

        # pad 위치는 loss에서 무시되도록 -100
        y[x_mask == 0] = -100

        # ----- roi_mask 생성 (결과/수익률 구간) -----
        # 실제 토큰 길이 (pad 제외)
        valid_len = int(x_mask.sum().item())   # 예: 380 토큰
        roi_mask = torch.zeros_like(x_mask)    # (max_len,)

        # 뒤에서 focus_len개를 결과 구간으로 설정
        start = max(0, valid_len - self.focus_len)
        roi_mask[start:valid_len] = 1

        return x, y, x_mask, roi_mask

In [18]:
max_len = 512
train_ds = LottoDataset(train_samples, tokenizer, max_len=max_len, focus_len=118)

x, y, x_mask, roi_mask = train_ds[0]

print("x shape:", x.shape)          # torch.Size([256])
print("y shape:", y.shape)          # torch.Size([256])
print("x_mask shape:", x_mask.shape)
print("roi_mask shape:", roi_mask.shape)

print("x[:20]:", x[:20])
print("y[:20]:", y[:20])

x shape: torch.Size([512])
y shape: torch.Size([512])
x_mask shape: torch.Size([512])
roi_mask shape: torch.Size([512])
x[:20]: tensor([26316,    28, 27559,   198, 14463,    28,    16,    11,  1157,    11,
         1314,    11,  1507,    11,  1959,    11,  2548,   198,  4189,   385])
y[:20]: tensor([   28, 27559,   198, 14463,    28,    16,    11,  1157,    11,  1314,
           11,  1507,    11,  1959,    11,  2548,   198,  4189,   385,    28])


DataSet 출력값 예시

X TEXT:
money=8000
winning=1,2,3,4,5,6
bonus=7
###

Y TEXT:
oney=8000
winning=1,2,3,4,5,6
bonus=7
###
티

In [19]:
from transformers import AutoTokenizer

print("pad_token:", tokenizer.pad_token)
print("pad_token_id:", tokenizer.pad_token_id)
print("vocab_size:", tokenizer.vocab_size)


# lotto_train.txt 읽기
def load_text_samples(path):
    with open(path, "r", encoding="utf-8") as f:
        data = f.read().strip()
    samples = [s.strip() for s in data.split("\n\n") if s.strip()]
    return samples

train_samples = load_text_samples("lotto_train.txt")

# 샘플 하나
example = train_samples[0]
# print("=== ORIGINAL TEXT ===")
# print(example)

# encode
enc = tokenizer(
    example,
    max_length=512,
    padding="max_length",
    truncation=True,
    return_tensors="pt"
)

input_ids = enc["input_ids"][0]
attention_mask = enc["attention_mask"][0]

print("\ninput_ids shape:", input_ids.shape)
print("attention_mask shape:", attention_mask.shape)

# decode
decoded = tokenizer.decode(input_ids, skip_special_tokens=True)

print("\n=== DECODED TEXT ===")
print(decoded)

pad_token: <|endoftext|>
pad_token_id: 50256
vocab_size: 50257

input_ids shape: torch.Size([512])
attention_mask shape: torch.Size([512])

=== DECODED TEXT ===
money=4000
winning=1,11,15,18,29,38
bonus=33
###
티켓수=4
구매번호:
[2,7,9,15,16,18]
[3,6,28,35,38,44]
[2,6,14,15,33,39]
[2,13,27,35,36,42]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 0
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=0.0%


# 데이터셋 확인

In [20]:
def load_text_samples(path: str):
    with open(path, "r", encoding="utf-8") as f:
        data = f.read().strip()
    # 샘플 사이에 빈 줄 하나씩 있다고 가정 → "\n\n" 기준 split
    samples = [s.strip() for s in data.split("\n\n") if s.strip()]
    return samples

train_texts = load_text_samples("lotto_train.txt")
print("train 샘플 수:", len(train_texts))
print("첫 샘플 원본:")
print(train_texts[0])

train 샘플 수: 150000
첫 샘플 원본:
money=4000
winning=1,11,15,18,29,38
bonus=33
###
티켓수=4
구매번호:
[2,7,9,15,16,18]
[3,6,28,35,38,44]
[2,6,14,15,33,39]
[2,13,27,35,36,42]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 0
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=0.0%


In [21]:
from torch.utils.data import DataLoader

max_len = 512  # 일단 256 정도로 가정
batch_size = 4

train_ds = LottoDataset(train_texts, tokenizer, max_len=max_len, focus_len=118)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)

# 배치 하나만 꺼내서 확인해보자
x_batch, y_batch, mask_batch, roi_mask = next(iter(train_loader))

print("x_batch shape:", x_batch.shape)        # (B, T) = (4, 256)
print("y_batch shape:", y_batch.shape)        # (4, 256)
print("mask_batch shape:", mask_batch.shape)  # (4, 256)
print("roi_mask shape:", roi_mask.shape)      # (4, 256)

x_batch shape: torch.Size([4, 512])
y_batch shape: torch.Size([4, 512])
mask_batch shape: torch.Size([4, 512])
roi_mask shape: torch.Size([4, 512])


### focus_len 마지막 증가할 가중치 토큰 갯수 확인

In [22]:
import numpy as np
import torch

def estimate_focus_len(texts, tokenizer, max_len: int, 
                       target_tokens=("3개일치", "수익률"),
                       sample_size: int = 1000,
                       quantile: float = 0.95):
    """
    texts: lotto_train.txt에서 읽어온 전체 텍스트 리스트
    target_tokens: 결과/수익률 블록을 대표하는 토큰 문자열들
    sample_size: 몇 개 샘플만 뽑아서 통계 낼지
    quantile: 상위 몇 %까지 커버할지 (0.95면 95퍼센타일)
    """
    n = min(len(texts), sample_size)
    lengths = []

    for i in range(n):
        row_txt = texts[i]
        txt = row_txt + tokenizer.eos_token

        enc = tokenizer(
            txt,
            max_length=max_len + 1,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        ids = enc["input_ids"][0]           # (max_len+1,)
        attn = enc["attention_mask"][0]     # (max_len+1,)

        valid_len = int(attn.sum().item())
        ids_list = ids[:valid_len].tolist()

        # 이 샘플에서 결과 블록 시작 위치를 찾는다
        start_idx = valid_len  # 기본값: 못 찾으면 tail=0

        for tok_str in target_tokens:
            pat = tokenizer(tok_str, add_special_tokens=False)["input_ids"]
            L = len(pat)

            for j in range(valid_len - L + 1):
                if ids_list[j:j+L] == pat:
                    start_idx = min(start_idx, j)
                    break  # 이 토큰은 찾았으니 다음 토큰으로

        tail_len = valid_len - start_idx
        if tail_len < 0:
            tail_len = 0

        lengths.append(tail_len)

    lengths = np.array(lengths)
    print("샘플 개수:", len(lengths))
    print("min tail_len:", lengths.min())
    print("avg tail_len:", lengths.mean())
    print("max tail_len:", lengths.max())
    focus_len = int(np.quantile(lengths, quantile))
    print(f"{int(quantile*100)} 퍼센타일 tail_len:", focus_len)

    return focus_len

In [23]:
max_len = 512

train_texts = load_text_samples("lotto_train.txt")
val_texts = load_text_samples("lotto_val.txt")

focus_len = estimate_focus_len(
    train_texts,
    tokenizer,
    max_len=max_len,
    target_tokens=("3개일치", "수익률"),
    sample_size=1000,
    quantile=0.95,
)
print("최종 선택된 focus_len =", focus_len)

샘플 개수: 1000
min tail_len: 117
avg tail_len: 117.176
max tail_len: 120
95 퍼센타일 tail_len: 118
최종 선택된 focus_len = 118


# 모델 정의

In [24]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer

class GPTConfig:
    def __init__(
        self,
        vocab_size: int,
        n_layer: int = 4,
        n_head: int = 4,
        d_model: int = 256,
        d_ff: int = 1024,
        max_len: int = 512,
        dropout: float = 0.1,
        pad_id: int = 0,
    ):
        self.vocab_size = vocab_size
        self.n_layer = n_layer
        self.n_head = n_head
        self.d_model = d_model
        self.d_ff = d_ff
        self.max_len = max_len
        self.dropout = dropout
        self.pad_id = pad_id

In [25]:
class CausalSelfAttention(nn.Module):
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.n_head = config.n_head
        self.d_model = config.d_model
        self.dropout = nn.Dropout(config.dropout)

        self.qkv = nn.Linear(config.d_model, 3 * config.d_model)
        self.proj = nn.Linear(config.d_model, config.d_model)

        mask = torch.tril(torch.ones(config.max_len, config.max_len))
        self.register_buffer(
            "causal_mask",
            mask.view(1, 1, config.max_len, config.max_len)
        )
    def forward(self, x, attn_mask=None):
        # x: (B, T, C)
        B, T, C = x.size()
        H = self.n_head
        head_dim = C // H

        qkv = self.qkv(x)              # (B, T, 3C)
        q, k, v = qkv.split(C, dim=2)  # (B, T, C) each

        q = q.view(B, T, H, head_dim).transpose(1, 2)  # (B, H, T, head_dim)
        k = k.view(B, T, H, head_dim).transpose(1, 2)
        v = v.view(B, T, H, head_dim).transpose(1, 2)

        att = (q @ k.transpose(-2, -1)) / (head_dim ** 0.5)  # (B, H, T, T)

        causal_mask = self.causal_mask[:, :, :T, :T]
        att = att.masked_fill(causal_mask == 0, float("-inf"))

        if attn_mask is not None:
            # attn_mask: (B, T) → (B, 1, 1, T)
            pad_mask = attn_mask.view(B, 1, 1, T)
            att = att.masked_fill(pad_mask == 0, float("-inf"))

        att = torch.softmax(att, dim=-1)
        att = self.dropout(att)

        y = att @ v                    # (B, H, T, head_dim)
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.proj(y)
        y = self.dropout(y)
        return y

In [26]:
class TransformerBlock(nn.Module):
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.d_model)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.d_model)
        self.ff = nn.Sequential(
            nn.Linear(config.d_model, config.d_ff),
            nn.GELU(),
            nn.Linear(config.d_ff, config.d_model),
            nn.Dropout(config.dropout),
        )

    def forward(self, x, attn_mask=None):
        x = x + self.attn(self.ln1(x), attn_mask=attn_mask)
        x = x + self.ff(self.ln2(x))
        return x

In [27]:
class MiniGPT(nn.Module):
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.config = config

        self.tok_emb = nn.Embedding(config.vocab_size, config.d_model)
        self.pos_emb = nn.Embedding(config.max_len, config.d_model)
        self.dropout = nn.Dropout(config.dropout)

        self.blocks = nn.ModuleList(
            [TransformerBlock(config) for _ in range(config.n_layer)]
        )
        self.ln_f = nn.LayerNorm(config.d_model)
        self.head = nn.Linear(config.d_model, config.vocab_size, bias=False)

        # (선택) 입력 임베딩과 출력 head weight tying
        self.head.weight = self.tok_emb.weight

    def forward(self, idx, attn_mask=None):
        # idx: (B, T)
        B, T = idx.size()
        device = idx.device

        pos = torch.arange(0, T, dtype=torch.long, device=device)
        pos = pos.unsqueeze(0).expand(B, T)  # (B, T)

        x = self.tok_emb(idx) + self.pos_emb(pos)
        x = self.dropout(x)

        for block in self.blocks:
            x = block(x, attn_mask=attn_mask)

        x = self.ln_f(x)
        logits = self.head(x)  # (B, T, vocab_size)
        return logits

In [28]:
@torch.no_grad()
def generate_text(model, tokenizer, prompt: str, max_new_tokens: int = 200, device="cpu"):
    
    model.eval()
    max_len = model.config.max_len
    eos_token_id = tokenizer.eos_token_id

    # 1) 처음에는 패딩 없이 실제 길이만큼만 인코딩
    enc = tokenizer(
        prompt,
        return_tensors="pt",
    )
    x = enc["input_ids"].to(device)      # (1, T0)
    attn_mask = enc["attention_mask"].to(device)  # (1, T0)

    for _ in range(max_new_tokens):
        # 2) 모델에 넣기 전에 길이가 max_len을 넘으면 뒤에서 max_len만 유지
        if x.size(1) > max_len:
            x = x[:, -max_len:]
            attn_mask = attn_mask[:, -max_len:]

        # 3) forward
        logits = model(x, attn_mask=attn_mask)        # (1, T, vocab)
        last_logits = logits[:, -1, :]                # (1, vocab)

        probs = torch.softmax(last_logits, dim=-1)
        next_id = torch.multinomial(probs, num_samples=1)  # (1,1)
        next_token = next_id.item()

        if next_token == eos_token_id:
            break

        # 4) 새 토큰 이어붙이기
        x = torch.cat([x, next_id], dim=1)  # (1, T+1)
        next_mask = torch.ones_like(next_id, device=device)
        attn_mask = torch.cat([attn_mask, next_mask], dim=1)

    # 5) 결과 디코딩
    out_ids = x[0].tolist()
    text = tokenizer.decode(out_ids, skip_special_tokens=True)
    return text

In [29]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)


pad_id = tokenizer.pad_token_id
vocab_size = tokenizer.vocab_size

# 2) 데이터 로딩
train_texts = load_text_samples("lotto_train.txt")
val_texts = load_text_samples("lotto_val.txt")  # 없다면 주석 처리하고 train만 써도 됨

print("train samples:", len(train_texts))
print("val samples:", len(val_texts))

max_len = 512

train_ds = LottoDataset(train_texts, tokenizer, max_len=max_len, focus_len=118)
val_ds = LottoDataset(val_texts, tokenizer, max_len=max_len, focus_len=118)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=8)

# 3) 모델 & optimizer & loss
config = GPTConfig(
    vocab_size=vocab_size,
    n_layer=4,
    n_head=4,
    d_model=256,
    d_ff=1024,
    max_len=max_len,
    dropout=0.1,
    pad_id=pad_id,
)

model = MiniGPT(config).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss(ignore_index=-100)

device: cuda
train samples: 150000
val samples: 15000


# Eos 확인

In [30]:
print("pad_token:", tokenizer.pad_token, "id:", tokenizer.pad_token_id)
print("eos_token:", tokenizer.eos_token, "id:", tokenizer.eos_token_id)

pad_token: <|endoftext|> id: 50256
eos_token: <|endoftext|> id: 50256


In [31]:
print("GLOBAL tokenizer:")
print("  pad_token:", tokenizer.pad_token, "id:", tokenizer.pad_token_id)
print("  eos_token:", tokenizer.eos_token, "id:", tokenizer.eos_token_id)

print("\nDATASET 내부 tokenizer:")
print("  pad_token:", train_ds.tokenizer.pad_token, "id:", train_ds.tokenizer.pad_token_id)
print("  eos_token:", train_ds.tokenizer.eos_token, "id:", train_ds.tokenizer.eos_token_id)

GLOBAL tokenizer:
  pad_token: <|endoftext|> id: 50256
  eos_token: <|endoftext|> id: 50256

DATASET 내부 tokenizer:
  pad_token: <|endoftext|> id: 50256
  eos_token: <|endoftext|> id: 50256


#학습


In [20]:
import torch.nn.functional as F

pad_target_id = -100        # y에서 pad 자리에 들어있는 값
alpha = 10.0                 # 결과/수익률 구간 가중치 배율
epochs = 10 
best_val_loss = float("inf")

train_losses = []
val_losses = []

accum_steps = 4  # 그대로 (batch 8 * 4 = effective 32)

for epoch in range(1, epochs + 1):

    # ----- Train -----
    model.train()
    total_loss = 0.0
    optimizer.zero_grad()

    for step, (x, y, mask, roi) in enumerate(train_loader, start=1):
        x    = x.to(device)       # (B, T)
        y    = y.to(device)       # (B, T)  pad는 -100
        mask = mask.to(device)    # (B, T)
        roi  = roi.to(device)     # (B, T)

        logits = model(x, attn_mask=mask)   # (B, T, vocab)
        B, T, V = logits.size()

        # 1) 펼치기
        logits_flat = logits.view(-1, V)    # (B*T, V)
        y_flat      = y.view(-1)           # (B*T,)
        roi_flat    = roi.view(-1).float() # (B*T,)

        # 2) 토큰별 CE loss (pad는 ignore_index=-100)
        per_token_loss = F.cross_entropy(
            logits_flat,
            y_flat,
            reduction="none",
            ignore_index=pad_target_id,
        )  # (B*T,)

        # 3) pad 아닌 위치 마스크
        non_pad_mask = (y_flat != pad_target_id).float()  # pad면 0, 나머지 1

        # roi가 pad 위치를 건드리지 않도록 한 번 더 마스킹
        roi_flat = roi_flat * non_pad_mask

        # 4) 가중치: 기본 1, roi 구간은 alpha배
        base_w  = non_pad_mask                  # pad: 0, 나머지: 1
        weights = base_w + roi_flat * (alpha - 1.0)
        # -> roi_flat=1이면 1+(α-1)=α, roi_flat=0이면 1

        # 5) 최종 loss: 가중 평균
        loss = (per_token_loss * weights).sum() / (weights.sum() + 1e-8)

        total_loss += loss.item()

        # gradient accumulation
        loss = loss / accum_steps
        loss.backward()

        if step % accum_steps == 0:
            optimizer.step()
            optimizer.zero_grad()

    else:
        # 마지막에 accum_steps로 딱 안 나누어떨어지는 경우 방어 코드
        if (step % accum_steps) != 0:
            optimizer.step()
            optimizer.zero_grad()

    avg_train_loss = total_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ----- Validation (여기는 가중치 없이, pad만 무시) -----
    model.eval()
    val_loss_sum = 0.0

    with torch.no_grad():
        for x, y, mask, roi in val_loader:
            x    = x.to(device)
            y    = y.to(device)
            mask = mask.to(device)

            logits = model(x, attn_mask=mask)
            B, T, V = logits.size()

            logits_flat = logits.view(-1, V)
            y_flat      = y.view(-1)

            per_token_loss = F.cross_entropy(
                logits_flat,
                y_flat,
                reduction="none",
                ignore_index=pad_target_id,
            )

            non_pad_mask = (y_flat != pad_target_id).float()
            val_loss = (per_token_loss * non_pad_mask).sum() / (non_pad_mask.sum() + 1e-8)
            val_loss_sum += val_loss.item()

    avg_val_loss = val_loss_sum / len(val_loader)
    val_losses.append(avg_val_loss)

    print(f"[Epoch {epoch:03d}] train_loss={avg_train_loss:.4f}, val_loss={avg_val_loss:.4f}")

    # ----- 체크포인트 -----
    torch.save(model.state_dict(), f"lotto_gpt_epoch{epoch:03d}.pt")
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), "lotto_gpt_best.pt")

[Epoch 001] train_loss=0.7677, val_loss=0.7206
[Epoch 002] train_loss=0.1722, val_loss=0.6180
[Epoch 003] train_loss=0.1546, val_loss=0.6101
[Epoch 004] train_loss=0.1516, val_loss=0.6078
[Epoch 005] train_loss=0.1505, val_loss=0.6074
[Epoch 006] train_loss=0.1498, val_loss=0.6068
[Epoch 007] train_loss=0.1494, val_loss=0.6065
[Epoch 008] train_loss=0.1491, val_loss=0.6058
[Epoch 009] train_loss=0.1490, val_loss=0.6060
[Epoch 010] train_loss=0.1488, val_loss=0.6059


In [32]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(range(1, epochs + 1), train_losses, label="train_loss")
plt.plot(range(1, epochs + 1), val_losses, label="val_loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training / Validation Loss")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

plt.savefig("loss_curve.png")

NameError: name 'epochs' is not defined

<Figure size 800x500 with 0 Axes>

In [33]:
example_prompt = (
    "money=5000\n"
    "winning=1,2,3,4,5,6\n"
    "bonus=7\n"
    "###\n"
    # "티켓수=5\n"
    # "구매번호:\n"
    # "[1,2,3,4,5,6]\n"
)

print("prompt len (tokens):", tokenizer(example_prompt, return_tensors="pt")["input_ids"].shape)
print("model max_len:", model.config.max_len)

generated = generate_text(model, tokenizer, example_prompt, max_new_tokens=512, device=device)
print("=== SAMPLE GENERATION ===")
print(generated)

prompt len (tokens): torch.Size([1, 25])
model max_len: 512
=== SAMPLE GENERATION ===




































































































































































































































































































































































































































































































































In [34]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)

model = MiniGPT(config).to(device)

# 체크포인트 로드 (파일 이름 맞게 수정 가능)
ckpt_path = "lotto_gpt_best.pt"   # 또는 "lotto_gpt_epoch030.pt" 등
state = torch.load(ckpt_path, map_location=device)
model.load_state_dict(state)

model.eval()
print("checkpoint loaded from:", ckpt_path)

device: cuda
checkpoint loaded from: lotto_gpt_best.pt


In [35]:
example_prompt = (
    "money=1000\n"
    "winning=1,2,3,4,5,6\n"
    "bonus=7\n"
    "###\n"
)

print("prompt len (tokens):", tokenizer(example_prompt, return_tensors="pt")["input_ids"].shape)
print("model max_len:", model.config.max_len)

generated = generate_text(model, tokenizer, example_prompt, max_new_tokens=512, device=device)
print("=== SAMPLE GENERATION ===")
print(generated)

prompt len (tokens): torch.Size([1, 25])
model max_len: 512
=== SAMPLE GENERATION ===
money=1000
winning=1,2,3,4,5,6
bonus=7
###
티켓수=1
구매번호:
[5,18,24,29,31,35]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 0
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=0.0%


## Test

Test Data set을 이용하여 구간별 정답률 테스트

In [53]:
def load_blocks(path: str):
    with open(path, "r", encoding="utf-8") as f:
        text = f.read().strip()
    # 빈 줄 기준으로 블록 나누기
    raw_blocks = text.split("\n\n")
    # 공백만 있는 블록 제거
    blocks = [b.strip() for b in raw_blocks if b.strip()]
    return blocks

test_blocks = load_blocks("lotto_test.txt")
print("test blocks:", len(test_blocks))
print("첫 블록:\n", test_blocks[0])

test blocks: 15000
첫 블록:
 money=7000
winning=7,24,27,29,39,45
bonus=1
###
티켓수=7
구매번호:
[7,9,12,18,28,38]
[4,14,17,18,28,30]
[5,6,14,16,23,32]
[3,16,20,22,34,35]
[4,8,11,14,16,23]
[7,24,28,29,39,45]
[7,16,22,24,28,30]
3개일치 (5000원) = 0
4개일치 (50000원) = 0
5개일치 (1500000원) = 1
5개보너스일치 (30000000원) = 0
6개일치 (2000000000원) = 0
수익률=21428.6%


In [79]:
def get_prompt_for_testset(block: str) -> str:
    """
    블록에서 티켓 생성용 프롬프트 부분만 추출
    """
    lines = block.splitlines()
    prompt_lines = []
    for line in lines:
        if line.startswith("money=") or line.startswith("winning=") or line.startswith("bonus="):
            prompt_lines.append(line)
        elif line.strip() == "###":
            prompt_lines.append(line)
            break  # ### 이후는 티켓 정보이므로 중단
    prompt = "\n".join(prompt_lines) + "\n"
    return prompt

print("=== TEST SET용 프롬프트 예시 ===")
get_prompt_for_testset(test_blocks[0])

=== TEST SET용 프롬프트 예시 ===


'money=7000\nwinning=7,24,27,29,39,45\nbonus=1\n###\n'

### 형식검사

- parse_ticket_count(block) -> int | None
- check_ticket_lines_format(block) -> dict | None
- parse_result_labels(block) -> dict | None

In [106]:
import re

def parse_ticket_count(block: str):
    """
    '티켓수=숫자' 에서 숫자만 뽑는다.
    없거나 정수가 아니면 None 반환.
    """
    m = re.search(r"티켓수\s*=\s*([0-9]+)", block)
    if not m:
        return None

    try:
        return int(m.group(1))
    except ValueError:
        return None
print(parse_ticket_count(test_blocks[0]))

7


In [89]:
import re

def check_ticket_lines_format(block: str):
    """
    1-2. 구매번호 줄 형식 검사
    - '구매번호:' 라인이 존재하는지
    - 그 아래에 [...] 형태 줄들이 있는지
    - 각 줄에서 숫자를 파싱할 수 있는지

    반환:
        {
            "has_section": True/False,        # 구매번호: 라인 존재 여부
            "ticket_lines": [...],            # 실제 [...] 형태 줄 리스트
            "parsed_numbers": [[...], ...],   # 각 줄 숫자 리스트
        }
    """

    lines = block.splitlines()
    has_section = False
    start = None

    # 1) 구매번호: 찾기
    for idx, line in enumerate(lines):
        if line.strip() == "구매번호:":
            has_section = True
            start = idx + 1
            break

    if not has_section:
        return {
            "has_section": False,
            "ticket_lines": [],
            "parsed_numbers": [],
        }

    # 2) 구매번호 아래 줄들 수집
    ticket_lines = []
    for line in lines[start:]:
        s = line.strip()
        # 결과 항목 나오면 구매번호 section 끝난 것으로 간주
        if s.startswith("3개일치") or s.startswith("4개일치") or s.startswith("5개일치") or s.startswith("6개일치") or s.startswith("수익률"):
            break
        if s.startswith("[") and s.endswith("]"):
            ticket_lines.append(s)

    # 3) 숫자 파싱
    parsed = []
    try:
        for t in ticket_lines:
            nums = re.findall(r"\d+", t)
            parsed.append([int(n) for n in nums])
    except ValueError:
        return {
            "has_section": True,
            "ticket_lines": ticket_lines,
            "parsed_numbers": []
        }
    return {
        "has_section": True,
        "ticket_lines": ticket_lines,
        "parsed_numbers": parsed,
    }
        
print(check_ticket_lines_format(test_blocks[0]))

{'has_section': True, 'ticket_lines': ['[7,9,12,18,28,38]', '[4,14,17,18,28,30]', '[5,6,14,16,23,32]', '[3,16,20,22,34,35]', '[4,8,11,14,16,23]', '[7,24,28,29,39,45]', '[7,16,22,24,28,30]'], 'parsed_numbers': [[7, 9, 12, 18, 28, 38], [4, 14, 17, 18, 28, 30], [5, 6, 14, 16, 23, 32], [3, 16, 20, 22, 34, 35], [4, 8, 11, 14, 16, 23], [7, 24, 28, 29, 39, 45], [7, 16, 22, 24, 28, 30]]}


In [91]:
import re

def parse_result_labels(block: str):
    """
    1-3. 라벨(결과) 줄 형식 검사 + 파싱
    아래 6개가 모두 존재하고 숫자로 파싱되면 dict 반환,
    하나라도 없거나 숫자 변환 실패하면 None 반환.

      3개일치 (...) = X
      4개일치 (...) = Y
      5개일치 (...) = Z
      5개보너스일치 (...) = A
      6개일치 (...) = B
      수익률 = R%
    """

    m3  = re.search(r"3개일치\s*\([^)]*\)\s*=\s*(-?\d+)", block)
    m4  = re.search(r"4개일치\s*\([^)]*\)\s*=\s*(-?\d+)", block)
    m5  = re.search(r"5개일치\s*\([^)]*\)\s*=\s*(-?\d+)", block)
    m5b = re.search(r"5개보너스일치\s*\([^)]*\)\s*=\s*(-?\d+)", block)
    m6  = re.search(r"6개일치\s*\([^)]*\)\s*=\s*(-?\d+)", block)
    mroi = re.search(r"수익률\s*=?\s*([-+]?\d+(?:\.\d+)?)\s*%", block)

    # 하나라도 없으면 형식 실패
    if not all([m3, m4, m5, m5b, m6, mroi]):
        return None

    try:
        labels = {
            "m3":  int(m3.group(1)),
            "m4":  int(m4.group(1)),
            "m5":  int(m5.group(1)),
            "m5b": int(m5b.group(1)),
            "m6":  int(m6.group(1)),
            "roi": float(mroi.group(1)),  # 예: 21428.6
        }
    except ValueError:
        # 숫자로 변환 안 되면 형식 실패
        return None

    return labels

print(parse_result_labels(test_blocks[0]))

{'m3': 0, 'm4': 0, 'm5': 1, 'm5b': 0, 'm6': 0, 'roi': 21428.6}


### 내부 일관성 확인(llm 출력안에서 검사)

In [93]:
def check_ticket_count_consistency(N: int, list_ticket: list  ):
    """
    N: 선언된 티켓수
    M: 실제 구매번호 줄 개수
    """
    return N == len(list_ticket)
print(check_ticket_count_consistency(5, [[1,2,3,4,5,6], [7,8,9,10,11,12]]))  # False
print(check_ticket_count_consistency(2, [[1,2,3,4,5,6], [7,8,9,10,11,12]]))  # True

False
True


In [95]:
def validate_ticket_numbers(parsed_numbers):
    """
    parsed_numbers: [[...], [...], ...]

    반환:
      (invalid_count, duplicate_count, out_of_range_count)
    """

    invalid_count = 0
    duplicate_count = 0
    out_of_range_count = 0

    for nums in parsed_numbers:
        # 개수 검사
        if len(nums) != 6:
            invalid_count += 1

        # 중복 검사
        if len(set(nums)) != len(nums):
            duplicate_count += 1

        # 범위 검사
        for n in nums:
            if n < 1 or n > 45:
                out_of_range_count += 1
                break

    return invalid_count, duplicate_count, out_of_range_count
print(validate_ticket_numbers([[1,2,3,4,5,6], [7,8,9,10,11,11], [0,2,3,4,5,6], [1,2,3]]))  # (1,1,1)

(1, 1, 1)


### 계산 정확도 레벨 (기능 테스트)

In [108]:
def count_matches(ticket, winning_numbers, bonus_number):
    """
    ticket: [n1, n2, ..., n6]
    winning_numbers: [w1, ..., w6]
    bonus_number: int
    """
    win_set = set(winning_numbers)
    match_cnt = sum(1 for n in ticket if n in win_set)
    bonus_match = bonus_number in ticket
    return match_cnt, bonus_match
print(count_matches([1,2,3,4,5,6], [4,5,6,7,8,9], 10))  # (3, False)
print(count_matches([1,2,3,4,5,10], [4,5,6,7,8,9], 10))  # (2, True)

(3, False)
(2, True)


In [109]:
def compute_match_stats(tickets, winning_numbers, bonus_number):
    """
    tickets: [[...], [...], ...]
    반환: {"m3": x, "m4": y, "m5": z, "m5b": a, "m6": b}
    """

    stats = {"m3": 0, "m4": 0, "m5": 0, "m5b": 0, "m6": 0}

    for t in tickets:
        match_cnt, bonus_match = count_matches(t, winning_numbers, bonus_number)

        if match_cnt == 3:
            stats["m3"] += 1
        elif match_cnt == 4:
            stats["m4"] += 1
        elif match_cnt == 5 and bonus_match:
            stats["m5b"] += 1
        elif match_cnt == 5:
            stats["m5"] += 1
        elif match_cnt == 6:
            stats["m6"] += 1

    return stats
print(compute_match_stats([[1,2,3,4,5,6], [4,5,6,7,8,9], [1,2,3,4,5,10]], [4,5,6,7,8,9], 10))

{'m3': 1, 'm4': 0, 'm5': 0, 'm5b': 0, 'm6': 1}


In [110]:
def check_match_stats(tickets, winning_numbers, bonus_number, labels):
    """
    tickets: LLM이 출력한 구매번호 리스트 (parsed_numbers)
    winning_numbers, bonus_number: 데이터셋에서 가져온 당첨 정보
    labels: parse_result_labels(block) 결과 dict (m3~m6 포함)

    반환:
      (m3_ok, m4_ok, m5_ok, m5b_ok, m6_ok)
    """

    stats = compute_match_stats(tickets, winning_numbers, bonus_number)

    m3_ok  = (stats["m3"]  == labels.get("m3"))
    m4_ok  = (stats["m4"]  == labels.get("m4"))
    m5_ok  = (stats["m5"]  == labels.get("m5"))
    m5b_ok = (stats["m5b"] == labels.get("m5b"))
    m6_ok  = (stats["m6"]  == labels.get("m6"))

    return m3_ok, m4_ok, m5_ok, m5b_ok, m6_ok
print(check_match_stats(
    [[1,2,3,4,5,6], [4,5,6,7,8,9], [1,2,3,4,5,10]],
    [4,5,6,7,8,9],  10,
    {"m3":1, "m4":0, "m5":0, "m5b":0, "m6":1}
))  # (True, True, True, True, True)

(True, True, True, True, True)


In [None]:
def check_roi_exact(labels, money):
    """
    labels: {
        "m3": X,
        "m4": Y,
        "m5": Z,
        "m5b": A,
        "m6": B,
        "roi": R
    }
    money: 총 사용 금액

    return: True / False
    """

    # 상금 규칙
    reward = (
        labels["m3"]  * 5000 +
        labels["m4"]  * 50000 +
        labels["m5"]  * 1500000 +
        labels["m5b"] * 30000000 +
        labels["m6"]  * 2000000000
    )

    # 계산 수익률
    calc_roi = (reward / money) * 100

    # LLM이 쓴 수익률
    llm_roi = labels["roi"]
    print("asdf",calc_roi, llm_roi)

    return round(calc_roi, 1) == round(llm_roi, 1)

In [168]:
import re

def parse_money_from_prompt(prompt: str):
    m = re.search(r"money\s*=\s*(\d+)", prompt)
    if not m:
        return None
    return int(m.group(1))


def parse_winning_and_bonus_from_prompt(prompt: str):
    mwin = re.search(r"winning=([0-9,]+)", prompt)
    mbonus = re.search(r"bonus=(\d+)", prompt)

    if not mwin or not mbonus:
        return None, None

    winning_numbers = list(map(int, mwin.group(1).split(",")))
    bonus_number = int(mbonus.group(1))

    return winning_numbers, bonus_number

## 테스트 한줄 통합

In [184]:
def test_one_sample(input: str, block: str):
    """
    테스트셋 블록 1개를 검사하는 함수.
    기존에 만든 함수들만 호출한다.
    """
    # print(input, "\n---\n", block)

    results = {
        "ok" : True,
        "format_ok" : True,
        "logic_ok" : True,
        "errors": []
    }

    # 1-1. 티켓수 파싱
    ticket_count = parse_ticket_count(block)

    if ticket_count is None:
        results["ok"] = False
        results["format_ok"] = False
        results["errors"].append("ticket_count_parse_error")
        
        return results

    # 1-2. 구매번호 줄 파싱
    ticket_info = check_ticket_lines_format(block)
    ticket_lines = ticket_info["ticket_lines"]
    parsed_numbers = ticket_info["parsed_numbers"]

    if not ticket_info["has_section"] or not parsed_numbers:
        results["ok"] = False
        results["format_ok"] = False
        results["errors"].append("ticket_lines_format_error")
        return results
    
    # 1-3. 라벨 파싱
    labels = parse_result_labels(block)

    if labels is None:
        results["ok"] = False
        results["format_ok"] = False
        results["errors"].append("result_labels_parse_error")
        return results

    # 2-1. 티켓수 일관성
    ticket_count_match = check_ticket_count_consistency(ticket_count, parsed_numbers)
    if not ticket_count_match:
        results["ok"] = False
        results["format_ok"] = False
        results["errors"].append("ticket_count_mismatch")
        return results

    # 2-2. 구매번호 유효성
    invalid_cnt, dup_cnt, range_cnt = validate_ticket_numbers(parsed_numbers)
    if invalid_cnt > 0 or dup_cnt > 0 or range_cnt > 0:
        results["ok"] = False
        results["format_ok"] = False
        results["errors"].append("ticket_numbers_invalid")
        return results
    
    # 인풋데이터 추출
    winning_numbers, bonus_number = parse_winning_and_bonus_from_prompt(input)
    money = parse_money_from_prompt(input)

    # 3-1. n개 일치 검증
    m3_ok, m4_ok, m5_ok, m5b_ok, m6_ok = check_match_stats(
        parsed_numbers,
        winning_numbers,
        bonus_number,
        labels
    )

    error = []
    match_check = all([m3_ok, m4_ok, m5_ok, m5b_ok, m6_ok])
    if not match_check:
        error.append("match_stats_mismatch")
    # 3-2. 수익률 검증
    roi_ok = check_roi_exact(labels, money)
    if not roi_ok:
        error.append("roi_mismatch")

    # 결과 리턴 (단순하게)

    if match_check and roi_ok:
        return {
            "ok": True,
            "format_ok" : True,
            "login_ok": True,
            "errors": []
        }
    else:
        return {
            "ok": False,
            "format_ok" : True,
            "login_ok": match_check and roi_ok,
            "errors": error
        }

print(test_one_sample(get_prompt_for_testset(test_blocks[0]), test_blocks[0]))

{'ok': True, 'format_ok': True, 'login_ok': True, 'errors': []}


### 전체 테스트

In [183]:
def evaluate_testset(   
        model,
        tokenizer,
        test_path: str = "lotto_test.txt",
        device: str = "cpu",
        max_samples: int = 100,
        max_new_tokens: int = 256,
    ):

    blocks = load_blocks(test_path)

    stats = {
        "n_total": 0,
        "n_ok": 0,
        "n_fail": 0,
        
        "n_format_fail": 0,
        "n_logic_fail": 0,

        "ticket_count_parse_error": 0,
        "ticket_lines_format_error": 0,
        "result_labels_parse_error": 0,
        "ticket_count_mismatch": 0,
        "ticket_numbers_invalid": 0,

        "match_stats_mismatch": 0,
        "roi_mismatch": 0,

        "test_fail": 0,  # 형식은 통과했는데 match_check/roi_ok에서 깨진 경우
    }

    for i, block in enumerate(blocks):
        if max_samples is not None and i >= max_samples:
            break

        prompt = get_prompt_for_testset(block)
        get_text = generate_text(
            model,
            tokenizer,
            prompt,
            max_new_tokens=max_new_tokens,
            device=device,
        )
        res = test_one_sample(prompt, get_text)

        stats["n_total"] += 1

        if res.get("ok"):
            stats["n_ok"] += 1
            continue

        # 실패
        stats["n_fail"] += 1

        err = res.get("errors")
        # print(err)
        if not res.get("format_ok"):
            stats["n_format_fail"] += 1
            if "ticket_count_parse_error" in err:
                stats["ticket_count_parse_error"] += 1
            elif "ticket_lines_format_error" in err:
                stats["ticket_lines_format_error"] += 1
            elif "result_labels_parse_error" in err:
                stats["result_labels_parse_error"] += 1
            elif "ticket_count_mismatch" in err:
                stats["ticket_count_mismatch"] += 1
            elif "ticket_numbers_invalid" in err:
                stats["ticket_numbers_invalid"] += 1
        
        if not res.get("logic_ok"):
            stats["n_logic_fail"] += 1
            if "match_stats_mismatch" in err:
                stats["match_stats_mismatch"] += 1
            if "roi_mismatch" in err:
                stats["roi_mismatch"] += 1

    return stats

In [185]:
summary = evaluate_testset(
    model=model,
    tokenizer=tokenizer,
    test_path="lotto_test.txt",
    device=device,
    max_samples=5,
    max_new_tokens=512
)

print("=== TEST SUMMARY ===")
for k, v in summary.items():
    print(k, ":", v)

=== TEST SUMMARY ===
n_total : 5
n_ok : 2
n_fail : 3
n_format_fail : 0
n_logic_fail : 3
ticket_count_parse_error : 0
ticket_lines_format_error : 0
result_labels_parse_error : 0
ticket_count_mismatch : 0
ticket_numbers_invalid : 0
match_stats_mismatch : 3
roi_mismatch : 1
test_fail : 0


In [None]:
import re

def parse_tickets_from_generated(text: str):
    """
    LLM 출력에서 구매번호 리스트 파싱
    예:
    [1,3,7,9,21,22]
    [3,8,11,15,22,35]
    ...
    """
    tickets = []
    lines = text.splitlines()

    for line in lines:
        line = line.strip()
        if line.startswith("[") and line.endswith("]"):
            try:
                nums = re.findall(r"\d+", line)
                nums = list(map(int, nums))
                tickets.append(nums)
            except:
                return None

    return tickets

In [None]:
import re

def parse_labels_from_block(text: str):
    """
    텍스트(데이터셋/LLM 출력 공통)에서
    3개일치~6개일치, 수익률(%)을 파싱.
    없거나 숫자 변환 실패 시 None.
    """
    m3  = re.search(r"3개일치\s*\([^)]*\)\s*=\s*(-?\d+)", text)
    m4  = re.search(r"4개일치\s*\([^)]*\)\s*=\s*(-?\d+)", text)
    m5  = re.search(r"5개일치\s*\([^)]*\)\s*=\s*(-?\d+)", text)
    m5b = re.search(r"5개보너스일치\s*\([^)]*\)\s*=\s*(-?\d+)", text)
    m6  = re.search(r"6개일치\s*\([^)]*\)\s*=\s*(-?\d+)", text)
    mroi = re.search(r"수익률\s*=?\s*([-+]?\d+(?:\.\d+)?)\s*%", text)

    if not all([m3, m4, m5, m5b, m6, mroi]):
        return None

    try:
        return {
            "m3":  int(m3.group(1)),
            "m4":  int(m4.group(1)),
            "m5":  int(m5.group(1)),
            "m5b": int(m5b.group(1)),
            "m6":  int(m6.group(1)),
            "roi": float(mroi.group(1)),   # 예: 125.0
        }
    except ValueError:
        return None

In [56]:
def parse_money_from_block(block: str) -> int:
    m = re.search(r"money\s*=\s*(\d+)", block)
    if not m:
        return 0
    return int(m.group(1))

In [58]:
def count_matches(ticket, winning, bonus):
    """
    ticket: [6개 숫자]
    winning: set([...])
    bonus: int
    """
    match_cnt = len(set(ticket) & winning)
    bonus_match = bonus in ticket
    return match_cnt, bonus_match

In [59]:
def calculate_roi(stat, money:int):
    """
    stat: simulate_lotto 결과 dict
    """

    reward_table = {
        "m3": 5000,
        "m4": 50000,
        "m5": 1500000,
        "m5b": 30000000,
        "m6": 2000000000,
    }

    total_reward = 0
    for k, v in stat.items():
        total_reward += reward_table[k] * v

    if money <= 0:
        return 0.0

    roi = total_reward / money * 100.0
    return roi


def stat_values(stat):
    return [stat[k] for k in ["m3", "m4", "m5", "m5b", "m6"]]

In [60]:
def simulate_lotto(tickets, winning_numbers, bonus):
    """
    tickets: [[6개], [6개], ...]
    """

    win_set = set(winning_numbers)

    result = {
        "m3": 0,
        "m4": 0,
        "m5": 0,
        "m5b": 0,
        "m6": 0
    }

    for t in tickets:
        match_cnt, bonus_match = count_matches(t, win_set, bonus)

        if match_cnt == 3:
            result["m3"] += 1
        elif match_cnt == 4:
            result["m4"] += 1
        elif match_cnt == 5 and bonus_match:
            result["m5b"] += 1
        elif match_cnt == 5:
            result["m5"] += 1
        elif match_cnt == 6:
            result["m6"] += 1

    return result

In [None]:
def functional_test_one_sample(gen_text, gt_block):
    """
    gen_text: LLM이 출력한 전체 텍스트
    gt_block: 데이터셋 원본 블록 (정답 포함)
    """

    # 1. 정답 파싱
    gt = parse_labels_from_block(gt_block)
    if gt is None:
        return {
            "ok": False,
            "reason": "no_gt"
        }

    # 2. LLM 출력에서 구매번호 파싱
    pred_tickets = parse_tickets_from_generated(gen_text)

    if not pred_tickets:
        return {
            "ok": False,
            "reason": "ticket_parse_fail"
        }

    # 3. winning / bonus 파싱
    winning = re.search(r"winning=([0-9,]+)", gt_block)
    bonus = re.search(r"bonus=(\d+)", gt_block)
    money = parse_money_from_block(gt_block)

    if not winning or not bonus:
        return {
            "ok": False,
            "reason": "cannot_parse_winning"
        }

    winning_nums = list(map(int, winning.group(1).split(",")))
    bonus_num = int(bonus.group(1))

    # 4. 실제 시뮬레이션
    sim_stat = simulate_lotto(pred_tickets, winning_nums, bonus_num)

    # 5. 데이터셋 기준 ROI 계산
    sim_roi = calculate_roi(sim_stat, money)

    # 6. 정확도 비교
    acc = {
        "m3_match": sim_stat["m3"] == gt["m3"],
        "m4_match": sim_stat["m4"] == gt["m4"],
        "m5_match": sim_stat["m5"] == gt["m5"],
        "m5b_match": sim_stat["m5b"] == gt["m5b"],
        "m6_match": sim_stat["m6"] == gt["m6"],
        "roi_diff": sim_roi == gt["roi"],
    }

    return {
        "ok": True,
        "simulated": sim_stat,
        "simulated_roi": sim_roi,
        "gt": gt,
        "acc": acc
    }

In [70]:
def run_functional_tests(
    model,
    tokenizer,
    test_path: str = "lotto_test.txt",
    device: str = "cpu",
    max_samples: int = 100,
    max_new_tokens: int = 256,
):
    """
    functional_test_one_sample(gen_text, gt_block)를 이용해서
    데이터셋 전체를 테스트한다.
    """

    blocks = load_blocks(test_path)

    results = []

    total = 0
    ok_count = 0
    fail_count = 0
    ticket_parse_fail = 0
    winning_parse_fail = 0

    # 정확도 누적
    hit_match_counts = {
        "m3": 0,
        "m4": 0,
        "m5": 0,
        "m5b": 0,
        "m6": 0,
    }
    hit_eval_counts = {
        "m3": 0,
        "m4": 0,
        "m5": 0,
        "m5b": 0,
        "m6": 0,
    }

    roi_abs_error_sum = 0.0
    roi_eval_count = 0

    for i, blk in enumerate(blocks):
        if i >= max_samples:
            break

        # --- 1) 프롬프트 생성 ---
        prompt = make_prompt_for_tickets(blk)

        # --- 2) LLM 호출 ---
        gen_text = generate_text(
            model,
            tokenizer,
            prompt,
            device=device,
            max_new_tokens=max_new_tokens
        )

        # --- 3) 핵심 테스트 함수 호출 ---
        r = functional_test_one_sample(gen_text, blk)

        total += 1
        results.append(r)

        if not r["ok"]:
            fail_count += 1
            if r["reason"] == "ticket_parse_fail":
                ticket_parse_fail += 1
            elif r["reason"] == "cannot_parse_winning":
                winning_parse_fail += 1
            continue

        ok_count += 1

        # --- 4) 정확도 누적 ---
        acc = r["acc"]

        if "m3_match" in acc:
            hit_eval_counts["m3"] += 1
            if acc["m3_match"]:
                hit_match_counts["m3"] += 1

        if "m4_match" in acc:
            hit_eval_counts["m4"] += 1
            if acc["m4_match"]:
                hit_match_counts["m4"] += 1

        if "m5_match" in acc:
            hit_eval_counts["m5"] += 1
            if acc["m5_match"]:
                hit_match_counts["m5"] += 1

        if "m5b_match" in acc:
            hit_eval_counts["m5b"] += 1
            if acc["m5b_match"]:
                hit_match_counts["m5b"] += 1

        if "m6_match" in acc:
            hit_eval_counts["m6"] += 1
            if acc["m6_match"]:
                hit_match_counts["m6"] += 1

        # ROI 오차 누적
        if "roi_diff" in acc:
            roi_abs_error_sum += acc["roi_diff"]
            roi_eval_count += 1

    # --- 5) 최종 Summary 계산 ---
    if total == 0:
        return {
            "n_eval": 0
        }

    parse_success_ratio = ok_count / total

    acc_hit = {}
    for k in hit_match_counts.keys():
        cnt = hit_eval_counts[k] if hit_eval_counts[k] > 0 else 1
        acc_hit[k] = hit_match_counts[k] / cnt

    if roi_eval_count > 0:
        roi_mae = roi_abs_error_sum / roi_eval_count
    else:
        roi_mae = 0.0

    summary = {
        "n_eval": total,
        "success_ratio": parse_success_ratio,
        "fail_cnt": fail_count,
        "fail_ticket_parse": ticket_parse_fail,
        "fail_winning_parse": winning_parse_fail,
        "acc_hit": acc_hit,     # m3~m6 정확도
        "roi_mae": roi_mae      # 수익률 평균 오차
    }

    return summary, results

In [71]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)

model = MiniGPT(config).to(device)

# 체크포인트 로드 (파일 이름 맞게 수정 가능)
ckpt_path = "lotto_gpt_best.pt"   # 또는 "lotto_gpt_epoch030.pt" 등
state = torch.load(ckpt_path, map_location=device)
model.load_state_dict(state)

model.eval()
print("checkpoint loaded from:", ckpt_path)

device: cuda
checkpoint loaded from: lotto_gpt_best.pt


In [74]:
summary, results = run_functional_tests(
    model=model,
    tokenizer=tokenizer,
    test_path="lotto_test.txt",
    device=device,
    max_samples=100,
    max_new_tokens=512
)

print("=== TEST SUMMARY ===")
for k, v in summary.items():
    print(k, ":", v)

=== TEST SUMMARY ===
n_eval : 100
success_ratio : 1.0
fail_cnt : 0
fail_ticket_parse : 0
fail_winning_parse : 0
acc_hit : {'m3': 0.64, 'm4': 0.88, 'm5': 0.93, 'm5b': 1.0, 'm6': 0.97}
roi_mae : 0.37


In [73]:
print(results[0])

{'ok': True, 'simulated': {'m3': 0, 'm4': 0, 'm5': 0, 'm5b': 0, 'm6': 0}, 'simulated_roi': 0.0, 'gt': {'m3': 0, 'm4': 0, 'm5': 1, 'm5b': 0, 'm6': 0, 'roi': 21428.6}, 'acc': {'m3_match': True, 'm4_match': True, 'm5_match': False, 'm5b_match': True, 'm6_match': True, 'roi_diff': False}}
