# 第10章: 事前学習済み言語モデル（GPT型）

本章では、GPT型（Transformerのデコーダ型）の事前学習済みモデルを利用して、言語生成、評判分析器（ポジネガ分類器）の構築、ファインチューニング、強化学習などに取り組む。

In [None]:
import os
from dotenv import load_dotenv
import torch

# Load secrets (e.g. the Hugging Face access token) from a local .env file.
dotenv_path = './.env'
load_dotenv(dotenv_path)
HF_TOKEN = os.getenv('HF_TOKEN')

# Select the compute device. The second GPU (cuda:1) is still preferred on
# multi-GPU hosts, but the choice now falls back to the first GPU or the CPU
# instead of pointing at a device that does not exist.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')

In [None]:
# Shell escape: log in to the Hugging Face Hub with the token held in HF_TOKEN.
!huggingface-cli login --token $HF_TOKEN

## 90. 次単語予測

"The movie was full of"に続くトークン（トークン列ではなく一つのトークンであることに注意せよ）として適切なもの上位10個と、その確率（尤度）を求めよ。ただし、言語モデルへのプロンプトがどのようなトークン列に変換されたか、確認せよ。

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Tokenizer and causal language model are loaded from the same checkpoint.
checkpoint = 'meta-llama/Llama-3.2-3B-Instruct'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForCausalLM.from_pretrained(checkpoint)

In [None]:
sentence = 'The movie was full of'

# Tensor encoding of the prompt, used as model input in the following cells.
encoded = tokenizer(sentence, return_tensors='pt')

# Show the prompt three ways: sub-word pieces, integer ids, and the text
# obtained by decoding those ids again.
tokenized = tokenizer.tokenize(sentence)
input_ids = tokenizer(sentence).input_ids
decoded = tokenizer.decode(input_ids)
for view in (tokenized, input_ids, decoded):
    print(view)

In [None]:
# Experiment: picking the most likely token at each position.
# This cell itself only moves the model and the encoded prompt to `device`.
model.to(device)
encoded.to(device)

In [None]:
# Make sure the model and the encoded prompt are on the same device.
model.to(device)
encoded.to(device)

print(encoded)

# Inference only: disable autograd so no computation graph is kept in memory.
with torch.no_grad():
    outputs = model(**encoded)
logits = outputs.logits  # (batch_size, seq_len, vocab_size)
# logits[:, k, :] is computed from input_ids[:, :k + 1] and scores the token at
# position k + 1, so the last position holds the next-token prediction.
next_token_logits = logits[:, -1, :]
next_token_id = next_token_logits.argmax(-1).item()
print(f'Next token id: {next_token_id}')
next_token = tokenizer.decode([next_token_id])
print(f'Next token: {next_token}')


In [None]:
top_k = 10
# Turn the last-position logits into probabilities and keep the k most likely
# candidates; both results have shape (batch_size, top_k).
next_token_probs = torch.softmax(next_token_logits, dim=-1)
top_k_probs, top_k_indices = torch.topk(next_token_probs, top_k)

print(f'Top {top_k} next tokens')
ranked = zip(top_k_indices[0].tolist(), top_k_probs[0].tolist())
for rank, (token_id, prob) in enumerate(ranked, start=1):
    token = tokenizer.decode([token_id])
    print(f'{rank}. Token: {token}, Probability: {prob:.4f}')

## 91. 続きのテキストの予測

"The movie was full of"に続くテキストを複数予測せよ。このとき、デコーディングの方法や温度パラメータ（temperature）を変えながら、予測される複数のテキストの変化を観察せよ。

In [None]:
prompt = 'The movie was full of'
encoded = tokenizer(prompt, return_tensors='pt')
# Move the two tensors used by the generation calls onto `device`.
input_ids, attention_mask = (
    tensor.to(device) for tensor in (encoded.input_ids, encoded.attention_mask)
)

