In [None]:
!pip install datasets tqdm

In [None]:
from datasets import load_dataset
import tqdm

# 1. データセットのロード（ストリーミングモードでメモリ節約）
print("データセットに接続中...")
dataset = load_dataset("izumi-lab/wikipedia-ja-20230720", split="train", streaming=True)

# 2. 保存処理
CORPUS_FILE = "wiki_ja_subset.txt"
print(f"ダウンロードと保存を開始します: {CORPUS_FILE}")

with open(CORPUS_FILE, "w", encoding="utf-8") as f:
    # 最初の1万件だけを取得
    for i, data in enumerate(tqdm.tqdm(dataset)):
        # 改行を除去して1行にする
        text = data["text"].replace("\n", "")

        # 短すぎる記事は除外（100文字以上のみ保存）
        if len(text) > 100:
            f.write(text + "\n")

        # 10,000件でストップ
        if i >= 10000:
            break

print("✅ データセット作成完了！")


In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.decoders import ByteLevel as ByteLevelDecoder

# 1. モデルの初期化（BPE）
tokenizer = Tokenizer(BPE())

# 2. Pre-tokenizerの設定（バイトレベルで分割）
# GPT-2などと同様、スペースも含めて処理する設定
tokenizer.pre_tokenizer = ByteLevel(add_prefix_space=False)

# 3. Decoderの設定（IDから文字列に戻す用）
tokenizer.decoder = ByteLevelDecoder()

# 4. Trainerの設定
# vocab_size: 語彙数（日本語LLMでは32000〜64000程度が一般的）
# min_frequency: 登場回数がこれ以下のサブワードは作らない
vocab_size = 32000
trainer = BpeTrainer(
    vocab_size=vocab_size,
    min_frequency=2,
    special_tokens=["<|endoftext|>", "<|pad|>"], # 特殊トークンの定義
    show_progress=True
)

# 5. 学習実行
tokenizer.train([CORPUS_FILE], trainer)

# 6. 保存
tokenizer.save("custom_tokenizer.json")
print(f"トークナイザ学習完了。Vocab Size: {tokenizer.get_vocab_size()}")

In [None]:
test_sentence = "生成AIの技術は日進月歩で進化しています。"
encoded = tokenizer.encode(test_sentence)

print(f"Original: {test_sentence}")
print(f"Tokens:   {encoded.tokens}")
print(f"IDs:      {encoded.ids}")

In [None]:
# 1つずつのIDを元の文字に戻して表示する検証コード
print(f"元の文: {test_sentence}\n")
print("--- トークンごとの分割内訳 ---")

for t_id in encoded.ids:
    # IDを1つだけデコードする
    word = tokenizer.decode([t_id])
    print(f"ID: {t_id:5d} | トークン: {word}")

In [None]:
import torch
from torch.utils.data import Dataset

