<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import pandas as pd
from IPython.display import display
from pathlib import Path

"""
Скрипт сравнения распределений P(next_token) с поддержкой:
----------------------------------------------------------------
1. **Внутренних промптов** (список в коде) **или**
2. **Промптов из текстового файла** в той же директории.

Флаг `USE_FILE_PROMPTS` выбирает источник.
Если `True`, файл `PROMPT_FILE` читается целиком и используется как **один** длинный prompt.

Модификация «summary + слова» печатает краткое summary исходного prompt‑а.
Добавлена **безопасная токенизация**: если длиннее лимита модели (1024 для GPT‑2),
текст автоматически усечётся, а в консоль выведется предупреждение.
"""

# ---------- ПАРАМЕТРЫ ----------
MODEL_NAME   = "gpt2"
STEPS        = 3
DEVICE       = "cuda" if torch.cuda.is_available() else "cpu"

# Флаг, управляющий включением суммаризации как отдельной модификации.
SUMMARY_ENABLED = True

# Суммаризация
SUMM_MODEL   = "sshleifer/distilbart-cnn-12-6"
CUSTOM_WORDS = "Сводка:"

# Выбор источника промптов
USE_FILE_PROMPTS = True
PROMPT_FILE      = "prompt.txt"

# ---------- 1. Модели ----------
_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
_model     = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()

_summarizer = pipeline(
    "summarization",
    model=SUMM_MODEL,
    tokenizer=SUMM_MODEL,
    device=0 if DEVICE=="cuda" else -1,
)

CTX_LIMIT = _model.config.n_positions

# ---------- 2. Вспомогательные функции ----------
def _next_token_probs(ids):
    with torch.no_grad():
        logits = _model(ids).logits[0, -1]
    return torch.softmax(logits, dim=-1).cpu()

def _kl(p, q, eps=1e-8):
    p = p + eps
    q = q + eps
    return torch.sum(p * torch.log(p / q)).item()

def _cos(p, q):
    return F.cosine_similarity(p, q, dim=0).item()

def _multi_step(prompt: str, n: int):
    token_ids = _tokenizer.encode(prompt)
    if len(token_ids) > CTX_LIMIT:
        print(f"⚠️ Промпт содержит {len(token_ids)} токенов и будет усечён до {CTX_LIMIT}.")
    ids = _tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=CTX_LIMIT
    )["input_ids"].to(DEVICE)

    dists = []
    for _ in range(n):
        probs = _next_token_probs(ids)
        dists.append(probs)
        next_id = probs.argmax().unsqueeze(0).unsqueeze(0).to(DEVICE)
        ids = torch.cat([ids, next_id], dim=1)
    return dists

# ---------- 3. Источник промптов и вывод top‑5 токенов ----------
PROMPTS = []
if USE_FILE_PROMPTS:
    prompt = Path(PROMPT_FILE).read_text(encoding="utf-8").strip()
    PROMPTS = [prompt]
else:
    PROMPTS = [
        "What are the health benefits of green tea?",
        "Explain the process of photosynthesis.",
        "How can I improve my time management skills?"
    ]

MODS = [
    ("original", lambda p: p),
    ("typo first e", lambda p: p.replace("e", "3", 1)),
    ("add salutation", lambda p: "Dear user, " + p),
    ("префикс ======", lambda p: "="*10 + p),
    ("префикс вопрос", lambda p: "I have a question. " + p),
    ("суффикс 10 лет?", lambda p: p + " in the next decade?"),
    ("summary", lambda p: CUSTOM_WORDS + _summarizer(p, max_length=60, min_length=10, do_sample=False)[0]['summary_text'])
]



  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


In [2]:
# ---------- 4. Вывод top‑5 токенов ----------
topk_table = {}

for mod_name, mod_fn in MODS:
    mod_prompt = mod_fn(PROMPTS[0])  # только первый промпт, для topk вывода
    dists = _multi_step(mod_prompt, STEPS)

    rows = []
    for step_idx, probs in enumerate(dists):
        topk = torch.topk(probs, 5)
        token_ids = topk.indices.tolist()
        probs_vals = topk.values.tolist()

        for rank, (tok_id, prob) in enumerate(zip(token_ids, probs_vals), 1):
            decoded = _tokenizer.decode([tok_id])
            print(f"[{mod_name}] Шаг {step_idx+1}, #{rank}: {decoded!r} (P={prob:.4f})")
            rows.append(decoded)

    topk_table[mod_name] = rows