In [None]:
# Greedy search: always take the single most likely next token.
# do_sample=False is passed explicitly because generate() otherwise follows the
# checkpoint's generation config, which may enable sampling; a bare call is
# therefore not guaranteed to be greedy.
print('--- Default (Greedy-like) ---')
outputs_default = model.generate(
    input_ids,
    attention_mask=attention_mask,
    max_length=50,
    do_sample=False,
    pad_token_id=tokenizer.eos_token_id,
)
print(tokenizer.decode(outputs_default[0], skip_special_tokens=True))
print('-' * 30)

In [None]:
# Sampling (do_sample=True) combined with a temperature.
#   temperature < 1.0 : more deterministic
#   temperature > 1.0 : more random
# Settings shared by both runs; only the temperature differs between them.
sampling_kwargs = dict(
    attention_mask=attention_mask,
    max_length=50,
    do_sample=True,
    top_k=50,
    pad_token_id=tokenizer.eos_token_id,
)

print('--- Sampling with Temperature (0.7) ---')
outputs_temp_low = model.generate(input_ids, temperature=0.7, **sampling_kwargs)
print(tokenizer.decode(outputs_temp_low[0], skip_special_tokens=True))
print('-' * 30)

print('--- Sampling with Temperature (1.5) ---')
outputs_temp_high = model.generate(input_ids, temperature=1.5, **sampling_kwargs)
print(tokenizer.decode(outputs_temp_high[0], skip_special_tokens=True))
print('-' * 30)

In [None]:
# Top-k sampling: at every step, sample only among the k most probable tokens.
print('--- Top-k Sampling (k=30) ---')
top_k_settings = {
    'attention_mask': attention_mask,
    'max_length': 50,
    'do_sample': True,
    'top_k': 30,
    'pad_token_id': tokenizer.eos_token_id,
}
outputs_top_k = model.generate(input_ids, **top_k_settings)
top_k_text = tokenizer.decode(outputs_top_k[0], skip_special_tokens=True)
print(top_k_text)
print("-" * 30)

In [None]:
# Top-p (nucleus) sampling: sample from the smallest set of tokens whose
# cumulative probability exceeds p.
print('--- Top-p (Nucleus) Sampling (p=0.9) ---')
outputs_top_p = model.generate(
    input_ids,
    # Passed explicitly, like the other decoding cells: pad and EOS share one id
    # here, so the mask should not be left for generate() to infer.
    attention_mask=attention_mask,
    max_length=50,
    do_sample=True,
    top_p=0.9,
    top_k=0,  # disable top-k filtering so that only the top-p filter applies
    pad_token_id=tokenizer.eos_token_id
)
print(tokenizer.decode(outputs_top_p[0], skip_special_tokens=True))
print("-" * 30)

In [None]:
# Beam search: keep several candidate sequences (beams) alive while decoding.
print("--- Beam Search (num_beams=5) ---")
outputs_beam = model.generate(
    input_ids,
    attention_mask=attention_mask,  # explicit, consistent with the other decoding cells
    max_length=50,
    num_beams=5,
    # Disable sampling explicitly: if the checkpoint's generation config enables
    # it, num_beams > 1 would otherwise run beam-sampling, not beam search.
    do_sample=False,
    early_stopping=True,  # stop as soon as num_beams finished candidates exist
    pad_token_id=tokenizer.eos_token_id
)
print(tokenizer.decode(outputs_beam[0], skip_special_tokens=True))
print("-" * 30)

In [None]:
# Use num_return_sequences to obtain several different outputs from one search.
print("--- Beam Search with num_return_sequences=3 ---")
outputs_beam_multiple = model.generate(
    input_ids,
    attention_mask=attention_mask,  # explicit, consistent with the other decoding cells
    max_length=50,
    num_beams=5,
    num_return_sequences=3,
    # Disable sampling explicitly so this is deterministic beam search even if
    # the checkpoint's generation config turns sampling on.
    do_sample=False,
    early_stopping=True,  # stop as soon as num_beams finished candidates exist
    pad_token_id=tokenizer.eos_token_id
)
for i, output in enumerate(outputs_beam_multiple):
    print(f"Output {i+1}: {tokenizer.decode(output, skip_special_tokens=True)}")
print("-" * 30)

## 92. 予測されたテキストの確率を計算