class LLMPretrainDataset(Dataset):
    def __init__(self, txt_file, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.input_ids = []

        # 全テキストを読み込んでトークナイズ（メモリ注意：本番ではmmap等を使う）
        with open(txt_file, "r", encoding="utf-8") as f:
            text = f.read()

        # 一気にエンコード
        tokens = tokenizer.encode(text).ids

        # max_lengthごとに分割（簡易的なスライディングウィンドウなしの実装）
        # strideをつける場合は step = max_length (オーバーラップなし)
        for i in range(0, len(tokens) - max_length, max_length):
            self.input_ids.append(tokens[i : i + max_length])

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        # input:  x_1, x_2, ..., x_T
        # target: x_2, x_3, ..., x_{T+1} (1つ右にずらす)
        chunk = self.input_ids[idx]

        x = torch.tensor(chunk, dtype=torch.long)
        y = torch.tensor(chunk, dtype=torch.long) # 実際はずらしてLoss計算時に処理するが、ここではデータとしては同じものを返すのが通例

        return x, y

# 動作確認
dataset = LLMPretrainDataset(CORPUS_FILE, tokenizer, max_length=128)
print(f"総サンプル数: {len(dataset)}")

x, y = dataset[0]
print(f"Input shape: {x.shape}")

In [None]:
from dataclasses import dataclass
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

@dataclass
class ModelArgs:
    dim: int = 512          # 埋め込み次元 (d_model)
    n_layers: int = 8       # レイヤー数
    n_heads: int = 8        # Attentionヘッド数
    vocab_size: int = 32000 # 語彙数 (Vol.1で作成したtokenizerに合わせる)
    multiple_of: int = 256  # SwiGLUの隠れ層次元の調整用
    max_seq_len: int = 512  # 最大コンテキスト長
    dropout: float = 0.1

    # 計算用プロパティ
    @property
    def head_dim(self):
        return self.dim // self.n_heads

In [None]:
class RMSNorm(torch.nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        # x: (Batch, Seq, Dim)
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        output = self._norm(x.float()).type_as(x)
        return output * self.weight

In [None]:
def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):
    # 回転角度の事前計算 (複素数平面で考えると楽)
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    t = torch.arange(end, device=freqs.device)  # type: ignore
    freqs = torch.outer(t, freqs).float()  # (Seq, Dim/2)
    freqs_cis = torch.polar(torch.ones_like(freqs), freqs)  # complex64
    return freqs_cis

def apply_rotary_emb(xq, xk, freqs_cis):
    # xq, xk: (Batch, Seq, Head, HeadDim) -> 複素数化して回転
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

    # broadcastingのためにshapeを合わせる
    freqs_cis = freqs_cis[:xq.shape[1]].view(1, xq.shape[1], 1, -1)

    # 回転適用
    xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
    xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
    return xq_out.type_as(xq), xk_out.type_as(xk)

In [None]:
class Attention(nn.Module):
    def __init__(self, args: ModelArgs):
        super().__init__()
        self.n_heads = args.n_heads
        self.head_dim = args.head_dim

        self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)

    def forward(self, x, freqs_cis):
        # x: (Batch, Seq, Dim)
        bsz, seqlen, _ = x.shape

        # Q, K, V の射影 & Head分割
        xq = self.wq(x).view(bsz, seqlen, self.n_heads, self.head_dim)
        xk = self.wk(x).view(bsz, seqlen, self.n_heads, self.head_dim)
        xv = self.wv(x).view(bsz, seqlen, self.n_heads, self.head_dim)

        # RoPE の適用 (QとKを回転させる)
        xq, xk = apply_rotary_emb(xq, xk, freqs_cis)

        # Flash Attention (is_causal=True で因果マスク自動適用)
        output = F.scaled_dot_product_attention(
            xq.transpose(1, 2), # (B, H, S, D)
            xk.transpose(1, 2),
            xv.transpose(1, 2),
            is_causal=True
        )

        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)
        return self.wo(output)

In [None]:
class FeedForward(nn.Module):
    def __init__(self, args: ModelArgs):
        super().__init__()
        # 隠れ層のサイズ計算 (Llamaの仕様：2/3 * 4d 程度にしてmultiple_of倍数にする)
        hidden_dim = 4 * args.dim
        hidden_dim = int(2 * hidden_dim / 3)
        hidden_dim = args.multiple_of * ((hidden_dim + args.multiple_of - 1) // args.multiple_of)

        self.w1 = nn.Linear(args.dim, hidden_dim, bias=False) # Gate
        self.w2 = nn.Linear(hidden_dim, args.dim, bias=False) # Down
        self.w3 = nn.Linear(args.dim, hidden_dim, bias=False) # Up

    def forward(self, x):
        # SwiGLU: w2( SiLU(w1(x)) * w3(x) )
        return self.w2(F.silu(self.w1(x)) * self.w3(x))

In [None]:
class TransformerBlock(nn.Module):
    def __init__(self, args: ModelArgs):
        super().__init__()
        self.attention_norm = RMSNorm(args.dim)
        self.attention = Attention(args)
        self.ffn_norm = RMSNorm(args.dim)
        self.feed_forward = FeedForward(args)

    def forward(self, x, freqs_cis):
        # Residual Connection (Pre-Norm)
        h = x + self.attention(self.attention_norm(x), freqs_cis)
        out = h + self.feed_forward(self.ffn_norm(h))
        return out

class Transformer(nn.Module):
    def __init__(self, args: ModelArgs):
        super().__init__()
        self.args = args
        self.tok_embeddings = nn.Embedding(args.vocab_size, args.dim)
        self.layers = nn.ModuleList([TransformerBlock(args) for _ in range(args.n_layers)])
        self.norm = RMSNorm(args.dim) # Final Norm
        self.output = nn.Linear(args.dim, args.vocab_size, bias=False)

        # RoPEテーブルの事前計算
        self.freqs_cis = precompute_freqs_cis(self.args.dim // self.args.n_heads, self.args.max_seq_len * 2)

    def forward(self, idx):
        # idx: (Batch, Seq)
        bsz, seqlen = idx.shape
        x = self.tok_embeddings(idx)

        # RoPEテーブルをデバイスへ
        freqs_cis = self.freqs_cis[:seqlen].to(x.device)

        for layer in self.layers:
            x = layer(x, freqs_cis)

        x = self.norm(x)
        logits = self.output(x)
        return logits