In [None]:
!curl -L "https://gist.githubusercontent.com/Tosainu/47ed11f068f942026494/raw/cute_AA.txt" -o cute_AA.txt


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 35692  100 35692    0     0   184k      0 --:--:-- --:--:-- --:--:--  185k


In [None]:
# -*- coding: utf-8 -*-
# train_kaomoji_byt5.py
import os, random, math
from typing import Dict, List
import torch
from datasets import Dataset, DatasetDict
import datasets

from transformers import (
    AutoTokenizer, AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq, Seq2SeqTrainer, Seq2SeqTrainingArguments,
)
from peft import LoraConfig, get_peft_model, TaskType, PeftModel

# ======== Settings (every value can be overridden through an environment variable) ========
def _env(name, default, cast=str):
    """Return environment variable `name` (or `default` when unset) converted with `cast`."""
    return cast(os.environ.get(name, default))


# Model, corpus file and output location
BASE_MODEL = _env("BASE_MODEL", "sonoisa/byt5-small-japanese")
RAW_FILE   = _env("RAW_FILE", "cute_AA.txt")
OUT_DIR    = _env("OUT_DIR", "./kaomoji_byt5_lora")
SEED       = _env("SEED", 42, int)

# Maximum tokenized lengths for the source and target sequences
MAX_SOURCE_LEN = _env("MAX_SOURCE_LEN", 32, int)
MAX_TARGET_LEN = _env("MAX_TARGET_LEN", 64, int)

# LoRA hyper-parameters
LORA_R       = _env("LORA_R", 16, int)
LORA_ALPHA   = _env("LORA_ALPHA", 32, int)
LORA_DROPOUT = _env("LORA_DROPOUT", 0.05, float)

# Optimisation hyper-parameters
EPOCHS       = _env("EPOCHS", 5, float)
LR           = _env("LR", 2e-4, float)
BATCH        = _env("BATCH", 32, int)
GRAD_ACC     = _env("GRAD_ACC", 1, int)
WARMUP_RATIO = _env("WARMUP_RATIO", 0.05, float)

import re
from typing import Dict, List

