# Vocabulary and POS Analysis (Fridge vs TinyStories) — Colab

Цей ноутбук рахує унікальні токени, частоти, інтерсекшн/різниці та POS-статистику (дієслова, іменники, прикметники) для двох корпусів:
- A: ваш `fridge_dataset_v1.3_clean.json`
- B: TinyStories (через HuggingFace / локальні тексти / або як `train.bin` + `val.bin` токен-ідентифікатори)

Результати зберігаються у `reports/token_stats/` (у Colab — за замовчуванням в `/content/reports/token_stats`).

Кроки:
1. Встановити залежності
2. Налаштувати шляхи/режим завантаження TinyStories
3. (Опційно) підключити Google Drive
4. Запустити обчислення і подивитись результати


In [None]:
# Якщо працюєте в Google Colab — запустіть цю комірку
pip -q install python-docx
# Для завантаження TinyStories з HF
pip -q install datasets
# Для роботи з Excel
!pip install xlsxwriter

In [None]:
import os
import json
import re
from pathlib import Path
from collections import Counter
from typing import List, Iterable, Tuple, Dict, Optional, Generator

# --- Helpers from scripts/analyze_vocab.py (inlined for Colab use) ---

PCT_DECIMALS = 6  # точність відсотків у звітах


def read_json_array_records(json_path: str) -> List[dict]:
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError(f"Expected a JSON array at {json_path}")
    return data


def load_texts_fridge_json(json_path: str, tinyfridge=False) -> List[str]:
    records = read_json_array_records(json_path)
    texts: List[str] = []
    for rec in records:

        if tinyfridge:
            instr = rec.get("story") or ""
        else:
            instr = rec.get("instruction") or ""
        resp = rec.get("response") or ""
        joined = (str(instr).strip() + "\n" + str(resp).strip()).strip()
        if joined:
            texts.append(joined)
    return texts