"The movie was full of"に続くテキストを予測し、生成された各単語の尤度を表示せよ（生成されるテキストが長いと出力が読みにくくなるので、適当な長さで生成を打ち切るとよい）。

In [None]:
prompt = 'The movie was full of'
encoded = tokenizer(prompt, return_tensors='pt')
# Token ids and attention mask, both placed on `device` for generation.
attention_mask = encoded.attention_mask.to(device)
input_ids = encoded.input_ids.to(device)

In [None]:
outputs = model.generate(
    input_ids,
    attention_mask=attention_mask,
    max_length=20,
    pad_token_id=tokenizer.eos_token_id,
    output_scores=True,
    # Also return the raw per-step logits. `scores` are taken after the logits
    # processors (temperature, top-k/top-p, ...), so a softmax over them is not
    # the model's own likelihood whenever sampling options are active.
    output_logits=True,
    return_dict_in_generate=True
)

# Plain Python ints are simpler to slice and decode than a list of 0-d tensors.
sequences = outputs.sequences[0].tolist()
scores = outputs.scores  # processed scores, kept for inspection

prompt_len = input_ids.shape[1]

print(f"Input prompt: {tokenizer.decode(sequences[:prompt_len], skip_special_tokens=True)}")
print("Generated tokens and their probabilities:")

# One logits tensor of shape (batch_size, vocab_size) exists per generated token.
for generated_token_id, step_logits in zip(sequences[prompt_len:], outputs.logits):
    generated_token_str = tokenizer.decode([generated_token_id])

    # Batch size is 1, so row 0 is the distribution for this step.
    step_probs = torch.softmax(step_logits[0], dim=-1)
    prob_of_generated_token = step_probs[generated_token_id].item()

    print(f"Token: '{generated_token_str}', Probability: {prob_of_generated_token:.4f}")


## 93. パープレキシティ

適当な文を準備して、事前学習済み言語モデルでパープレキシティを測定せよ。例えば、

+ The movie was full of surprises
+ The movies were full of surprises
+ The movie were full of surprises
+ The movies was full of surprises

の4文に対して、パープレキシティを測定して観察せよ（最後の2つの文は故意に文法的な間違いを入れた）。

In [None]:
# Two grammatical sentences followed by two with a deliberate agreement error.
grammatical = [
    'The movie was full of surprises',
    'The movies were full of surprises',
]
ungrammatical = [
    'The movie were full of surprises',
    'The movies was full of surprises',
]
sentences = grammatical + ungrammatical

# Encode all sentences as one batch (no padding is requested here).
encoded = tokenizer(sentences, return_tensors='pt')

In [None]:
# Move the model and the encoded sentence batch to `device` before the forward pass.
model.to(device)
encoded.to(device)

In [None]:
input_ids = encoded.input_ids
ppl_attention_mask = encoded.attention_mask

# Inference only: no gradients are needed to measure perplexity.
with torch.no_grad():
    outputs = model(**encoded)
logits = outputs.logits

# The criterion is stateless, so build it once instead of once per sentence.
criterion = torch.nn.CrossEntropyLoss(reduction='mean')

for i, ppl_sentence in enumerate(sentences):
    # Position t predicts token t + 1. A pair is scored only when both the
    # context token and the target token are real (non-padding) tokens, so the
    # result stays correct if the batch is ever padded.
    valid = (ppl_attention_mask[i, :-1] & ppl_attention_mask[i, 1:]).bool()
    current_prediction_logits = logits[i, :-1, :][valid]
    current_target_ids = input_ids[i, 1:][valid]

    mean_neg_log_likelihood = criterion(current_prediction_logits, current_target_ids)

    # Perplexity is the exponential of the mean negative log-likelihood.
    ppl = torch.exp(mean_neg_log_likelihood)
    print(f'Sentence: {ppl_sentence}, Perplexity: {ppl.item()}')

## 94. チャットテンプレート

"What do you call a sweet eaten after dinner?"という問いかけに対する応答を生成するため、チャットテンプレートを適用し、言語モデルに与えるべきプロンプトを作成せよ。また、そのプロンプトに対する応答を生成し、表示せよ。

In [None]:
instruction = 'Answer the following question.'
text = 'What do you call a sweet eaten after dinner?'