def load_pairs(path: str) -> List[Dict[str, str]]:
    """Parse the kaomoji corpus at `path` into unique input/target pairs.

    Each useful line has the shape ``<reading> <kaomoji> 顔文字``. Two layouts are
    accepted:

    * Tab-separated: everything before the last column equal to ``顔文字`` is data.
      The first column is the input; the remaining columns are re-joined with tabs
      and form the kaomoji. Trailing empty columns after the marker are ignored.
    * Whitespace-separated fallback: used when no tab column equals the marker.
      The line must end with ``顔文字``; the text before the first whitespace run is
      the input and the rest is the kaomoji.

    On both paths the kaomoji has zero-width characters (U+200B..U+200D, U+FEFF)
    removed *before* the final strip, so no stray edge whitespace survives. The file
    is read as ``utf-8-sig`` so a leading byte-order mark does not end up inside the
    first input.

    Args:
        path: Path of the UTF-8 text file to read.

    Returns:
        A list of ``{"input_text": ..., "target_text": ...}`` dicts in file order,
        with exact duplicates removed (first occurrence wins).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    marker = "顔文字"
    zero_width = re.compile(r"[\u200B-\u200D\uFEFF]")
    spaced_row = re.compile(r"^\s*(.*?)\s+(.+?)\s*顔文字\s*$")

    pairs: List[Dict[str, str]] = []
    seen = set()
    with open(path, "r", encoding="utf-8-sig") as f:
        for raw in f:
            # Drop only the line terminator; inner whitespace may belong to the kaomoji.
            line = raw.rstrip("\r\n")
            if not line.strip():
                continue

            cols = line.split("\t")
            # Search from the right so trailing empty columns (extra tabs) are skipped.
            marker_idx = next(
                (i for i in range(len(cols) - 1, -1, -1) if cols[i].strip() == marker),
                None,
            )

            if marker_idx is None:
                # No tab-delimited marker column: try the whitespace-separated layout.
                m = spaced_row.match(line)
                if m is None:
                    continue
                left, right = m.group(1), m.group(2)
            else:
                data_cols = cols[:marker_idx]
                # Need at least an input column and one kaomoji column.
                if len(data_cols) < 2:
                    continue
                left, right = data_cols[0], "\t".join(data_cols[1:])

            left = left.strip()
            # Remove zero-width characters first, then strip what they were hiding.
            right = zero_width.sub("", right).strip()
            if not left or not right:
                continue

            key = (left, right)
            if key in seen:
                continue
            seen.add(key)
            pairs.append({"input_text": left, "target_text": right})

    return pairs


def build_dataset(pairs: List[Dict[str,str]], seed: int = 42) -> DatasetDict:
    """Shuffle `pairs` deterministically and split them 90/10 into train/validation.

    The shuffle is done on a copy, so the caller's list is left untouched. This keeps
    the notebook cell idempotent: calling the function again with the same `pairs`
    and `seed` yields the same split instead of re-shuffling an already shuffled list.

    Args:
        pairs: Records with ``input_text`` / ``target_text`` keys.
        seed: Seed for the private `random.Random` instance used for shuffling.

    Returns:
        A `DatasetDict` with a ``train`` split (first 90%, rounded down) and a
        ``validation`` split (the remainder).
    """
    shuffled = list(pairs)
    random.Random(seed).shuffle(shuffled)
    n_train = int(len(shuffled) * 0.9)
    return DatasetDict({
        "train": Dataset.from_list(shuffled[:n_train]),
        "validation": Dataset.from_list(shuffled[n_train:]),
    })

# Prefix prepended to every source string, both for training and for inference.
TASK_PREFIX = "kaomoji: "

def preprocess(examples, tokenizer):
    """Tokenize a batch of examples into model inputs and loss-masked labels.

    Args:
        examples: Batched mapping with ``input_text`` and ``target_text`` lists.
        tokenizer: Tokenizer used for both the source and the target side.

    Returns:
        The source encoding (padded/truncated to MAX_SOURCE_LEN) with an added
        ``labels`` entry: target ids padded/truncated to MAX_TARGET_LEN, where every
        pad id is replaced by -100.
    """
    sources = [TASK_PREFIX + s for s in examples["input_text"]]
    model_inputs = tokenizer(sources, max_length=MAX_SOURCE_LEN,
                             truncation=True, padding="max_length")
    # `text_target=` replaces the deprecated `as_target_tokenizer()` context manager.
    targets = tokenizer(text_target=examples["target_text"],
                        max_length=MAX_TARGET_LEN,
                        truncation=True, padding="max_length")
    pad_id = tokenizer.pad_token_id
    # Replace padding with -100 in the labels (metric_exact_match maps -100 back to pad).
    model_inputs["labels"] = [[-100 if t == pad_id else t for t in seq]
                              for seq in targets["input_ids"]]
    return model_inputs

def metric_exact_match(eval_pred, tokenizer):
    preds, labels = eval_pred

    # -100 → pad に戻す（これは従来どおり）
    labels = [[(t if t != -100 else tokenizer.pad_token_id) for t in seq] for seq in labels]

    # ★ ByT5は offset(通常3)未満のIDは special 想定 → decode前に pad に逃がす
    offset = getattr(tokenizer, "offset", 3)

    def _safe(batch_ids):
        safe = []
        for seq in batch_ids:
            # seq が tensor/ndarray の可能性もあるので list に
            seq = list(map(int, seq))
            safe.append([tid if tid >= offset else tokenizer.pad_token_id for tid in seq])
        return safe

    preds_safe  = _safe(preds)
    labels_safe = _safe(labels)

    pred_texts  = tokenizer.batch_decode(preds_safe,  skip_special_tokens=True)
    label_texts = tokenizer.batch_decode(labels_safe, skip_special_tokens=True)

    correct = sum(p.strip() == l.strip() for p, l in zip(pred_texts, label_texts))
    return {"exact_match": correct / max(1, len(label_texts))}



In [None]:

# Environment setup: output directory, quieter `datasets` logging, torch seed.
os.makedirs(OUT_DIR, exist_ok=True)
datasets.utils.logging.set_verbosity_error()
torch.manual_seed(SEED)

# 1) Data: parse the raw corpus, stop early when nothing was parsed, then split.
pairs = load_pairs(RAW_FILE)
if len(pairs) == 0:
    raise RuntimeError("データ0件。cute_AA.txtの場所/形式を確認してね。")
dsd = build_dataset(pairs, seed=SEED)
dsd


DatasetDict({
    train: Dataset({
        features: ['input_text', 'target_text'],
        num_rows: 681
    })
    validation: Dataset({
        features: ['input_text', 'target_text'],
        num_rows: 76
    })
})

In [None]:

# Preview a few training pairs. The count is capped by the split size so the cell
# cannot raise IndexError on a small corpus, and kept small so the output stays readable.
PREVIEW_ROWS = 20
for i in range(min(PREVIEW_ROWS, len(dsd["train"]))):
    print(dsd["train"][i])

{'input_text': 'おふとん', 'target_text': '(:3[___]'}
{'input_text': 'ずさぁ', 'target_text': "⊂('ω`⊂ 三"}
{'input_text': 'ぱんち', 'target_text': '((⊂(╹◡╹๑∩)ｼｭｯｼｭｯ'}
{'input_text': 'てへぺろ', 'target_text': 'てへぺろ！(*ゝωб)'}
{'input_text': 'ふぇえ', 'target_text': '(*>_<*)ﾉ'}
{'input_text': 'ねむい', 'target_text': '₍ᐢっ ̫-ᐢ₎'}
{'input_text': 'なみだ', 'target_text': '(☍﹏⁰)'}
{'input_text': 'おせんべい', 'target_text': 'ヾ(〄⌒ー⌒〄)ノ'}
{'input_text': 'きゃぴ', 'target_text': '(´pゝω･)'}
{'input_text': 'なみだ', 'target_text': '˚‧º·(˚ ˃̣̣̥᷄⌓˂̣̣̥᷅ )‧º·˚'}
{'input_text': 'にこ', 'target_text': 'ε-ε-ヾ( o´∀)ﾂ'}
{'input_text': 'ぶんぶん', 'target_text': 'ﾌﾞﾝ((>ω<｀*)(*´>ω<))ﾌﾞﾝ'}
{'input_text': 'ぱーん', 'target_text': '( ‘д‘⊂彡☆))Д´)'}
{'input_text': 'にこ', 'target_text': '(ﾟ▽ﾟ*)'}
{'input_text': 'よし', 'target_text': '(๑˃̵ᴗ˂̵)و ﾖｼ!'}
{'input_text': 'もうしわけねえ', 'target_text': "ヽ('ω')ﾉ三ヽ('ω')ﾉ"}
{'input_text': 'ようせい', 'target_text': '(・ワ・)'}
{'input_text': 'かーちゃん', 'target_text': "Ｊ( 'ｰ`)し"}
{'input_text': 'ぎゅっ', 'target_text': '°+♡:.(っ>ω<c).:♡+°

In [None]:
# 2) Tokenizer & model (ByT5 does not need a SentencePiece model)
print(f"Loading base model: {BASE_MODEL}")
base_model = AutoModelForSeq2SeqLM.from_pretrained(BASE_MODEL)
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, use_fast=True)



Loading base model: sonoisa/byt5-small-japanese


In [None]:
# 3) Inject LoRA adapters (the usual target modules for T5/ByT5)
lora_targets = ["q", "k", "v", "o", "wi_0", "wi_1", "wo"]
lora = LoraConfig(
    task_type=TaskType.SEQ_2_SEQ_LM,
    target_modules=lora_targets,
    r=LORA_R,
    lora_alpha=LORA_ALPHA,
    lora_dropout=LORA_DROPOUT,
    bias="none",
)
# Wrap the base model and report how many parameters are trainable.
model = get_peft_model(base_model, lora)
model.print_trainable_parameters()



trainable params: 6,258,688 || all params: 305,896,448 || trainable%: 2.0460


In [None]:
# 4) Preprocessing: tokenize both splits and drop the raw text columns
tokenized = dsd.map(
    preprocess,
    fn_kwargs={"tokenizer": tokenizer},
    batched=True,
    remove_columns=dsd["train"].column_names,
)



Map:   0%|          | 0/681 [00:00<?, ? examples/s]



Map:   0%|          | 0/76 [00:00<?, ? examples/s]

In [None]:
# 5) Collator & training configuration
from transformers import GenerationConfig

data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)

# Prefer bf16 where the GPU supports it, otherwise fp16 on GPU, full precision on CPU.
bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
fp16 = torch.cuda.is_available() and not bf16

# Optimizer steps per epoch; only used to derive a logging interval (~5 logs per epoch).
steps_per_epoch = math.ceil(len(tokenized["train"]) / (BATCH * max(1, torch.cuda.device_count()) * max(1, GRAD_ACC)))

# Generation settings used for evaluation.
# ByT5 emits one token per UTF-8 byte, so:
#   * the length budget must match the label budget (MAX_TARGET_LEN bytes); a smaller
#     limit truncates most kaomoji and makes an exact match impossible;
#   * no_repeat_ngram_size / encoder_no_repeat_ngram_size are left off: a 3-gram ban on
#     bytes forbids any 3-byte character from occurring twice or being copied from the
#     input, which symmetric kaomoji need;
#   * repetition_penalty is left at its neutral default because UTF-8 lead bytes
#     legitimately recur in almost every character.
gen_cfg = GenerationConfig(
    max_new_tokens=MAX_TARGET_LEN,
    num_beams=1,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.pad_token_id,
)
args = Seq2SeqTrainingArguments(
    output_dir=OUT_DIR,
    per_device_train_batch_size=BATCH,
    per_device_eval_batch_size=BATCH,
    gradient_accumulation_steps=GRAD_ACC,
    learning_rate=LR,
    num_train_epochs=EPOCHS,
    warmup_ratio=WARMUP_RATIO,
    logging_steps=max(1, steps_per_epoch // 5),
    seed=SEED,  # keep the trainer's seed in sync with the notebook-level SEED

    # `eval_strategy` is the current name of the former `evaluation_strategy`
    eval_strategy="epoch",
    generation_config=gen_cfg,   # evaluation generates with the settings above

    save_strategy="epoch",
    save_total_limit=2,
    predict_with_generate=True,
    bf16=bf16, fp16=fp16,
    dataloader_pin_memory=True,
    report_to="none",
    load_best_model_at_end=True,
    # Select the best checkpoint by validation loss. exact_match is still reported,
    # but when it is tied (e.g. 0.0 on every epoch) the *first* checkpoint would be
    # kept as "best" and reloaded at the end, discarding all later training.
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)


trainer = Seq2SeqTrainer(
    model=model, args=args,
    train_dataset=tokenized["train"],
    eval_dataset=tokenized["validation"],
    data_collator=data_collator,
    # `processing_class` supersedes the deprecated `tokenizer` argument
    processing_class=tokenizer,
    compute_metrics=lambda x: metric_exact_match(x, tokenizer),
)



  trainer = Seq2SeqTrainer(


In [None]:
# Sanity check: the tokenizer's size should match the model's vocab_size,
# and the EOS/PAD ids printed here are the ones passed to the generation config.
print("tokenizer type:", type(tokenizer).__name__)
print("len(tokenizer):", len(tokenizer))
print("model vocab_size:", model.config.vocab_size)
print("eos_token_id:", tokenizer.eos_token_id, "pad_token_id:", tokenizer.pad_token_id)


tokenizer type: ByT5Tokenizer
len(tokenizer): 384
model vocab_size: 384
eos_token_id: 1 pad_token_id: 0


In [None]:
# 6) Training: run the fine-tuning loop configured on `trainer`
trainer.train()



Epoch,Training Loss,Validation Loss,Exact Match
1,3.2824,2.949673,0.0
2,2.5806,2.212036,0.0
3,2.143,1.988311,0.0
4,2.0873,1.909775,0.0
5,1.965,1.888562,0.0


TrainOutput(global_step=110, training_loss=2.544600491090254, metrics={'train_runtime': 112.9098, 'train_samples_per_second': 30.157, 'train_steps_per_second': 0.974, 'total_flos': 199613074268160.0, 'train_loss': 2.544600491090254, 'epoch': 5.0})

In [None]:
# 7) Save the best model's LoRA adapter and the tokenizer
trainer.model.save_pretrained(os.path.join(OUT_DIR, "adapter"))
tokenizer.save_pretrained(OUT_DIR)

# 8) Merge LoRA into the base weights (single model for distribution / inference)
print("Merging LoRA into base weights...")
merged_dir = os.path.join(OUT_DIR, "merged")
merged = trainer.model.merge_and_unload() if isinstance(trainer.model, PeftModel) else trainer.model
merged.save_pretrained(merged_dir)
# The inference cell loads the tokenizer from the merged directory, so store a copy
# there as well; otherwise that directory is not self-contained.
tokenizer.save_pretrained(merged_dir)
print("Done. Saved under:", OUT_DIR)

Merging LoRA into base weights...
Done. Saved under: ./kaomoji_byt5_lora


In [None]:
from transformers import LogitsProcessor
import torch

class BanHiraganaAllowKatakana(LogitsProcessor):
    """Logits processor for ByT5 that forbids hiragana but allows katakana.

    Hiragana is U+3040-309F and katakana is U+30A0-30FF. As UTF-8 bytes:
      hiragana:  E3 81 80..BF,  E3 82 80..9F
      katakana:  E3 82 A0..BF,  E3 83 80..BF

    The processor looks at the last generated bytes of each sequence and sets the
    scores of the bytes that would start or complete a hiragana code point to -inf.
    """
    def __init__(self, tokenizer, vocab_size=None):
        """Build the byte -> token-id table.

        Args:
            tokenizer: Tokenizer whose ``offset`` attribute (default 3) is the id of byte 0.
            vocab_size: Upper bound (exclusive) for valid token ids. Defaults to
                ``tokenizer.vocab_size`` or 384 when that attribute is missing.
                NOTE(review): confirm that the tokenizer's ``vocab_size`` covers
                ``offset + 256``; passing the model's vocab size explicitly avoids
                dropping the highest byte values from the table.
        """
        self.offset = getattr(tokenizer, "offset", 3)
        self.vocab_size = vocab_size if vocab_size is not None else getattr(tokenizer, "vocab_size", 384)

        # Map byte value (0..255) -> token id, keeping only ids inside the vocabulary.
        self.byte_to_tid = {}
        for b in range(256):
            tid = b + self.offset
            if 0 <= tid < self.vocab_size:
                self.byte_to_tid[b] = tid

    def _last_valid_bytes(self, ids_row, k=2):
        """Return up to `k` most recent real bytes (0..255) of `ids_row`, oldest first.

        Token ids that do not map to a byte (below the offset or above offset+255)
        are skipped.
        """
        out = []
        for t in reversed(ids_row.tolist()):
            b = int(t) - self.offset
            if 0 <= b <= 255:
                out.append(b)
                if len(out) >= k:
                    break
        return list(reversed(out))

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        """Mask (in place) the next-byte scores that would produce hiragana.

        Args:
            input_ids: Token ids generated so far, one row per sequence.
            scores: Next-token scores, one row per sequence; modified in place.

        Returns:
            The same `scores` tensor with banned byte tokens set to -inf.
        """
        batch_size = input_ids.size(0)
        for i in range(batch_size):
            ctx = self._last_valid_bytes(input_ids[i], k=2)

            banned_bytes = set()

            if ctx and ctx[-1] == 0xE3:
                # Previous byte is the lead byte E3, wherever it occurs in the
                # sequence (not only as the very first generated byte): a second
                # byte of 0x81 always yields hiragana, so ban it here. 0x82 stays
                # allowed because it is disambiguated by the third byte; E3 itself
                # is never banned, otherwise katakana would be impossible too.
                banned_bytes.add(0x81)

            elif len(ctx) >= 2 and ctx[-2] == 0xE3:
                if ctx[-1] == 0x81:
                    # E3 81 xx is always hiragana -> ban every continuation byte.
                    # Defensive only: the branch above normally prevents "E3 81".
                    banned_bytes.update(range(0x80, 0xC0))  # 0x80..0xBF
                elif ctx[-1] == 0x82:
                    # E3 82 80..9F is hiragana, A0..BF is katakana -> ban the lower half.
                    banned_bytes.update(range(0x80, 0xA0))  # 0x80..0x9F

            if banned_bytes:
                banned_tids = [self.byte_to_tid[b] for b in banned_bytes if b in self.byte_to_tid]
                if banned_tids:
                    scores[i, banned_tids] = float("-inf")

        return scores


In [None]:
# -*- coding: utf-8 -*-
# infer_kaomoji.py
import os, torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Location of the merged model and the device to run inference on.
MODEL_DIR = os.environ.get("MODEL_DIR", "./kaomoji_byt5_lora/merged")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR, use_fast=True)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_DIR).to(DEVICE)
model.eval()

# Use the processor class that is actually defined in this notebook; the previous
# name (BanHiraganaKanjiBytes) does not exist and raises NameError on a fresh kernel.
# The model's vocab size is passed so the byte table covers every byte token.
logits_processors = [BanHiraganaAllowKatakana(tokenizer, vocab_size=model.config.vocab_size)]
def predict(s: str, max_new_tokens: int = MAX_TARGET_LEN) -> str:
    """Generate a kaomoji for the input text `s`.

    Args:
        s: Input text; TASK_PREFIX is prepended before tokenization.
        max_new_tokens: Generation budget. ByT5 emits one token per UTF-8 byte, so
            the default matches the target length used for the training labels.

    Returns:
        The decoded output with special tokens removed and surrounding whitespace stripped.
    """
    # Prepend the task prefix to the input
    x = tokenizer(TASK_PREFIX + s, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        y = model.generate(
            **x,
            max_new_tokens=max_new_tokens,
            # The n-gram bans and the repetition penalty are switched off explicitly
            # (the saved generation config may still enable them): on byte tokens a
            # 3-gram ban forbids repeating any 3-byte character or copying one from
            # the input, and the penalty hits UTF-8 lead bytes that recur in almost
            # every character.
            no_repeat_ngram_size=0,
            encoder_no_repeat_ngram_size=0,
            repetition_penalty=1.0,
            # Greedy decoding; `early_stopping` only applies to beam search, so it is omitted.
            num_beams=1,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
            logits_processor=logits_processors,  # hiragana ban
        )
    return tokenizer.decode(
        y[0], skip_special_tokens=True, clean_up_tokenization_spaces=False
    ).strip()

# Smoke test: print the generated kaomoji for a few sample inputs.
if __name__ == "__main__":
    for text in ["あせ", "おこ", "うえーん", "いえーい", "がおー"]:
        print(text, "->", predict(text))


あせ -> … ☆ ♪
おこ -> omeji... … ₎ ■▽
うえーん -> ☆ ♪ ○ ■▽△
いえーい -> _ ○ ■ ☆ ♡︎♪
がおー -> omaji.: … ☆