def load_texts_from_path(path: str) -> List[str]:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"Path not found: {path}")
    if p.is_dir():
        texts: List[str] = []
        for file in p.rglob("*.txt"):
            try:
                texts.append(file.read_text(encoding="utf-8"))
            except Exception:
                pass
        return texts
    if p.suffix.lower() == ".jsonl":
        texts = []
        with open(p, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                    text = obj.get("text") or obj.get("story") or obj.get("content")
                    if text:
                        texts.append(str(text))
                except Exception:
                    texts.append(line)
        return texts
    if p.suffix.lower() == ".json":
        try:
            data = json.loads(p.read_text(encoding="utf-8"))
            if isinstance(data, list):
                texts: List[str] = []
                for obj in data:
                    if isinstance(obj, dict):
                        text = obj.get("text") or obj.get("story") or obj.get("content")
                        if text:
                            texts.append(str(text))
                if texts:
                    return texts
        except Exception:
            pass
        return [p.read_text(encoding="utf-8")]
    return [p.read_text(encoding="utf-8")]


def load_texts_tinystories(path: str = None, use_hf: bool = False) -> List[str]:
    if path:
        return load_texts_from_path(path)
    if use_hf:
        from datasets import load_dataset
        ds_train = load_dataset("roneneldan/TinyStories", split="train")
        ds_val = load_dataset("roneneldan/TinyStories", split="validation")
        texts = [r["text"] for r in ds_train] + [r["text"] for r in ds_val]
        return texts
    raise ValueError("TinyStories source not provided. Pass tinystories_path or use_hf=True")


import numpy as np

def np_fromfile(path: str, dtype_str: str):
    if dtype_str not in ("uint16", "uint32"):
        raise ValueError("Unsupported dtype. Use 'uint16' or 'uint32'.")
    dtype = np.uint16 if dtype_str == "uint16" else np.uint32
    return np.fromfile(path, dtype=dtype)


def load_tinystories_ids_from_bins(train_bin: str, val_bin: str, dtype: str = "uint16"):
    arr_train = np_fromfile(train_bin, dtype)
    arr_val = np_fromfile(val_bin, dtype)
    return np.concatenate([arr_train, arr_val])


import tiktoken

def tokenize_bpe_gpt2(texts: Iterable[str]) -> List[str]:
    enc = tiktoken.get_encoding("gpt2")
    tokens: List[str] = []
    for t in texts:
        ids = enc.encode(t, disallowed_special=())
        tokens.extend(enc.decode_single_token_bytes(i).decode("utf-8", errors="replace") for i in ids)
    return tokens


def ids_counter_to_token_counter(id_counter: Dict[int, int]) -> Counter:
    enc = tiktoken.get_encoding("gpt2")
    token_counter: Counter = Counter()
    for tok_id, cnt in id_counter.items():
        token_str = enc.decode_single_token_bytes(int(tok_id)).decode("utf-8", errors="replace")
        token_counter[token_str] += int(cnt)
    return token_counter


def decode_ids_to_text_chunks(ids, chunk_tokens: int = 200_000) -> Generator[str, None, None]:
    enc = tiktoken.get_encoding("gpt2")
    total_len = len(ids)
    for i in range(0, total_len, chunk_tokens):
        chunk = ids[i : i + chunk_tokens]
        if not isinstance(chunk, list):
            try:
                chunk = chunk.tolist()
            except Exception:
                chunk = list(chunk)
        yield enc.decode(chunk)


_WORD_RE = re.compile(r"[A-Za-z']+")

def tokenize_words(texts: Iterable[str]) -> List[str]:
    tokens: List[str] = []
    for t in texts:
        tokens.extend(m.group(0).lower() for m in _WORD_RE.finditer(t))
    return tokens


def compute_counter(tokens: Iterable[str]) -> Counter:
    return Counter(tokens)


def jaccard_percent(a_vocab: set, b_vocab: set) -> float:
    if not a_vocab and not b_vocab:
        return 0.0
    inter = len(a_vocab & b_vocab)
    union = len(a_vocab | b_vocab)
    return 100.0 * inter / union if union else 0.0


def ensure_dir(path: str) -> None:
    Path(path).mkdir(parents=True, exist_ok=True)


from docx import Document

def write_docx_table(path: str, title: str, subtitle_lines: List[str], headers: List[str], rows: List[List[str]]) -> None:
    doc = Document()
    doc.add_heading(title, level=1)
    for line in subtitle_lines:
        if line:
            doc.add_paragraph(line)
    table = doc.add_table(rows=1, cols=len(headers))
    hdr_cells = table.rows[0].cells
    for i, h in enumerate(headers):
        hdr_cells[i].text = h
    for row in rows:
        row_cells = table.add_row().cells
        for i, cell_val in enumerate(row):
            row_cells[i].text = str(cell_val)
    doc.save(path)


import csv

def write_csv(path: str, headers: List[str], rows: List[List[str]]) -> None:
    with open(path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        for row in rows:
            writer.writerow(row)


def format_pct(numerator: int, denominator: int) -> str:
    if denominator == 0:
        return f"{0:.{PCT_DECIMALS}f}%"
    return f"{(numerator / denominator) * 100:.{PCT_DECIMALS}f}%"


def top_rows_from_counter(counter: Counter, total: int) -> List[Tuple[str, int, str]]:
    rows: List[Tuple[str, int, str]] = []
    for token, cnt in counter.most_common():
        rows.append((token, cnt, format_pct(cnt, total)))
    return rows


def compute_and_save_freq_reports(out_dir: str, label: str, counter: Counter, total: int, unit: str) -> None:
    rows_triplets = top_rows_from_counter(counter, total)
    headers = ["token", "count", "percent"]
    rows = [[t, str(c), p] for (t, c, p) in rows_triplets]
    ensure_dir(out_dir)
    write_docx_table(
        os.path.join(out_dir, f"{unit}_freq_{label}.docx"),
        title=f"Top frequencies — {label} ({unit})",
        subtitle_lines=[f"Total tokens: {total}", "Sorted by count desc"],
        headers=headers,
        rows=rows,
    )
    write_csv(
        os.path.join(out_dir, f"{unit}_freq_{label}.csv"),
        headers=headers,
        rows=rows,
    )


def compute_and_save_set_reports(out_dir: str, unit: str, a_label: str, b_label: str, a_counter: Counter, b_counter: Counter) -> None:
    a_vocab = set(a_counter.keys())
    b_vocab = set(b_counter.keys())
    inter = a_vocab & b_vocab
    only_a = a_vocab - b_vocab
    only_b = b_vocab - a_vocab
    union = a_vocab | b_vocab

    jaccard = jaccard_percent(a_vocab, b_vocab)

    total_a_tokens = sum(a_counter.values())
    total_b_tokens = sum(b_counter.values())

    # intersection (без percent_combined)
    inter_rows: List[List[str]] = []
    for tok in sorted(inter, key=lambda t: (a_counter[t] + b_counter[t]), reverse=True):
        a_cnt = a_counter[tok]
        b_cnt = b_counter[tok]
        total_cnt = a_cnt + b_cnt
        pct_a = format_pct(a_cnt, total_a_tokens)
        pct_b = format_pct(b_cnt, total_b_tokens)
        inter_rows.append([tok, str(total_cnt), str(a_cnt), str(b_cnt), pct_a, pct_b])
    inter_headers = [
        "token",
        "total_count",
        f"{a_label}_count",
        f"{b_label}_count",
        f"{a_label}_percent",
        f"{b_label}_percent",
    ]
    write_docx_table(
        os.path.join(out_dir, f"{unit}_intersection.docx"),
        title=f"Vocabulary intersection ({unit})",
        subtitle_lines=[
            f"|A|={len(a_vocab)}, |B|={len(b_vocab)}, |A∩B|={len(inter)}, |A∪B|={len(union)}",
            f"Jaccard (|∩|/|∪|): {jaccard:.{PCT_DECIMALS}f}%",
        ],
        headers=inter_headers,
        rows=inter_rows,
    )
    write_csv(
        os.path.join(out_dir, f"{unit}_intersection.csv"),
        headers=inter_headers,
        rows=inter_rows,
    )

    # A - B
    diff_a_rows: List[List[str]] = []
    for tok in sorted(only_a, key=lambda t: a_counter[t], reverse=True):
        a_cnt = a_counter[tok]
        pct_a = format_pct(a_cnt, total_a_tokens)
        diff_a_rows.append([tok, pct_a, str(a_cnt)])
    diff_a_headers = ["token", f"{a_label}_percent", f"{a_label}_count"]
    write_docx_table(
        os.path.join(out_dir, f"{unit}_diff_{a_label}-minus-{b_label}.docx"),
        title=f"Vocabulary difference A-B ({unit})",
        subtitle_lines=[
            f"|A−B|={len(only_a)} (of |A∪B|={len(union)} → {(100.0*len(only_a)/len(union) if union else 0.0):.{PCT_DECIMALS}f}%)",
            f"A is {a_label}, B is {b_label}",
        ],
        headers=diff_a_headers,
        rows=diff_a_rows,
    )
    write_csv(
        os.path.join(out_dir, f"{unit}_diff_{a_label}-minus-{b_label}.csv"),
        headers=diff_a_headers,
        rows=diff_a_rows,
    )

    # B - A
    diff_b_rows: List[List[str]] = []
    for tok in sorted(only_b, key=lambda t: b_counter[t], reverse=True):
        b_cnt = b_counter[tok]
        pct_b = format_pct(b_cnt, total_b_tokens)
        diff_b_rows.append([tok, pct_b, str(b_cnt)])
    diff_b_headers = ["token", f"{b_label}_percent", f"{b_label}_count"]
    write_docx_table(
        os.path.join(out_dir, f"{unit}_diff_{b_label}-minus-{a_label}.docx"),
        title=f"Vocabulary difference B-A ({unit})",
        subtitle_lines=[
            f"|B−A|={len(only_b)} (of |A∪B|={len(union)} → {(100.0*len(only_b)/len(union) if union else 0.0):.{PCT_DECIMALS}f}%)",
            f"B is {b_label}, A is {a_label}",
        ],
        headers=diff_b_headers,
        rows=diff_b_rows,
    )
    write_csv(
        os.path.join(out_dir, f"{unit}_diff_{b_label}-minus-{a_label}.csv"),
        headers=diff_b_headers,
        rows=diff_b_rows,
    )

from tqdm import tqdm
import time

def pos_counts(texts: List[str]) -> Dict[str, int]:
    # Try spaCy; fallback NLTK
    counts = {"VERB": 0, "NOUN": 0, "ADJ": 0}
    print(f"Обробляється {len(texts)} текстів...")
    start_time = time.time()
    try:
        import spacy
        spacy.require_gpu()
        try:
            nlp = spacy.load("en_core_web_sm")
            nlp.to("cuda")
        except Exception:
            from spacy.cli import download as spacy_download
            spacy_download("en_core_web_sm")
            nlp = spacy.load("en_core_web_sm")
            nlp.to("cuda")
        for doc in tqdm(nlp.pipe(texts, disable=["ner", "parser"]), 
                       total=len(texts), desc="spaCy POS tagging"):
            for token in doc:
                if token.pos_ in counts:
                    counts[token.pos_] += 1
        elapsed = time.time() - start_time
        print(f"spaCy завершено за {elapsed:.1f}s")
        return counts
    except Exception:
        pass

    try:
        import nltk
        try:
            nltk.data.find("tokenizers/punkt")
        except LookupError:
            nltk.download("punkt", quiet=True)
        try:
            nltk.data.find("taggers/averaged_perceptron_tagger")
        except LookupError:
            nltk.download("averaged_perceptron_tagger", quiet=True)
        try:
            nltk.data.find("taggers/averaged_perceptron_tagger_eng")
        except LookupError:
            nltk.download("averaged_perceptron_tagger_eng", quiet=True)
        from nltk import word_tokenize, pos_tag
        for text in tqdm(texts, desc="NLTK POS tagging"):
            tokens = nltk.word_tokenize(text)
            tags = nltk.pos_tag(tokens)
            for _, tag in tags:
                if tag.startswith("VB"):
                    counts["VERB"] += 1
                elif tag.startswith("NN"):
                    counts["NOUN"] += 1
                elif tag.startswith("JJ"):
                    counts["ADJ"] += 1
        elapsed = time.time() - start_time
        print(f"NLTK завершено за {elapsed:.1f}s")
        return counts
    except Exception:
        return counts


def write_pos_docx(out_path: str, a_label: str, b_label: str, a_counts: Dict[str, int], b_counts: Dict[str, int]) -> None:
    total_a = sum(a_counts.values())
    total_b = sum(b_counts.values())
    headers = ["class", f"{a_label}_count", f"{a_label}_percent", f"{b_label}_count", f"{b_label}_percent"]
    rows: List[List[str]] = []
    def pct(v, tot):
        return f"{(v/tot*100):.{PCT_DECIMALS}f}%" if tot else f"{0:.{PCT_DECIMALS}f}%"
    for cls in ["VERB", "NOUN", "ADJ"]:
        rows.append([
            cls,
            str(a_counts.get(cls, 0)),
            pct(a_counts.get(cls, 0), total_a),
            str(b_counts.get(cls, 0)),
            pct(b_counts.get(cls, 0), total_b),
        ])
    write_docx_table(
        out_path,
        title="POS counts (VERB/NOUN/ADJ)",
        subtitle_lines=[f"Totals — {a_label}: {total_a}, {b_label}: {total_b}"],
        headers=headers,
        rows=rows,
    )


In [3]:
# Налаштування
USE_HF = False               # True: завантажити TinyStories з HuggingFace
TINYSTORIES_PATH = None      # шлях до тек/файлів TinyStories текстом (якщо не HF і не .bin)
TINYSTORIES_BIN_TRAIN = None # шлях до train.bin (якщо .bin-режим)
TINYSTORIES_BIN_VAL = None   # шлях до val.bin (якщо .bin-режим)
TINYSTORIES_BIN_DTYPE = 'uint16'  # 'uint16' або 'uint32'
DECODE_BIN_FOR_WORDS_POS = False  # декодувати .bin назад у текст для word/POS (повільніше)

FRIDGE_JSON = '../stories.json'
OUT_DIR = 'reports/token_stats'


In [7]:
# Завантаження корпусів і підрахунки

a_label = 'tinyfridge'
# b_label = 'tinystories'

# A: Fridge JSON
a_texts = load_texts_fridge_json(FRIDGE_JSON)

# B: TinyStories — HF / текст / або .bin
# b_texts: Optional[List[str]] = None
# b_ids = None
# if TINYSTORIES_BIN_TRAIN and TINYSTORIES_BIN_VAL:
#     b_ids = load_tinystories_ids_from_bins(TINYSTORIES_BIN_TRAIN, TINYSTORIES_BIN_VAL, dtype=TINYSTORIES_BIN_DTYPE)
# elif TINYSTORIES_PATH or USE_HF:
#     b_texts = load_texts_tinystories(TINYSTORIES_PATH, use_hf=USE_HF)
# else:
#     raise ValueError('Вкажіть джерело TinyStories: USE_HF=True, або TINYSTORIES_PATH, або шляхи до .bin')

# # BPE токени
a_bpe = tokenize_bpe_gpt2(a_texts)
a_bpe_counter = compute_counter(a_bpe)

# if b_ids is not None:
#     # Рахуємо частоти по id і мапимо до BPE-токенів
#     unique_ids, counts = np.unique(b_ids, return_counts=True)
#     id_counter = {int(i): int(c) for i, c in zip(unique_ids.tolist(), counts.tolist())}
#     b_bpe_counter = ids_counter_to_token_counter(id_counter)
#     total_b_bpe = sum(b_bpe_counter.values())
#     # Для TinyStories слів/POS треба або декодувати, або пропустити
#     if DECODE_BIN_FOR_WORDS_POS:
#         decoded_chunks = list(decode_ids_to_text_chunks(b_ids))
#         b_texts_for_pos = decoded_chunks
#     else:
#         b_texts_for_pos = []
# else:
#     assert b_texts is not None
#     b_bpe = tokenize_bpe_gpt2(b_texts)
#     b_bpe_counter = compute_counter(b_bpe)
#     total_b_bpe = len(b_bpe)
#     b_texts_for_pos = b_texts

# # Звіти по множинах (BPE)
ensure_dir(OUT_DIR)
# compute_and_save_set_reports(OUT_DIR, 'bpe', a_label, b_label, a_bpe_counter, b_bpe_counter)

# # Частоти (BPE)
# compute_and_save_freq_reports(OUT_DIR, b_label, b_bpe_counter, total_b_bpe, unit='bpe')
compute_and_save_freq_reports(OUT_DIR, a_label, a_bpe_counter, len(a_bpe), unit='bpe')

# # Word/POS
a_words = tokenize_words(a_texts)
a_word_counter = compute_counter(a_words)

# if len(b_texts_for_pos) > 0:
#     b_words = tokenize_words(b_texts_for_pos)
#     b_word_counter = compute_counter(b_words)
# else:
#     b_words = []
#     b_word_counter = Counter()

# # Частоти (word)
# if len(b_words) > 0:
#     compute_and_save_freq_reports(OUT_DIR, b_label, b_word_counter, len(b_words), unit='word')
compute_and_save_freq_reports(OUT_DIR, a_label, a_word_counter, len(a_words), unit='word')

# POS
a_pos = pos_counts(a_texts)
# b_pos = pos_counts(b_texts_for_pos) if len(b_texts_for_pos) > 0 else {"VERB": 0, "NOUN": 0, "ADJ": 0}
# write_pos_docx(os.path.join(OUT_DIR, 'pos_counts.docx'), a_label, b_label, a_counts=a_pos, b_counts=b_pos)

# # Додаткові резюме
with open(os.path.join(OUT_DIR, 'bpe_unique_counts.txt'), 'w', encoding='utf-8') as f:
    f.write(f"{a_label} unique tokens (BPE): {len(a_bpe_counter)}\n")
    # f.write(f"{b_label} unique tokens (BPE): {len(b_bpe_counter)}\n")

with open(os.path.join(OUT_DIR, 'bpe_sets_summary.txt'), 'w', encoding='utf-8') as f:
    a_vocab = set(a_bpe_counter.keys())
    # b_vocab = set(b_bpe_counter.keys())
    # inter = len(a_vocab & b_vocab)
    # union = len(a_vocab | b_vocab)
    # jacc = 100.0 * inter / union if union else 0.0
    # f.write(f"|A|={len(a_vocab)}, |B|={len(b_vocab)}, |A∩B|={inter}, |A∪B|={union}\n")
    f.write(f"|A|={len(a_vocab)}")
    # f.write(f"Jaccard (|∩|/|∪|): {jacc:.{PCT_DECIMALS}f}%\n")


Обробляється 85994 текстів...


NLTK POS tagging: 100%|██████████| 85994/85994 [16:13<00:00, 88.35it/s] 


NLTK завершено за 973.3s


In [None]:
# Додатковий аналіз: унікальні слова та частоти усередині POS-класів

from collections import defaultdict


def pos_tag_texts_spacy(texts: List[str]):
    try:
        import spacy
        try:
            nlp = spacy.load("en_core_web_sm")
        except Exception:
            from spacy.cli import download as spacy_download
            spacy_download("en_core_web_sm")
            nlp = spacy.load("en_core_web_sm")
        for doc in nlp.pipe(texts, disable=["ner", "parser"]):
            yield [(t.text, t.lemma_, t.pos_) for t in doc]
    except Exception:
        return None


def build_pos_vocab_and_freq(tagged_docs, allowed_pos=("NOUN", "VERB", "ADJ")):
    # Повертає по кожному POS: Counter лем, загальна кількість токенів цього POS
    pos_to_counter = {p: Counter() for p in allowed_pos}
    pos_to_total = {p: 0 for p in allowed_pos}
    for doc in tagged_docs:
        for text, lemma, pos in doc:
            if pos in pos_to_counter:
                lemma_norm = lemma.lower()
                pos_to_counter[pos][lemma_norm] += 1
                pos_to_total[pos] += 1
    return pos_to_counter, pos_to_total


def write_pos_vocab_docx(out_path: str, label: str, pos_to_counter: Dict[str, Counter], pos_to_total: Dict[str, int], top_k: int = 200):
    # Формат: POS, unique_count, top lemma, percent_in_pos, count
    rows: List[List[str]] = []
    headers = ["pos", "unique_in_pos", "lemma", "percent_in_pos", "count"]
    for pos_cls in ("NOUN", "VERB", "ADJ"):
        counter = pos_to_counter.get(pos_cls, Counter())
        total = pos_to_total.get(pos_cls, 0)
        unique = len(counter)
        for lemma, cnt in counter.most_common(top_k):
            rows.append([pos_cls, str(unique), lemma, format_pct(cnt, total), str(cnt)])
    write_docx_table(
        out_path,
        title=f"POS lemma frequencies — {label}",
        subtitle_lines=["percent_in_pos = частка відносно всіх токенів цього POS у корпусі"],
        headers=headers,
        rows=rows,
    )


# Обчислюємо POS-леми і частоти для обох корпусів (якщо є тексти)
fridge_tagged = list(pos_tag_texts_spacy(a_texts))
if len(b_texts_for_pos) > 0:
    tinystories_tagged = list(pos_tag_texts_spacy(b_texts_for_pos))
else:
    tinystories_tagged = []

# Fridge POS-леми
if fridge_tagged:
    fridge_pos_counters, fridge_pos_totals = build_pos_vocab_and_freq(fridge_tagged)
    write_pos_vocab_docx(os.path.join(OUT_DIR, 'pos_lemmas_fridge.docx'), 'fridge', fridge_pos_counters, fridge_pos_totals, top_k=500)

# TinyStories POS-леми (якщо були тексти)
if tinystories_tagged:
    ts_pos_counters, ts_pos_totals = build_pos_vocab_and_freq(tinystories_tagged)
    write_pos_vocab_docx(os.path.join(OUT_DIR, 'pos_lemmas_tinystories.docx'), 'tinystories', ts_pos_counters, ts_pos_totals, top_k=500)

print('Готово: додаткові POS-леми збережено (за наявності текстів).')