[original] Шаг 1, #1: '\n' (P=0.3962)
[original] Шаг 1, #2: ' The' (P=0.1303)
[original] Шаг 1, #3: ' We' (P=0.0600)
[original] Шаг 1, #4: ' If' (P=0.0539)
[original] Шаг 1, #5: ' This' (P=0.0333)
[original] Шаг 2, #1: 'The' (P=0.2432)
[original] Шаг 2, #2: 'We' (P=0.1141)
[original] Шаг 2, #3: 'If' (P=0.0396)
[original] Шаг 2, #4: 'This' (P=0.0330)
[original] Шаг 2, #5: 'In' (P=0.0286)
[original] Шаг 3, #1: ' __' (P=0.1237)
[original] Шаг 3, #2: ' first' (P=0.0422)
[original] Шаг 3, #3: ' object' (P=0.0351)
[original] Шаг 3, #4: ' result' (P=0.0299)
[original] Шаг 3, #5: ' product' (P=0.0298)
[typo first e] Шаг 1, #1: '\n' (P=0.4088)
[typo first e] Шаг 1, #2: ' The' (P=0.1271)
[typo first e] Шаг 1, #3: ' We' (P=0.0607)
[typo first e] Шаг 1, #4: ' If' (P=0.0501)
[typo first e] Шаг 1, #5: ' This' (P=0.0330)
[typo first e] Шаг 2, #1: 'The' (P=0.2425)
[typo first e] Шаг 2, #2: 'We' (P=0.1190)
[typo first e] Шаг 2, #3: 'If' (P=0.0373)
[typo first e] Шаг 2, #4: 'This' (P=0.0316)
[typo first

In [3]:
df_topk = pd.DataFrame(topk_table)
print("\n=== Таблица top-5 токенов на каждом шаге ===")
display(df_topk)



=== Таблица top-5 токенов на каждом шаге ===


Unnamed: 0,original,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
0,\n,\n,\n,\n,\n,\n,\n
1,The,The,The,The,The,We,We
2,We,We,We,We,We,s,The
3,If,If,If,If,If,�,This
4,This,This,This,This,This,we,Python
5,The,The,The,The,The,The,\n
6,We,We,We,We,We,We,The
7,If,If,Let,Let,If,This,We
8,This,This,If,If,Let,In,In
9,In,In,This,Here,This,If,This


In [4]:
# ---------- 5. Метрики KL и Cos для всех промптов ----------
kl_tables = [pd.DataFrame(index=[f"prompt {i+1}" for i in range(len(PROMPTS))], columns=[name for name, _ in MODS if name != "original"]) for _ in range(STEPS)]
cos_tables = [pd.DataFrame(index=[f"prompt {i+1}" for i in range(len(PROMPTS))], columns=[name for name, _ in MODS if name != "original"]) for _ in range(STEPS)]

for i, prompt in enumerate(PROMPTS):
    base_dists = _multi_step(prompt, STEPS)
    for mod_name, mod_fn in MODS:
        if mod_name == "original":
            continue
        mod_prompt = mod_fn(prompt)
        mod_dists = _multi_step(mod_prompt, STEPS)
        for step in range(STEPS):
            kl = _kl(base_dists[step], mod_dists[step])
            cos = _cos(base_dists[step], mod_dists[step])
            kl_tables[step].loc[f"prompt {i+1}", mod_name] = kl
            cos_tables[step].loc[f"prompt {i+1}", mod_name] = cos

print("\n=== Cosine similarity ===")
for step, df in enumerate(cos_tables):
    print(f"\nStep {step+1}")
    display(df)

print("\n=== KL-divergence ===")
for step, df in enumerate(kl_tables):
    print(f"\nStep {step+1}")
    display(df)



=== Cosine similarity ===

Step 1


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.999771,0.999758,0.999615,0.999368,0.884264,0.94267



Step 2


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.999519,0.994995,0.992026,0.993661,0.941748,0.009206



Step 3


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.999106,0.998334,0.996072,0.993689,0.848659,0.002137



=== KL-divergence ===

Step 1


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.001607,0.004065,0.005754,0.010449,0.904726,0.303859



Step 2


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.003405,0.016056,0.025017,0.025585,0.165439,3.005191



Step 3


Unnamed: 0,typo first e,add salutation,префикс ======,префикс вопрос,суффикс 10 лет?,summary
prompt 1,0.002648,0.00569,0.006568,0.013234,0.263314,7.900978