# Conversation so far: a system instruction followed by the user's question.
system_turn = {"role": "system", "content": instruction}
user_turn = {"role": "user", "content": text}
messages = [system_turn, user_turn]

# Render the conversation as a string (tokenize=False) and request the
# generation prompt at the end (add_generation_prompt=True).
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

print('Generated prompt:', prompt, '=' * 100, sep='\n')

In [None]:
model.to(device)
inputs = tokenizer(prompt, return_tensors="pt").to(device)
prompt_token_count = inputs.input_ids.shape[1]

generation_settings = dict(
    max_new_tokens=100,
    temperature=0.7,
    do_sample=True,
    pad_token_id=tokenizer.eos_token_id,
)
with torch.no_grad():
    outputs = model.generate(**inputs, **generation_settings)

# Drop the first `prompt_token_count` ids so that only the part after the
# prompt is decoded.
new_token_ids = outputs[0][prompt_token_count:]
response = tokenizer.decode(new_token_ids, skip_special_tokens=True)
print('Model response:')
print(response)

## 95. マルチターンのチャット

問題94で生成された応答に対して、追加で"Please give me the plural form of the word with its spelling in reverse order."と問いかけたときの応答を生成・表示せよ。また、その時に言語モデルに与えるプロンプトを確認せよ。

In [None]:
previous_response = response

# Replay the first exchange, then append the follow-up request as a new user turn.
follow_up = "Please give me the plural form of the word with its spelling in reverse order."
messages = [
    {"role": "system", "content": instruction},
    {"role": "user", "content": "What do you call a sweet eaten after dinner?"},
    {"role": "assistant", "content": previous_response},
]
messages.append({"role": "user", "content": follow_up})

multi_turn_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

print('Multi-turn chat prompt', multi_turn_prompt, '=' * 100, sep='\n')

In [None]:
inputs = tokenizer(multi_turn_prompt, return_tensors='pt').to(device)
prompt_token_count = inputs.input_ids.shape[1]

with torch.no_grad():
    outputs = model.generate(
        input_ids=inputs['input_ids'],
        attention_mask=inputs['attention_mask'],
        max_new_tokens=100,
        temperature=0.7,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    ) if set(inputs.keys()) == {'input_ids', 'attention_mask'} else model.generate(
        **inputs,
        max_new_tokens=100,
        temperature=0.7,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    )

# Decode only the ids that follow the prompt.
reply_ids = outputs[0][prompt_token_count:]
multi_turn_response = tokenizer.decode(reply_ids, skip_special_tokens=True)
print('Multi-turn response:')
print(multi_turn_response)


## 96. プロンプトによる感情分析

事前学習済み言語モデルで感情分析を行いたい。テキストを含むプロンプトを事前学習済み言語モデルに与え、（ファインチューニングは行わずに）テキストのポジネガを予測するという戦略で、[SST-2](https://dl.fbaipublicfiles.com/glue/data/SST-2.zip)の開発データにおける正解率を測定せよ。

In [None]:
import pandas as pd

train_path = './data/SST-2/train.tsv'
dev_path = './data/SST-2/dev.tsv'

# Both splits are tab-separated files; read them with the same settings.
train_df, dev_df = (pd.read_csv(tsv_path, sep='\t') for tsv_path in (train_path, dev_path))
train_df

In [None]:
from tqdm import tqdm

dev_sentences = dev_df.sentence
dev_labels = dev_df.label

# One model response per development sentence, in dataset order.
predictions = []

instruction = 'Determine if the sentiment of this sentence is positive or negative. Answer with only "positive" or "negative".'

# Iterate over the sentences directly instead of indexing by position.
for sentence in tqdm(dev_sentences, desc='Processing sentences'):
    messages = [
        {"role": "system", "content": instruction},
        {"role": "user", "content": sentence}
    ]

    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    inputs = tokenizer(prompt, return_tensors='pt').to(device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=15,
            # Deterministic decoding: sampling would make the measured accuracy
            # change from one run to the next.
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode only the ids that follow the prompt.
    response = tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
    predictions.append(response.strip())

# Score the responses.
#   - neither label mentioned            -> counted as "unable to predict"
#   - exactly one label mentioned        -> correct if it matches the gold label
#   - both labels mentioned (ambiguous)  -> counted as wrong, never as correct
correct = 0
unable_to_predict = 0
for pred, true_label in zip(predictions, dev_labels):
    pred_lower = pred.lower()
    says_positive = 'positive' in pred_lower
    says_negative = 'negative' in pred_lower

    if not (says_positive or says_negative):
        unable_to_predict += 1
    elif says_positive != says_negative and says_positive == (true_label == 1):
        correct += 1

total = len(predictions)
# Guard against an empty prediction list (e.g. the generation cell was not run).
accuracy = correct / total if total else 0.0
unable_rate = unable_to_predict / total if total else 0.0

print(f'Total samples: {total}')
print(f'Correct predictions: {correct}')
print(f'Unable to predict: {unable_to_predict}')
print(f'Wrong predictions: {total - correct - unable_to_predict}')
print(f'Accuracy: {accuracy:.4f} ({correct}/{total})')
print(f'Unable to predict rate: {unable_rate:.4f} ({unable_to_predict}/{total})')

In [None]:
if unable_to_predict > 0:
    print(f'\nExamples of unable to predict:')
    # Positions of the responses that mention neither label.
    undecided = [
        idx
        for idx, pred in enumerate(predictions)
        if not any(label in pred.lower() for label in ('positive', 'negative'))
    ]
    for idx in undecided[:5]:  # show only the first five
        print(f'  Sentence: "{dev_sentences.iloc[idx]}"')
        print(f'  Prediction: "{predictions[idx]}"')
        print(f'  True label: {dev_labels.iloc[idx]}')
        print('-' * 50)

## 97. 埋め込みに基づく感情分析

事前学習済み言語モデルでテキストをベクトルで表現（エンコード）し、そのベクトルにフィードフォワード層を通すことで極性ラベルを予測するモデルを学習せよ。

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score
import numpy as np
from tqdm import tqdm
from transformers import AutoTokenizer, LlamaModel

In [None]:
import pandas as pd

train_path = './data/SST-2/train.tsv'
dev_path = './data/SST-2/dev.tsv'

# Read the tab-separated train and dev splits into DataFrames.
read_tsv = lambda tsv_path: pd.read_csv(tsv_path, sep='\t')
train_df = read_tsv(train_path)
dev_df = read_tsv(dev_path)
train_df

In [None]:
class LlamaBinaryClassifier(nn.Module):
    """Binary classifier built from a frozen backbone and a linear head.

    The backbone's last hidden states are max-pooled over the sequence axis
    (ignoring positions whose attention mask is 0), passed through dropout and
    projected to a single logit per example.
    """

    def __init__(self, llama_model, hidden_size):
        """Store the backbone, build the head and freeze the backbone.

        Args:
            llama_model: Backbone module; its output must expose `hidden_states`.
            hidden_size: Size of the backbone's hidden vectors.
        """
        super().__init__()
        self.llama_model = llama_model
        self.classifier = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(0.1)

        # Freeze every backbone parameter so only the head receives gradients.
        self.llama_model.requires_grad_(False)

    def forward(self, input_ids, attention_mask=None):
        """Return one logit per example, shape (batch_size, 1).

        Args:
            input_ids: Token ids passed to the backbone.
            attention_mask: Optional mask; positions equal to 0 are excluded
                from the max pooling.
        """
        backbone_out = self.llama_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
            use_cache=False,
        )
        last_layer = backbone_out.hidden_states[-1]

        if attention_mask is not None:
            # Masked positions become -inf so they can never win the max.
            ignored = (attention_mask == 0).unsqueeze(-1)
            last_layer = last_layer.masked_fill(ignored, float('-inf'))

        pooled = last_layer.max(dim=1).values
        return self.classifier(self.dropout(pooled))

In [None]:
class SST2Dataset(Dataset):
    """Dataset that tokenizes one sentence per item and pairs it with its label.

    `sentences` and `labels` are indexed positionally through `.iloc`.
    """

    def __init__(self, sentences, labels, tokenizer, max_length=256):
        """Keep references to the data, the tokenizer and the padded length."""
        self.sentences, self.labels = sentences, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        """Return the number of sentences."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """Return a dict with 1-D `input_ids`, `attention_mask` and a float `label`."""
        text = str(self.sentences.iloc[idx])
        target = self.labels.iloc[idx]

        # Every item is truncated/padded to exactly `max_length` tokens.
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        item = {
            # The tokenizer output carries a leading batch axis; flatten it away.
            'input_ids': encoding.input_ids.reshape(-1),
            'attention_mask': encoding.attention_mask.reshape(-1),
        }
        item['label'] = torch.tensor(target, dtype=torch.float)
        return item

In [None]:
def check_gpu_memory(device=None):
    """Print name, total, allocated and reserved memory of one CUDA device.

    Args:
        device: CUDA device to inspect (index, string or ``torch.device``).
            Defaults to the current CUDA device. All four figures are read
            from this same device.

    Prints nothing when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return
    if device is None:
        device = torch.cuda.current_device()

    properties = torch.cuda.get_device_properties(device)
    print(f"GPU: {torch.cuda.get_device_name(device)}")
    print(f"Total VRAM: {properties.total_memory / 1e9:.1f} GB")
    # memory_allocated() reports memory in use by tensors, not free memory.
    print(f"Allocated VRAM: {torch.cuda.memory_allocated(device) / 1e9:.1f} GB")
    print(f"Cached VRAM: {torch.cuda.memory_reserved(device) / 1e9:.1f} GB")

In [None]:
# Report GPU memory usage before the backbone model is loaded.
check_gpu_memory()

In [None]:
print("Loading tokenizer and base LlamaModel...")
tokenizer = AutoTokenizer.from_pretrained('meta-llama/Llama-3.2-3B-Instruct')
# Load the bare backbone (LlamaModel) and place all of it on the notebook-wide
# `device` rather than on a hard-coded GPU index, so the model and the input
# batches always end up on the same device.
model = LlamaModel.from_pretrained(
    'meta-llama/Llama-3.2-3B-Instruct',
    device_map={"": device},
    low_cpu_mem_usage=True
)

In [None]:
print("Model structure:")
# List sub-modules whose qualified name mentions one of these keywords.
keywords = ('head', 'output', 'proj')
for name, module in model.named_modules():
    lowered_name = name.lower()
    if any(keyword in lowered_name for keyword in keywords):
        print(f"{name}: {type(module)}")

print("\nModel attributes:")
head_attributes = [attr for attr in dir(model) if 'head' in attr.lower()]
print(head_attributes)

print(f"\nModel config: {model.config}")

In [None]:
print("Setting up padding token...")
# Without a pad token, reuse the EOS token for padding.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    print(f"Set pad_token to: {tokenizer.pad_token}")

# Keep the model config in sync with the tokenizer's pad id.
if hasattr(model, 'config'):
    model.config.pad_token_id = tokenizer.pad_token_id
    print(f"Set model pad_token_id to: {model.config.pad_token_id}")

# Summary of the special tokens that are now in effect.
for attr_name in ('pad_token', 'pad_token_id', 'eos_token', 'eos_token_id'):
    print(f"tokenizer.{attr_name}: {getattr(tokenizer, attr_name)}")

In [None]:
# Hidden vector size from the model config; used as the input size of the classifier head.
hidden_size = model.config.hidden_size
print(f'Hidden size: {hidden_size}')

In [None]:
# Wrap the backbone with the classification head and move the whole module to
# `device` (Module.to returns the module itself).
classifier_model = LlamaBinaryClassifier(model, hidden_size).to(device)
check_gpu_memory()

In [None]:
max_length = 256
batch_size = 2
accumulation_steps = 4

# Build the train and dev datasets the same way from their DataFrames.
train_dataset, dev_dataset = (
    SST2Dataset(frame.sentence, frame.label, tokenizer, max_length)
    for frame in (train_df, dev_df)
)

# Only the training data is shuffled.
loader_kwargs = {'batch_size': batch_size}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
dev_loader = DataLoader(dev_dataset, shuffle=False, **loader_kwargs)

In [None]:
# Binary cross-entropy on raw logits (the sigmoid is applied inside the loss).
criterion = nn.BCEWithLogitsLoss()

In [None]:
from torch.optim.lr_scheduler import LinearLR

# The backbone is frozen, so only parameters that still require gradients are
# handed to the optimizer instead of every parameter of the model.
trainable_params = [p for p in classifier_model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=1e-5, weight_decay=0.01)

# The learning-rate decay is laid out over this many epochs' worth of optimizer
# updates (one update per `accumulation_steps` batches).
planned_epochs = 3
# At least one step, so the schedule length is never zero for tiny datasets.
total_steps = max(1, len(train_loader) * planned_epochs // accumulation_steps)
scheduler = LinearLR(optimizer, start_factor=1.0, end_factor=0.1, total_iters=total_steps)

In [None]:
# Number of passes over the training data.
num_epochs = 1

# Switch the classifier to training mode (this enables its dropout layer).
classifier_model.train()

In [None]:
print("Starting training...")
num_batches = len(train_loader)
for epoch in range(num_epochs):
    total_loss = 0.0
    optimizer.zero_grad()

    for batch_idx, batch in enumerate(tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        logits = classifier_model(input_ids, attention_mask)
        # Squeeze only the trailing axis: a plain squeeze() would also drop the
        # batch axis of a final batch of size 1 and break the loss shape check.
        loss = criterion(logits.squeeze(-1), labels)

        # Gradient accumulation: scale each batch's loss so the accumulated
        # gradient is an average over `accumulation_steps` batches.
        (loss / accumulation_steps).backward()

        # Update once per `accumulation_steps` batches, and once more at the end
        # of the epoch so leftover gradients are not discarded.
        is_update_step = (batch_idx + 1) % accumulation_steps == 0 or batch_idx + 1 == num_batches
        if is_update_step:
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

        # Track the unscaled loss for reporting.
        total_loss += loss.item()

    avg_loss = total_loss / max(num_batches, 1)
    print(f'Epoch {epoch+1}, Average Loss: {avg_loss:.4f}')

print("Training completed. Starting evaluation...")

# Evaluation
classifier_model.eval()
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch in tqdm(dev_loader, desc='Evaluating'):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        logits = classifier_model(input_ids, attention_mask)
        # squeeze(-1) keeps the batch axis even for a batch of size 1, so
        # extend() below always receives a 1-D array. A dedicated name is used
        # so the `predictions` list of problem 96 is not overwritten.
        batch_predictions = torch.sigmoid(logits.squeeze(-1)) > 0.5

        all_predictions.extend(batch_predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Accuracy on the development set
accuracy = accuracy_score(all_labels, all_predictions)
print(f'Development Set Accuracy: {accuracy:.4f}')

# Final check of GPU memory usage
check_gpu_memory()

# Show the first five development examples with gold and predicted labels
print("\nPrediction examples:")
for i in range(5):
    sentence = dev_df['sentence'].iloc[i]
    true_label = dev_df['label'].iloc[i]
    pred_label = int(all_predictions[i])

    true_name = 'Positive' if true_label == 1 else 'Negative'
    pred_name = 'Positive' if pred_label == 1 else 'Negative'
    print(
        f"Sentence: {sentence}",
        f"True label: {true_label} ({true_name})",
        f"Predicted: {pred_label} ({pred_name})",
        f"Correct: {true_label == pred_label}",
        "-" * 50,
        sep="\n",
    )

## 98. ファインチューニング

問題96のプロンプトに対して、正解の感情ラベルをテキストの応答として返すように事前学習済みモデルをファインチューニングせよ。

## 99. 選好チューニング

問題96のプロンプトに対して、正解の感情ラベルを含むテキストを望ましい応答、間違った感情ラベルを含むテキストを望ましくない応答として、事前学習済み言語モデルの選好チューニング (preference tuning) を実施せよ。選好チューニングのアルゴリズムとしては、近傍方策最適化 (PPO: Proximal Policy Optimization) や直接選好最適化 (DPO: Direct Preference Optimization) などが考えられる。
