<a href="https://colab.research.google.com/github/Kruglikle/EduText-Analyzer/blob/data/Edu_preproccesing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
text_raw = open("starlight-9.txt", "r", encoding="utf-8").read()

In [9]:
# =========================
# CELL 1 — PREPROCESS (MVP) [UPDATED v2: MODULE/Module + no-merge headings]
# =========================

import re
from dataclasses import dataclass
from typing import List, Dict, Optional

# Поддерживает "----- PAGE 2 -----" и "===== PAGE 2 ====="
PAGE_RE_DEFAULT = r"^[=\-]{2,}\s*PAGE\s*(\d+)\s*[=\-]{2,}$"

@dataclass
class PageBlock:
    page_num: int
    text_raw: str
    text_clean: str
    text_en: str
    module_id: Optional[str] = None

def normalize_whitespace(s: str) -> str:
    # MVP (2): нормализация пробелов/табов/переводов строк без потери пунктуации
    s = (s or "").replace("\ufeff", "")
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = s.replace("\t", " ")
    s = re.sub(r"[ \u00A0]+", " ", s)          # множественные пробелы / NBSP
    s = re.sub(r"[ \u00A0]+\n", "\n", s)       # пробелы перед \n
    return s.strip()

def dehyphenate_linebreaks(s: str) -> str:
    # MVP (3): inter-\nnational -> international
    return re.sub(r"(?<=\w)-\n(?=\w)", "", s)

def looks_like_new_block(line: str) -> bool:
    # MVP (4): эвристика "не склеивать", если похоже на новый блок/задание/заголовок
    t = (line or "").strip()
    if not t:
        return True

    # "1. " / "2) "
    if re.match(r"^\d+[\.\)]\s+", t):
        return True

    # "1 a) ..." / "12 b) ..."
    if re.match(r"^\d+\s*[a-zA-Z]\)\s+", t):
        return True

    # "1a People"
    if re.match(r"^\d+[a-zA-Z]\b", t):
        return True

    # bullets
    if re.match(r"^[•\-\*]\s+", t):
        return True

    # Module/Unit/Lesson/Starter module — в любом регистре (MODULE 3 тоже)
    if re.match(r"^(module|unit|lesson|starter module|модуль|юнит|урок)\b", t, re.IGNORECASE):
        return True

    # капслок-заголовки (осторожно)
    letters = re.sub(r"[^A-Za-zА-Яа-яЁё]", "", t)
    if letters and sum(ch.isupper() for ch in letters) / max(len(letters), 1) > 0.8 and len(letters) >= 6:
        return True

    return False

def join_wrapped_lines_preserve_paragraphs(s: str) -> str:
    """
    MVP (4): аккуратно склеиваем переносы строк внутри абзаца.
    FIX: не склеиваем, если текущая строка сама является заголовком/маркером блока
         (например, 'MODULE 3'), чтобы не превращать 'MODULE 3' + 'Body and Soul'
         в одну строку.
    """
    lines = s.split("\n")
    out = []
    i = 0

    while i < len(lines):
        line = lines[i]
        if line.strip() == "":
            out.append("")
            i += 1
            continue

        merged = line.rstrip()
        j = i

        while j + 1 < len(lines):
            nxt = lines[j + 1]
            if nxt.strip() == "":
                break

            # FIX 1: если текущая строка — маркер блока (Module/Starter/etc.), НЕ склеиваем дальше
            if looks_like_new_block(merged):
                break

            # FIX 2: если следующая строка — новый блок/задание, тоже не склеиваем
            if looks_like_new_block(nxt):
                break

            # сильная пунктуация в конце строки
            if re.search(r"[.!?…:;]$|[.!?…:;][\"”')\]]*$", merged.strip()):
                break

            merged = merged + " " + nxt.strip()
            j += 1

        out.append(merged)
        i = j + 1

    joined = "\n".join(out)
    joined = re.sub(r"\n{3,}", "\n\n", joined)
    return joined.strip()

def extract_english_layer(s: str) -> str:
    """
    MVP (5): EN-only слой:
    - оставляем токены с латиницей
    - оставляем цифры и базовую пунктуацию
    - сохраняем \n (чтобы можно было по страницам/блокам)
    """
    tokens = re.findall(r"[A-Za-z]+(?:'[A-Za-z]+)?|[0-9]+|[.!?,;:\-—()\[\]\"“”'…]+|\n+", s)
    kept = []
    for tok in tokens:
        if tok.startswith("\n"):
            kept.append(tok)
            continue
        if re.search(r"[A-Za-z]", tok):
            kept.append(tok)
        elif re.match(r"^[0-9]+$", tok):
            kept.append(tok)
        elif re.match(r"^[.!?,;:\-—()\[\]\"“”'…]+$", tok):
            kept.append(tok)

    out = " ".join(kept)
    out = out.replace(" \n ", "\n").replace(" \n", "\n").replace("\n ", "\n")
    out = re.sub(r"[ ]{2,}", " ", out)
    out = re.sub(r"\s+([.!?,;:])", r"\1", out)  # убрать пробел перед пунктуацией
    out = re.sub(r"\n{3,}", "\n\n", out)
    return out.strip()

def detect_module_id(text_clean: str) -> Optional[str]:
    """
    UPDATED v2:
    Ловит Module N / MODULE N / Модуль N (строго с начала строки),
    даже если после номера на строке есть слова (например 'MODULE 3 Body and Soul').
    'Starter module' игнорируем.
    """
    for line in (text_clean or "").splitlines():
        t = line.strip()
        if re.match(r"^starter\s+module\b", t, re.IGNORECASE):
            continue
        m = re.match(r"^(module|модуль)\s+([0-9IVXLC]+)\b", t, re.IGNORECASE)
        if m:
            return m.group(2)
    return None

def split_into_pages(text: str, page_re: str = PAGE_RE_DEFAULT) -> List[Dict]:
    text = normalize_whitespace(text)
    page_re_comp = re.compile(page_re, re.MULTILINE)
    matches = list(page_re_comp.finditer(text))

    if not matches:
        return [{"page_num": 1, "text": text}]

    pages = []

    # Если до первого маркера есть текст — считаем как PAGE 1
    prefix = text[:matches[0].start()].strip()
    if prefix:
        pages.append({"page_num": 1, "text": prefix})

    for idx, m in enumerate(matches):
        page_num = int(m.group(1))
        start = m.end()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        page_text = text[start:end].strip()
        pages.append({"page_num": page_num, "text": page_text})

    return pages

def preprocess_document(text_raw: str, page_re: str = PAGE_RE_DEFAULT) -> List[PageBlock]:
    pages = split_into_pages(text_raw, page_re=page_re)

    out_pages: List[PageBlock] = []
    last_module: Optional[str] = None

    for p in pages:
        raw = p["text"]
        s = normalize_whitespace(raw)
        s = dehyphenate_linebreaks(s)
        s = join_wrapped_lines_preserve_paragraphs(s)

        module_here = detect_module_id(s)
        if module_here:
            last_module = module_here

        en = extract_english_layer(s)

        out_pages.append(PageBlock(
            page_num=p["page_num"],
            text_raw=raw,
            text_clean=s,
            text_en=en,
            module_id=last_module
        ))

    return out_pages

# --- RUN ---
pages = preprocess_document(text_raw)

print("Pages:", len(pages), "first:", pages[0].page_num)
print(pages[0].text_clean[:300])
print("--- EN ---")
print(pages[0].text_en[:300])

# Быстрый чек модулей (первые 10 отметок)
mods = [(p.page_num, p.module_id) for p in pages if p.module_id is not None]
print("First 10 module marks:", mods[:10])


Pages: 217 first: 1
Starlight 9

Virginia Evans — Jenny Dooley Ksenia Baranova — Victoria Kopylova Radislav Millrood

Student's Book

PROSVESHCHENIYE
PUBLISHERS

Express Publishing
--- EN ---
Starlight 9

Virginia Evans — Jenny Dooley Ksenia Baranova — Victoria Kopylova Radislav Millrood

Student's Book

PROSVESHCHENIYE
PUBLISHERS

Express Publishing
First 10 module marks: [(8, '1'), (9, '1'), (10, '1'), (11, '1'), (12, '1'), (13, '1'), (14, '1'), (15, '1'), (16, '1'), (17, '1')]


In [10]:
!pip install lexicalrichness
#токенизация
!pip install spacy pandas
!python -m spacy download en_core_web_sm #модель для англ яз
!pip install python-Levenshtein
!pip install transliterate
!pip install deep-translator
# textstat для автоматического анализа читаемости текста
!pip install textstat


Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m49.2 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [11]:
# =========================
# CELL 2 — METRICS (by page / by module) [UPDATED v2]
# =========================

import re
import math
import numpy as np
import pandas as pd
import textstat

def tokenize_en(text: str) -> list[str]:
    return re.findall(r"[A-Za-z]+(?:'[A-Za-z]+)?", (text or "").lower())

def compute_ttr_family(tokens: list[str], segment_len: int = 100) -> dict:
    n = len(tokens)
    if n == 0:
        return {
            "tokens": 0, "types": 0,
            "ttr": np.nan, "rttr": np.nan, "cttr": np.nan, f"msttr_{segment_len}": np.nan
        }

    types = len(set(tokens))
    ttr  = types / n
    rttr = types / math.sqrt(n)
    cttr = types / math.sqrt(2 * n)

    n_full = n // segment_len
    if n_full > 0:
        seg_ttrs = []
        for i in range(n_full):
            seg = tokens[i*segment_len:(i+1)*segment_len]
            seg_ttrs.append(len(set(seg)) / segment_len)
        msttr = float(np.mean(seg_ttrs))
    else:
        msttr = np.nan

    return {
        "tokens": n,
        "types": types,
        "ttr": ttr,
        "rttr": rttr,
        "cttr": cttr,
        f"msttr_{segment_len}": msttr
    }

def safe_div(a, b):
    return a / b if b else np.nan

def compute_textstat_metrics(text: str) -> dict:
    text = text or ""
    words = textstat.lexicon_count(text)
    sents = textstat.sentence_count(text)
    syll  = textstat.syllable_count(text)

    return {
        "flesch_reading_ease": textstat.flesch_reading_ease(text) if words else np.nan,
        "flesch_kincaid_grade": textstat.flesch_kincaid_grade(text) if words else np.nan,
        "words_total": words,
        "sentences_total": sents,
        "syllables_total": syll,
        "avg_words_per_sentence": safe_div(words, sents),
    }

def compute_metrics_for_text(text: str, segment_len: int = 100) -> dict:
    m = {}
    m.update(compute_textstat_metrics(text))
    tokens = tokenize_en(text)
    m.update(compute_ttr_family(tokens, segment_len=segment_len))
    return m

# --- METRICS BY PAGE (EN-layer) ---
rows = []
for p in pages:
    m = compute_metrics_for_text(p.text_en, segment_len=100)
    rows.append({
        "page_num": p.page_num,
        "module_id": p.module_id,
        **m
    })

by_page = pd.DataFrame(rows).sort_values("page_num")

# --- flags for "too little text" (useful for UI/explanations) ---
MIN_TOKENS = 50
MIN_WORDS  = 50

by_page["is_sparse"] = (by_page["tokens"] < MIN_TOKENS) | (by_page["words_total"] < MIN_WORDS)

def sparse_reason(row):
    reasons = []
    if row["tokens"] < MIN_TOKENS:
        reasons.append(f"tokens<{MIN_TOKENS}")
    if row["words_total"] < MIN_WORDS:
        reasons.append(f"words<{MIN_WORDS}")
    return ", ".join(reasons) if reasons else ""

by_page["sparse_reason"] = by_page.apply(sparse_reason, axis=1)

display(by_page.head(10))

# --- SAVE ---
by_page.to_excel("metrics_by_page.xlsx", index=False)

# --- METRICS BY MODULE (means) ---
# v2 FIX: не усредняем page_num и служебные поля
metric_cols = [c for c in by_page.columns if c not in ("page_num", "is_sparse", "sparse_reason")]

by_module = (
    by_page[~by_page["is_sparse"]]
    .groupby("module_id", dropna=False)[metric_cols]
    .mean(numeric_only=True)
    .reset_index()
)

# если не хочешь строку с module_id = NaN (страницы до первого модуля), раскомментируй:
# by_module = by_module[by_module["module_id"].notna()].copy()

display(by_module)
by_module.to_excel("metrics_by_module.xlsx", index=False)

print("OK -> metrics_by_page.xlsx, metrics_by_module.xlsx")


Unnamed: 0,page_num,module_id,flesch_reading_ease,flesch_kincaid_grade,words_total,sentences_total,syllables_total,avg_words_per_sentence,tokens,types,ttr,rttr,cttr,msttr_100,is_sparse,sparse_reason
0,1,,5.265,16.996667,18,1,39,18.0,17,17,1.0,4.123106,2.915476,,True,"tokens<50, words<50"
1,2,,54.725,6.62,4,1,7,4.0,2,2,1.0,1.414214,1.0,,True,"tokens<50, words<50"
2,3,,63.885209,7.761238,261,19,398,13.736842,173,126,0.728324,9.579603,6.773802,0.79,False,
3,4,,48.617462,9.927493,361,26,615,13.884615,295,158,0.535593,9.199116,6.504757,0.71,False,
4,5,,19.756,20.263429,550,14,957,39.285714,528,260,0.492424,11.315048,8.000947,0.714,False,
5,6,,75.980359,5.250473,396,38,563,10.421053,349,217,0.621777,11.615744,8.213571,0.72,False,
6,7,,90.253021,2.645361,612,77,785,7.948052,551,272,0.493648,11.58759,8.193663,0.682,False,
7,8,1.0,59.879886,8.203095,199,15,314,13.266667,187,125,0.668449,9.140905,6.463596,0.78,False,
8,9,1.0,63.75533,9.069124,530,28,776,18.928571,518,273,0.527027,11.994931,8.481697,0.742,False,
9,10,1.0,72.655176,6.399669,659,50,941,13.18,619,329,0.531502,13.223626,9.350516,0.738333,False,


Unnamed: 0,module_id,flesch_reading_ease,flesch_kincaid_grade,words_total,sentences_total,syllables_total,avg_words_per_sentence,tokens,types,ttr,rttr,cttr,msttr_100
0,1.0,60.422752,11.009324,609.923077,42.307692,871.423077,24.867408,589.461538,274.884615,0.483768,11.36729,8.037888,0.705549
1,2.0,64.67246,9.054777,586.458333,43.791667,843.166667,19.385744,564.25,280.916667,0.515609,11.881588,8.401552,0.726078
2,3.0,56.425265,12.449376,570.44,39.48,814.44,28.419675,549.72,269.24,0.50337,11.454646,8.099658,0.714359
3,4.0,56.199862,12.275317,588.083333,38.791667,846.208333,27.592482,566.25,284.291667,0.51287,11.942031,8.444291,0.723658
4,5.0,68.126721,7.261541,625.5,47.125,913.291667,14.106803,604.708333,279.041667,0.476916,11.397654,8.059359,0.69832
5,6.0,57.902127,11.807032,536.317647,39.047059,770.941176,26.663227,506.764706,263.364706,0.539899,11.780925,8.330372,0.723945
6,,59.69841,9.169599,436.0,34.8,663.6,17.055255,379.2,206.6,0.574353,10.65942,7.537348,0.7232


OK -> metrics_by_page.xlsx, metrics_by_module.xlsx


In [12]:
# =========================
# CELL 3 — CEFR (overall + by page + by module) [for pages]
# =========================

import re
import numpy as np
import pandas as pd
from collections import Counter, defaultdict

import spacy

# --- (1) spaCy model ---
# Если модель не установлена в Colab, раскомментируй:
# !python -m spacy download en_core_web_sm

nlp = spacy.load("en_core_web_sm", disable=["ner"])  # NER не нужен
# ускоряем (опционально)
try:
    nlp.max_length = 5_000_000
except Exception:
    pass

# --- (2) Load CEFR lexicon ---
CEFR_PATH = "/content/len_cefr.csv"   # <-- путь к твоей базе CEFR

cefr_order = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6}
cefr_levels = ["A1", "A2", "B1", "B2", "C1", "C2"]

# если хочешь сохранять старый "костыль", включи:
LEVEL_MAP = {"C1": "B1"}   # <-- можно поставить {} если не нужно


def load_cefr_lexicon(path: str) -> dict:
    """
    Читает таблицу со столбцами:
      Word, CEFR Level
    Поддерживает csv с ',' или ';'.
    Возвращает dict: lemma(word)-> минимальный уровень
    """
    # пробуем разные разделители
    df = None
    for sep in [",", ";"]:
        try:
            df_try = pd.read_csv(path, sep=sep, encoding="utf-8")
            if {"Word", "CEFR Level"}.issubset(df_try.columns):
                df = df_try
                break
        except Exception:
            continue

    if df is None:
        raise ValueError("Не смог прочитать CEFR csv. Нужны колонки: 'Word', 'CEFR Level'.")

    word_levels = {}
    for _, row in df.iterrows():
        w = str(row["Word"]).strip().lower()
        lvl = str(row["CEFR Level"]).strip().upper()

        if not w or lvl not in cefr_order:
            continue

        if lvl in LEVEL_MAP:
            lvl = LEVEL_MAP[lvl]

        if w in word_levels:
            # берём минимальный (самый простой) уровень как приоритет
            if cefr_order[lvl] < cefr_order.get(word_levels[w], 999):
                word_levels[w] = lvl
        else:
            word_levels[w] = lvl

    return word_levels


word_levels = load_cefr_lexicon(CEFR_PATH)
print("Loaded CEFR entries:", len(word_levels))


# --- (3) Token/lemma extraction ---
LEMMA_RE = re.compile(r"^[a-z]+$")  # только латиница (леммы)

EXCLUDE_STOPWORDS = True  # как в твоём старом коде

def iter_lemmas_with_pos(doc):
    """
    Возвращает (lemma, pos) по токенам.
    Берём только слова (alpha), только латиницу, без пунктуации.
    """
    for token in doc:
        if token.is_punct or token.is_space:
            continue
        if EXCLUDE_STOPWORDS and token.is_stop:
            continue
        if not token.is_alpha:
            continue

        lemma = (token.lemma_ or token.text).lower().strip()
        if not lemma or not LEMMA_RE.match(lemma):
            continue

        yield lemma, token.pos_


# --- (4) CEFR for ALL pages (fast via nlp.pipe) ---
# Берём EN-layer из предобработки
page_texts = [p.text_en for p in pages]
page_meta  = [(p.page_num, p.module_id) for p in pages]

# Частоты по страницам
page_level_token_counts = []
page_level_type_counts  = []
page_total_tokens = []
page_known_tokens = []

# Общие частоты лемм по всему документу (для word table)
global_lemma_freq = Counter()
global_pos_for_lemma = {}  # lemma -> POS (первое встреченное)

docs = nlp.pipe(page_texts, batch_size=16)

for (page_num, module_id), doc in zip(page_meta, docs):
    lemmas = []
    for lemma, pos in iter_lemmas_with_pos(doc):
        lemmas.append(lemma)
        global_lemma_freq[lemma] += 1
        global_pos_for_lemma.setdefault(lemma, pos)

    page_total_tokens.append(len(lemmas))

    # уровень для каждого токена
    levels_for_tokens = []
    for lm in lemmas:
        lvl = word_levels.get(lm)
        if lvl:
            levels_for_tokens.append(lvl)

    page_known_tokens.append(len(levels_for_tokens))

    # tokens per level
    token_counts = Counter(levels_for_tokens)

    # types per level
    uniq = set(lemmas)
    type_levels = []
    for lm in uniq:
        lvl = word_levels.get(lm)
        if lvl:
            type_levels.append(lvl)
    type_counts = Counter(type_levels)

    page_level_token_counts.append(token_counts)
    page_level_type_counts.append(type_counts)


# --- (5) Build "word-level table" for whole document ---
rows = []
for lemma, freq in global_lemma_freq.items():
    lvl = word_levels.get(lemma)
    if not lvl:
        continue
    rows.append({
        "word": lemma,
        "level": lvl,
        "frequency": freq,
        "pos": global_pos_for_lemma.get(lemma, "X")
    })

df_words = pd.DataFrame(rows)
df_words = df_words.sort_values(by=["level", "frequency"], ascending=[True, False]).reset_index(drop=True)

# Топ-10 по частоте на уровень
df_top10 = (
    df_words.groupby("level", dropna=False)
    .apply(lambda x: x.nlargest(10, "frequency"))
    .reset_index(drop=True)
)

# POS distribution (только по словам с известным CEFR)
df_pos = df_words["pos"].value_counts().reset_index()
df_pos.columns = ["pos", "count"]


# --- (6) Build by_page CEFR distribution table ---
def expand_counts(counter: Counter, prefix: str) -> dict:
    out = {}
    for lvl in cefr_levels:
        out[f"{prefix}_{lvl}"] = int(counter.get(lvl, 0))
    return out

by_page_rows = []
for (page_num, module_id), tok_cnt, typ_cnt, total_toks, known_toks in zip(
    page_meta, page_level_token_counts, page_level_type_counts, page_total_tokens, page_known_tokens
):
    row = {
        "page_num": page_num,
        "module_id": module_id,
        "total_tokens": int(total_toks),
        "known_tokens": int(known_toks),
        "known_share": (known_toks / total_toks) if total_toks else np.nan,
    }
    row.update(expand_counts(tok_cnt, "tokens"))
    row.update(expand_counts(typ_cnt, "types"))

    # проценты по токенам (среди known_tokens)
    for lvl in cefr_levels:
        denom = known_toks if known_toks else 0
        row[f"tokens_pct_{lvl}"] = (row[f"tokens_{lvl}"] / denom) if denom else np.nan

    # проценты по типам (среди types known)
    types_known = sum(row[f"types_{lvl}"] for lvl in cefr_levels)
    for lvl in cefr_levels:
        row[f"types_pct_{lvl}"] = (row[f"types_{lvl}"] / types_known) if types_known else np.nan

    by_page_rows.append(row)

df_cefr_by_page = pd.DataFrame(by_page_rows).sort_values("page_num").reset_index(drop=True)

# флаг "мало текста" (чтобы на сайте объяснять, что метрики нестабильны)
MIN_TOKENS_CEFR = 50
df_cefr_by_page["is_sparse_cefr"] = df_cefr_by_page["total_tokens"] < MIN_TOKENS_CEFR


# --- (7) Aggregate by module ---
# (если module_id None — будет отдельная группа None)
agg_cols = [c for c in df_cefr_by_page.columns if c not in ("page_num")]
df_cefr_by_module = (
    df_cefr_by_page.groupby("module_id", dropna=False)[agg_cols]
    .sum(numeric_only=True)
    .reset_index()
)

# доли/проценты по модулю пересчитаем аккуратно:
def recompute_shares(df_mod: pd.DataFrame) -> pd.DataFrame:
    df_mod = df_mod.copy()
    # known_share
    df_mod["known_share"] = df_mod["known_tokens"] / df_mod["total_tokens"].replace(0, np.nan)

    for lvl in cefr_levels:
        df_mod[f"tokens_pct_{lvl}"] = df_mod[f"tokens_{lvl}"] / df_mod["known_tokens"].replace(0, np.nan)

    # types_pct по модулю: сначала посчитать types_known
    types_known = sum(df_mod[f"types_{lvl}"] for lvl in cefr_levels)
    for lvl in cefr_levels:
        df_mod[f"types_pct_{lvl}"] = df_mod[f"types_{lvl}"] / types_known.replace(0, np.nan)

    return df_mod

df_cefr_by_module = recompute_shares(df_cefr_by_module)


# --- (8) Save outputs ---
df_words.to_csv("cefr_word_levels_table.csv", index=False, encoding="utf-8")
df_top10.to_csv("cefr_top10_by_level.csv", index=False, encoding="utf-8")
df_pos.to_csv("cefr_pos_distribution.csv", index=False, encoding="utf-8")
df_cefr_by_page.to_csv("cefr_by_page.csv", index=False, encoding="utf-8")
df_cefr_by_module.to_csv("cefr_by_module.csv", index=False, encoding="utf-8")

with pd.ExcelWriter("cefr_outputs.xlsx") as w:
    df_words.to_excel(w, sheet_name="word_table", index=False)
    df_top10.to_excel(w, sheet_name="top10_by_level", index=False)
    df_pos.to_excel(w, sheet_name="pos_distribution", index=False)
    df_cefr_by_page.to_excel(w, sheet_name="by_page", index=False)
    df_cefr_by_module.to_excel(w, sheet_name="by_module", index=False)

print("OK -> cefr_outputs.xlsx + csv files")

display(df_cefr_by_page.head(10))
display(df_cefr_by_module.head(10))


Loaded CEFR entries: 7654


  .apply(lambda x: x.nlargest(10, "frequency"))


OK -> cefr_outputs.xlsx + csv files


Unnamed: 0,page_num,module_id,total_tokens,known_tokens,known_share,tokens_A1,tokens_A2,tokens_B1,tokens_B2,tokens_C1,...,tokens_pct_B2,tokens_pct_C1,tokens_pct_C2,types_pct_A1,types_pct_A2,types_pct_B1,types_pct_B2,types_pct_C1,types_pct_C2,is_sparse_cefr
0,1,,17,5,0.294118,1,1,2,1,0,...,0.2,0.0,0.0,0.2,0.2,0.4,0.2,0.0,0.0,True
1,2,,2,2,1.0,0,1,0,1,0,...,0.5,0.0,0.0,0.0,0.5,0.0,0.5,0.0,0.0,True
2,3,,111,64,0.576577,11,15,21,17,0,...,0.265625,0.0,0.0,0.16,0.22,0.36,0.26,0.0,0.0,False
3,4,,236,165,0.699153,71,32,39,23,0,...,0.139394,0.0,0.0,0.336364,0.227273,0.281818,0.154545,0.0,0.0,False
4,5,,381,302,0.792651,113,81,44,64,0,...,0.211921,0.0,0.0,0.41954,0.235632,0.189655,0.155172,0.0,0.0,False
5,6,,236,173,0.733051,68,40,34,31,0,...,0.179191,0.0,0.0,0.352113,0.239437,0.197183,0.211268,0.0,0.0,False
6,7,,265,202,0.762264,99,40,30,33,0,...,0.163366,0.0,0.0,0.491124,0.177515,0.171598,0.159763,0.0,0.0,False
7,8,1.0,109,91,0.834862,44,23,14,10,0,...,0.10989,0.0,0.0,0.464789,0.239437,0.183099,0.112676,0.0,0.0,False
8,9,1.0,281,226,0.80427,113,55,29,29,0,...,0.128319,0.0,0.0,0.435897,0.282051,0.153846,0.128205,0.0,0.0,False
9,10,1.0,308,247,0.801948,108,61,46,32,0,...,0.129555,0.0,0.0,0.414365,0.254144,0.198895,0.132597,0.0,0.0,False


Unnamed: 0,module_id,total_tokens,known_tokens,known_share,tokens_A1,tokens_A2,tokens_B1,tokens_B2,tokens_C1,tokens_C2,...,tokens_pct_B2,tokens_pct_C1,tokens_pct_C2,types_pct_A1,types_pct_A2,types_pct_B1,types_pct_B2,types_pct_C1,types_pct_C2,is_sparse_cefr
0,1.0,7845,6201,0.79044,3061,1271,1043,826,0,0,...,0.133204,0.0,0.0,0.458854,0.219108,0.186497,0.135541,0.0,0.0,0
1,2.0,6922,5452,0.787634,2476,1205,1068,703,0,0,...,0.128944,0.0,0.0,0.418766,0.241904,0.203709,0.135621,0.0,0.0,0
2,3.0,7112,5610,0.788808,2469,1302,1057,782,0,0,...,0.139394,0.0,0.0,0.408957,0.235988,0.204612,0.150442,0.0,0.0,0
3,4.0,7135,5590,0.783462,2649,1129,1087,725,0,0,...,0.129696,0.0,0.0,0.437302,0.21164,0.201852,0.149206,0.0,0.0,0
4,5.0,7252,5883,0.811224,2526,1337,1185,835,0,0,...,0.141934,0.0,0.0,0.407468,0.235036,0.205107,0.152389,0.0,0.0,0
5,6.0,22222,17903,0.805643,8010,3948,3464,2481,0,0,...,0.13858,0.0,0.0,0.413727,0.228013,0.206704,0.151556,0.0,0.0,2
6,,1248,913,0.731571,363,210,170,170,0,0,...,0.186199,0.0,0.0,0.386503,0.219325,0.216258,0.177914,0.0,0.0,2


In [13]:
# =========================
# CELL 4 — LEVENSHTEIN "international similarity" (EN lemma vs RU translation translit)
# =========================

# Если нужно установить зависимости в Colab — раскомментируй:
# !pip -q install python-Levenshtein transliterate deep-translator
# !python -m spacy download en_core_web_sm

import re
import time
import os
import numpy as np
import pandas as pd
from collections import Counter, defaultdict

import spacy
import Levenshtein
from transliterate import translit
from deep_translator import GoogleTranslator


# ---------- SETTINGS ----------
TOP_K = 2500
# TOP_K=None  # если хочешь переводить ВСЕ уникальные слова (может быть долго/лимиты)

EXCLUDE_STOPWORDS = True
BATCH_SIZE = 16
SLEEP_SEC = 0.15          # пауза между запросами перевода (бережнее к лимитам)
SAVE_EVERY = 200          # как часто сохранять кэш на диск
CACHE_PATH = "lev_translation_cache.csv"

THRESHOLDS = [0.6, 0.7]   # доли "похожих" слов

# sparse-флаг по страницам (как раньше)
MIN_TOKENS_LEV = 50


# ---------- NLP INIT ----------
nlp = spacy.load("en_core_web_sm", disable=["ner"])
try:
    nlp.max_length = 5_000_000
except Exception:
    pass

translator = GoogleTranslator(source="en", target="ru")

LEMMA_RE = re.compile(r"^[a-z]+$")


def iter_lemmas(doc):
    """EN lemmas from spaCy doc."""
    for token in doc:
        if token.is_punct or token.is_space:
            continue
        if EXCLUDE_STOPWORDS and token.is_stop:
            continue
        if not token.is_alpha:
            continue
        lemma = (token.lemma_ or token.text).lower().strip()
        if not lemma or not LEMMA_RE.match(lemma):
            continue
        yield lemma


def ru_to_lat_clean(s: str) -> str:
    """RU -> translit latin + keep only a-z."""
    s = str(s or "").lower()
    try:
        s = translit(s, "ru", reversed=True)
    except Exception:
        # если внезапно не кириллица / ошибка транслитерации
        pass
    s = re.sub(r"[^a-z]", "", s)
    return s


# ---------- (1) collect lemma counts: global + per page ----------
page_meta = [(p.page_num, p.module_id) for p in pages]
page_texts = [p.text_en for p in pages]

global_freq = Counter()
page_freqs = []  # list[Counter]

for doc in nlp.pipe(page_texts, batch_size=BATCH_SIZE):
    c = Counter(iter_lemmas(doc))
    page_freqs.append(c)
    global_freq.update(c)

print("Total unique lemmas:", len(global_freq))
print("Total lemma tokens:", sum(global_freq.values()))


# ---------- (2) choose vocab to translate ----------
vocab_sorted = [w for w, _ in global_freq.most_common()]  # by frequency
if TOP_K is not None:
    vocab_to_translate = vocab_sorted[:TOP_K]
else:
    vocab_to_translate = vocab_sorted

print("Vocab to translate:", len(vocab_to_translate))


# ---------- (3) load / init translation cache ----------
cache = {}
if os.path.exists(CACHE_PATH):
    try:
        df_cache = pd.read_csv(CACHE_PATH)
        if {"word", "translation_ru"}.issubset(df_cache.columns):
            cache = dict(zip(df_cache["word"].astype(str), df_cache["translation_ru"].astype(str)))
            print("Loaded cache:", len(cache))
    except Exception as e:
        print("Cache load failed:", e)

def save_cache():
    df_cache = pd.DataFrame({"word": list(cache.keys()), "translation_ru": list(cache.values())})
    df_cache.to_csv(CACHE_PATH, index=False, encoding="utf-8")

# ---------- (4) translate + compute similarity per word ----------
rows = []
done = 0

for w in vocab_to_translate:
    if w in cache:
        tr = cache[w]
    else:
        try:
            tr = translator.translate(w)
        except Exception as e:
            tr = ""  # пусто = нет перевода/ошибка
        cache[w] = tr
        time.sleep(SLEEP_SEC)

    tr_lat = ru_to_lat_clean(tr)
    sim = Levenshtein.ratio(w, tr_lat) if tr_lat else np.nan

    rows.append({
        "word": w,
        "frequency": int(global_freq[w]),
        "translation_ru": tr,
        "translation_lat": tr_lat,
        "similarity": sim
    })

    done += 1
    if done % SAVE_EVERY == 0:
        save_cache()
        print("...saved cache", done)

# финальное сохранение кэша
save_cache()

df_lev_words = pd.DataFrame(rows)
df_lev_words = df_lev_words.sort_values(["frequency"], ascending=False).reset_index(drop=True)

display(df_lev_words.head(20))
print("Mean similarity (unweighted):", float(df_lev_words["similarity"].mean()))


# ---------- (5) similarity map for aggregations ----------
sim_map = dict(zip(df_lev_words["word"], df_lev_words["similarity"]))

def weighted_mean_similarity(counter: Counter) -> float:
    num = 0.0
    den = 0
    for w, cnt in counter.items():
        sim = sim_map.get(w, np.nan)
        if sim is None or np.isnan(sim):
            continue
        num += sim * cnt
        den += cnt
    return (num / den) if den else np.nan

def token_share_above(counter: Counter, thr: float) -> float:
    num = 0
    den = 0
    for w, cnt in counter.items():
        sim = sim_map.get(w, np.nan)
        if sim is None or np.isnan(sim):
            continue
        den += cnt
        if sim >= thr:
            num += cnt
    return (num / den) if den else np.nan

def type_share_above(counter: Counter, thr: float) -> float:
    words = [w for w in counter.keys() if not np.isnan(sim_map.get(w, np.nan))]
    if not words:
        return np.nan
    good = sum(1 for w in words if sim_map.get(w, np.nan) >= thr)
    return good / len(words)


# ---------- (6) by page ----------
by_page_rows = []
for (page_num, module_id), c in zip(page_meta, page_freqs):
    total_tokens = int(sum(c.values()))
    mean_w = weighted_mean_similarity(c)

    row = {
        "page_num": page_num,
        "module_id": module_id,
        "total_tokens": total_tokens,
        "lev_mean_weighted": mean_w,
        "is_sparse_lev": total_tokens < MIN_TOKENS_LEV
    }
    for thr in THRESHOLDS:
        row[f"token_share_ge_{thr}"] = token_share_above(c, thr)
        row[f"type_share_ge_{thr}"] = type_share_above(c, thr)
    by_page_rows.append(row)

df_lev_by_page = pd.DataFrame(by_page_rows).sort_values("page_num").reset_index(drop=True)
display(df_lev_by_page.head(10))


# ---------- (7) by module ----------
module_counters = defaultdict(Counter)
for (page_num, module_id), c in zip(page_meta, page_freqs):
    module_counters[module_id].update(c)

by_module_rows = []
for module_id, c in module_counters.items():
    total_tokens = int(sum(c.values()))
    row = {
        "module_id": module_id,
        "total_tokens": total_tokens,
        "lev_mean_weighted": weighted_mean_similarity(c),
    }
    for thr in THRESHOLDS:
        row[f"token_share_ge_{thr}"] = token_share_above(c, thr)
        row[f"type_share_ge_{thr}"] = type_share_above(c, thr)
    by_module_rows.append(row)

df_lev_by_module = pd.DataFrame(by_module_rows).sort_values("module_id").reset_index(drop=True)
display(df_lev_by_module.head(20))


# ---------- (8) Save outputs ----------
df_lev_words.to_csv("lev_words_table.csv", index=False, encoding="utf-8")
df_lev_by_page.to_csv("lev_by_page.csv", index=False, encoding="utf-8")
df_lev_by_module.to_csv("lev_by_module.csv", index=False, encoding="utf-8")

with pd.ExcelWriter("lev_outputs.xlsx") as w:
    df_lev_words.to_excel(w, sheet_name="word_table", index=False)
    df_lev_by_page.to_excel(w, sheet_name="by_page", index=False)
    df_lev_by_module.to_excel(w, sheet_name="by_module", index=False)

print("OK -> lev_outputs.xlsx + lev_words_table.csv / lev_by_page.csv / lev_by_module.csv")
print("Cache saved ->", CACHE_PATH)


Total unique lemmas: 6660
Total lemma tokens: 59736
Vocab to translate: 2500
...saved cache 200
...saved cache 400
...saved cache 600
...saved cache 800


KeyboardInterrupt: 

In [14]:
# =========================
# CELL 3.1 — EXERCISE BLOCKS (better extraction for RU textbooks)
# =========================

import re
import pandas as pd

# -------------------------
# Heuristics dictionaries
# -------------------------

# Частые "служебные/мусорные" строки (титул, выходные данные, оглавление, копирайты)
FILTER_PATTERNS = [
    r"^\s*(удк|ббк)\b",
    r"\bISBN\b",
    r"^\s*©",
    r"\bExpress Publishing\b",
    r"\bPROSVESHCHENIYE\b",
    r"\bPUBLISHERS\b",
    r"^\s*Отпечатано\b",
    r"^\s*Подписано в печать\b",
    r"^\s*Тираж\b",
    r"^\s*Заказ\b",
    r"\bhttp[s]?://\S+",
    r"\bwww\.\S+",
    r"\be-mail\b",
    r"^\s*Contents\b",
    r"^\s*Содержание\b",
    r"^\s*Authors?\b",
    r"^\s*Acknowledgements?\b",
]

# Заголовки-разделители (не упражнение)
SECTION_PATTERNS = [
    r"^\s*(Module|MODULE)\s+\d+\b",
    r"^\s*(Starter module|STARTER MODULE)\b",
    r"^\s*(Vocabulary|VOCABULARY)\b",
    r"^\s*(Grammar|GRAMMAR)\b",
    r"^\s*(Everyday English|EVERYDAY ENGLISH)\b",
    r"^\s*(Study skills|STUDY SKILLS)\b",
    r"^\s*(Writing|WRITING)\b",
    r"^\s*(Reading|READING)\b",
    r"^\s*(Listening|LISTENING)\b",
    r"^\s*(Speaking|SPEAKING)\b",
    r"^\s*(Culture Corner|CULTURE CORNER)\b",
    r"^\s*(Revision|REVISION)\b",
    r"^\s*(Word List|WORD LIST)\b",
    r"^\s*(Irregular Verbs|IRREGULAR VERBS)\b",
    r"^\s*(Vocabulary Bank|VOCABULARY BANK)\b",
    r"^\s*(Writing Bank|WRITING BANK)\b",
    r"^\s*(Grammar Reference|GRAMMAR REFERENCE)\b",
    r"^\s*(Use of English|USE OF ENGLISH)\b",
]

# Императивы RU/EN (для коротких инструкций)
RU_IMP = [
    "прочитай", "прочтите", "послушай", "послушайте", "сделай", "сделайте",
    "выполни", "выполните", "выбери", "выберите", "вставь", "вставьте",
    "заполни", "заполните", "соедини", "соедините", "сопоставь", "сопоставьте",
    "ответь", "ответьте", "обсуди", "обсудите", "скажи", "скажите", "напиши",
    "напишите", "составь", "составьте", "переведи", "переведите", "найди", "найдите",
    "подчеркни", "подчеркните", "распредели", "распределите", "поставь", "поставьте",
    "отметь", "отметьте", "догадайся", "догадайтесь", "закончи", "закончите",
    "раскрой", "раскройте", "определи", "определите"
]

EN_IMP = [
    "read", "listen", "match", "choose", "complete", "fill", "write", "answer",
    "discuss", "look", "find", "say", "talk", "check", "circle", "underline"
]

RU_IMP_RE = re.compile(r"\b(" + "|".join(map(re.escape, RU_IMP)) + r")\b", re.IGNORECASE)
EN_IMP_RE = re.compile(r"\b(" + "|".join(map(re.escape, EN_IMP)) + r")\b", re.IGNORECASE)

FILTER_RE = re.compile("|".join(FILTER_PATTERNS), re.IGNORECASE)
SECTION_RE = re.compile("|".join(SECTION_PATTERNS), re.IGNORECASE)

# -------------------------
# Low-level helpers
# -------------------------

def is_filtered_line(line: str) -> bool:
    t = (line or "").strip()
    if not t:
        return True
    # одиночные номера страниц / мусор
    if re.fullmatch(r"\d{1,4}", t):
        return True
    return bool(FILTER_RE.search(t))

def is_section_heading(line: str) -> bool:
    t = (line or "").strip()
    if not t:
        return False
    # капс-заголовки (часто разделители)
    letters = re.sub(r"[^A-Za-zА-Яа-яЁё]", "", t)
    if letters and sum(ch.isupper() for ch in letters) / max(len(letters), 1) > 0.9 and len(letters) >= 8:
        return True
    return bool(SECTION_RE.search(t))

def latin_ratio(line: str) -> float:
    letters = re.findall(r"[A-Za-zА-Яа-яЁё]", line or "")
    if not letters:
        return 0.0
    lat = sum(1 for ch in letters if "A" <= ch <= "Z" or "a" <= ch <= "z")
    return lat / len(letters)

def looks_like_wordlist(line: str) -> bool:
    """
    Частые "Vocabulary lists":
    - почти вся строка латиница
    - много разделителей ; , •
    - мало глагольных инструкций
    """
    t = (line or "").strip()
    if not t:
        return False
    if latin_ratio(t) < 0.85:
        return False
    if (t.count(",") + t.count(";") + t.count("•")) >= 3 and not EN_IMP_RE.search(t):
        return True
    # просто набор слов
    if len(t.split()) >= 6 and not EN_IMP_RE.search(t) and not re.search(r"[.!?]$", t):
        return True
    return False

def is_numbered_task(line: str) -> bool:
    """
    Старт задания:
      1) ...
      1. ...
      1 ...
      12 a) ...
      Ex. 3 ...
    + защита от "pp. 7-21" и годов/выходных данных
    """
    t = (line or "").strip()
    if not t:
        return False

    # исключим "pp. 7-21", "p. 26"
    if re.search(r"\bpp?\.\s*\d", t, re.IGNORECASE):
        return False

    # Ex. 1 / Exercise 1
    if re.match(r"^(?:ex\.?|exercise)\s*\d+\b", t, re.IGNORECASE):
        return True

    m = re.match(r"^(\d{1,2})(?:\s*[.)]|)\s+(\S.*)$", t)
    if not m:
        return False

    n = int(m.group(1))
    # разумные номера упражнений, чтобы не ловить 2013/127521 и т.п.
    if not (1 <= n <= 50):
        return False

    tail = m.group(2)
    # если хвост выглядит как "pp. 7-21" — уже отрезали; доп. защита:
    if re.match(r"^\d+[-–]\d+$", tail):
        return False

    return True

def is_short_instruction(line: str) -> bool:
    t = (line or "").strip()
    if not t:
        return False
    if len(t) > 120:
        return False
    if looks_like_wordlist(t):
        return False
    return bool(RU_IMP_RE.search(t) or EN_IMP_RE.search(t))

def has_numbered_nearby(lines: list[str], i: int, window: int = 2) -> bool:
    for j in range(i + 1, min(len(lines), i + 1 + window)):
        tj = (lines[j] or "").strip()
        if not tj:
            continue
        if is_numbered_task(tj):
            return True
        # если встретили крупный заголовок — дальше не смотрим
        if is_section_heading(tj) or is_filtered_line(tj):
            break
    return False

def strip_task_prefix(s: str) -> str:
    s = (s or "").strip()
    s = re.sub(r"^(?:ex\.?|exercise)\s*\d+\s*", "", s, flags=re.IGNORECASE)
    s = re.sub(r"^\d{1,2}\s*[.)]?\s*", "", s)  # 1) / 1. / 1
    s = re.sub(r"\s+", " ", s).strip()
    return s

def to_ru_layer(s: str) -> str:
    # оставляем кириллицу+цифры+пунктуацию
    tokens = re.findall(r"[А-Яа-яЁё]+(?:-[А-Яа-яЁё]+)?|[0-9]+|[.!?,;:\-—()\[\]\"“”'…]+", s or "")
    out = " ".join(tokens)
    out = re.sub(r"\s+([.!?,;:])", r"\1", out)
    return out.strip()

def to_en_layer(s: str) -> str:
    tokens = re.findall(r"[A-Za-z]+(?:'[A-Za-z]+)?|[0-9]+|[.!?,;:\-—()\[\]\"“”'…]+", s or "")
    out = " ".join(tokens)
    out = re.sub(r"\s+([.!?,;:])", r"\1", out)
    return out.strip()

# -------------------------
# Core extraction per page
# -------------------------

def extract_exercise_blocks_from_page(text_clean: str) -> list[dict]:
    """
    State machine:
    - фильтруем мусорные строки
    - section headings режут поток
    - старт задания: numbered OR short-instruction рядом с numbered OR в exercise_mode
    """
    lines_raw = (text_clean or "").splitlines()
    # сохраним пустые строки как границы, но "мусорные" удалим
    lines = []
    for ln in lines_raw:
        t = ln.rstrip()
        if not t.strip():
            lines.append("")  # граница
            continue
        if is_filtered_line(t):
            continue
        lines.append(t)

    blocks = []
    cur = []
    exercise_mode = False
    started_reason = None

    def flush():
        nonlocal cur, started_reason
        text = "\n".join([x for x in cur if x != ""]).strip()
        if text:
            blocks.append({"block_text": text, "start_reason": started_reason or ""})
        cur = []
        started_reason = None

    for i, ln in enumerate(lines):
        t = (ln or "").strip()

        # границы абзацев не всегда конец блока — но если блок большой, можно завершить по двойной пустоте
        if t == "":
            # оставляем пустую строку внутри блока максимум как разделитель
            if cur and (len(cur) == 0 or cur[-1] != ""):
                cur.append("")
            continue

        # section heading — всегда граница режима
        if is_section_heading(t):
            flush()
            exercise_mode = False
            continue

        # старт задания
        start = False
        if is_numbered_task(t):
            start = True
            started_reason = "numbered_task"
        else:
            # короткая инструкция как старт — только если рядом есть numbered или мы уже в режиме упражнений
            if is_short_instruction(t) and (exercise_mode or has_numbered_nearby(lines, i, window=2)):
                start = True
                started_reason = "short_instruction_near_numbered" if not exercise_mode else "short_instruction_in_mode"

        if start:
            flush()
            exercise_mode = True
            cur.append(t)
        else:
            # если не в режиме упражнений — пропускаем всё, что похоже на wordlist/оглавление
            if not exercise_mode:
                if looks_like_wordlist(t):
                    continue
                # иногда “Vocabulary” списки без заголовка — тоже игнорим
                if latin_ratio(t) > 0.9 and not EN_IMP_RE.search(t) and len(t.split()) >= 6:
                    continue
            # если в режиме — накапливаем
            if exercise_mode:
                cur.append(t)

    flush()
    return blocks


# -------------------------
# Build ex_df for ALL pages
# -------------------------

rows = []
ex_id = 0

for p in pages:
    page_blocks = extract_exercise_blocks_from_page(p.text_clean)

    for b_i, b in enumerate(page_blocks, start=1):
        block_text = b["block_text"]

        # инструкция: первая непустая строка блока (подрезаем префиксы "1) ")
        first_line = next((x.strip() for x in block_text.splitlines() if x.strip()), "")
        instr_raw = strip_task_prefix(first_line)

        # если инструкция совсем пустая — пропускаем
        if not instr_raw or len(instr_raw) < 6:
            continue

        # слои
        instr_ru = to_ru_layer(instr_raw)
        instr_en = to_en_layer(instr_raw)

        # "best" — если кириллицы много, берем RU, иначе EN
        best = instr_ru if len(re.findall(r"[А-Яа-яЁё]", instr_ru)) >= 3 else instr_en

        ex_id += 1
        rows.append({
            "exercise_id": ex_id,
            "page_num": p.page_num,
            "module_id": p.module_id,
            "block_local_id": b_i,
            "start_reason": b.get("start_reason", ""),
            "instruction_raw": instr_raw,
            "instruction_ru": instr_ru,
            "instruction_en": instr_en,
            "instruction_best": best,
            "block_text": block_text,
        })

ex_df = pd.DataFrame(rows)

print("Extracted exercise blocks:", len(ex_df))
display(ex_df.head(15))

# Сохраним для дебага
ex_df.to_excel("exercise_blocks_extracted.xlsx", index=False)
print("OK -> exercise_blocks_extracted.xlsx")


Extracted exercise blocks: 2704


Unnamed: 0,exercise_id,page_num,module_id,block_local_id,start_reason,instruction_raw,instruction_ru,instruction_en,instruction_best,block_text
0,1,6,,1,numbered_task,Shopping 4 Match the words to form phrases.,4.,Shopping 4 Match the words to form phrases.,Shopping 4 Match the words to form phrases.,Shopping 4 Match the words to form phrases.
1,2,6,,2,short_instruction_in_mode,designer A conditions 2 recycled B prices 3 wo...,2 3 4 5 6 7 8,designer A conditions 2 recycled B prices 3 wo...,designer A conditions 2 recycled B prices 3 wo...,1 designer A conditions 2 recycled B prices 3 ...
2,3,6,,3,numbered_task,Shops 2 Write the name of the shop.,2.,Shops 2 Write the name of the shop.,Shops 2 Write the name of the shop.,Shops 2 Write the name of the shop.
3,4,6,,4,short_instruction_in_mode,It sells boots and sandals. s ___ s ___ 2 You ...,. 2. 3. ' 4. ' 5. 6. ' 7. ' 8. ' 9. ' 10.,It sells boots and sandals. s s 2 You can find...,It sells boots and sandals. s s 2 You can find...,1 It sells boots and sandals. s ___ s ___ 2 Yo...
4,5,6,,5,numbered_task,Faulty products 3 Choose the correct word.,3.,Faulty products 3 Choose the correct word.,Faulty products 3 Choose the correct word.,Faulty products 3 Choose the correct word.
5,6,6,,6,numbered_task,I can’t carry the bag. The strap is broken/inj...,..,I can t carry the bag. The strap is broken inj...,I can t carry the bag. The strap is broken inj...,1 I can’t carry the bag. The strap is broken/i...
6,7,6,,7,numbered_task,I need to have the lens replaced. It is scratc...,..,I need to have the lens replaced. It is scratc...,I need to have the lens replaced. It is scratc...,2 I need to have the lens replaced. It is scra...
7,8,6,,8,numbered_task,Don’t use this teapot. The lid is cracked/torn.,..,Don t use this teapot. The lid is cracked torn.,Don t use this teapot. The lid is cracked torn.,3 Don’t use this teapot. The lid is cracked/torn.
8,9,6,,9,numbered_task,Don’t drink from this mug. There’s a hole/chip...,..,Don t drink from this mug. There s a hole chip...,Don t drink from this mug. There s a hole chip...,4 Don’t drink from this mug. There’s a hole/ch...
9,10,6,,10,numbered_task,I can’t wear my sandals. The heels are cracked...,..,I can t wear my sandals. The heels are cracked...,I can t wear my sandals. The heels are cracked...,5 I can’t wear my sandals. The heels are crack...


OK -> exercise_blocks_extracted.xlsx


# **Автоматическая обработка**

In [15]:
from google.colab import drive
drive.mount('/content/drive')

# ВАЖНО: путь должен 1-в-1 совпадать с именами папок в MyDrive
ROOT_DIR = "/content/drive/MyDrive/EduText Analyzer/Учебники TXT"

import os
print("ROOT exists:", os.path.exists(ROOT_DIR))
print("Subfolders:", os.listdir(ROOT_DIR))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
ROOT exists: True
Subfolders: ['spotlight-txt', 'starlight-txt', 'books_manifest.csv']


In [16]:
import re
from pathlib import Path
import pandas as pd

def slugify(s: str) -> str:
    s = s.lower()
    s = re.sub(r"\.txt$", "", s)
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

def infer_series_from_parent(p: Path) -> str:
    # spotlight-txt -> spotlight
    return p.parent.name.replace("-txt", "").strip()

def infer_grade_from_name(name: str):
    # spotlight-2.txt -> 2
    # starlight-2-1.txt -> 2 (первое число)
    m = re.search(r"(\d{1,2})", name)
    return int(m.group(1)) if m else ""

def infer_title(series: str, grade, filename: str) -> str:
    if grade != "":
        return f"{series.title()} {grade}"
    # если не нашли класс — просто имя файла
    return filename.replace(".txt", "")

def detect_lang_quick(text: str) -> str:
    # грубая эвристика: по доле кириллицы/латиницы в сэмпле
    sample = (text or "")[:20000]
    cyr = len(re.findall(r"[А-Яа-яЁё]", sample))
    lat = len(re.findall(r"[A-Za-z]", sample))
    total = cyr + lat
    if total == 0:
        return ""
    cyr_ratio = cyr / total
    lat_ratio = lat / total
    if cyr_ratio > 0.7:
        return "ru"
    if lat_ratio > 0.7:
        return "en"
    return "mixed"

txt_files = sorted(Path(ROOT_DIR).rglob("*.txt"))
print("Found txt files:", len(txt_files))
print("Example:", txt_files[:3])

rows = []
for p in txt_files:
    series = infer_series_from_parent(p)
    grade = infer_grade_from_name(p.name)
    title = infer_title(series, grade, p.name)

    # быстрый детект языка по первым 20k символов
    try:
        with open(p, "r", encoding="utf-8") as f:
            lang = detect_lang_quick(f.read())
    except Exception:
        lang = ""

    rows.append({
        "book_id": slugify(f"{series}_{p.stem}"),
        "title": title,
        "series": series,
        "grade": grade,
        "year": "",         # можно заполнить позже, если нужно
        "txt_path": str(p),
        "lang": lang
    })

manifest = pd.DataFrame(rows).sort_values(["series", "grade", "title"])
manifest_path = f"{ROOT_DIR}/books_manifest.csv"
manifest.to_csv(manifest_path, index=False, encoding="utf-8")

display(manifest.head(20))
print("Saved manifest ->", manifest_path)


Found txt files: 22
Example: [PosixPath('/content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-10.txt'), PosixPath('/content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-2.txt'), PosixPath('/content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-3.txt')]


Unnamed: 0,book_id,title,series,grade,year,txt_path,lang
1,spotlight_spotlight_2,Spotlight 2,spotlight,2,,/content/drive/MyDrive/EduText Analyzer/Учебни...,mixed
2,spotlight_spotlight_3,Spotlight 3,spotlight,3,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
3,spotlight_spotlight_4,Spotlight 4,spotlight,4,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
4,spotlight_spotlight_5,Spotlight 5,spotlight,5,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
5,spotlight_spotlight_6,Spotlight 6,spotlight,6,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
6,spotlight_spotlight_7,Spotlight 7,spotlight,7,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
7,spotlight_spotlight_8,Spotlight 8,spotlight,8,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
8,spotlight_spotlight_9,Spotlight 9,spotlight,9,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
0,spotlight_spotlight_10,Spotlight 10,spotlight,10,,/content/drive/MyDrive/EduText Analyzer/Учебни...,en
10,starlight_starlight_2_1,Starlight 2,starlight,2,,/content/drive/MyDrive/EduText Analyzer/Учебни...,mixed


Saved manifest -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/books_manifest.csv


In [17]:
# =========================
# CELL C — Catalog batch helpers + metrics + CEFR + Levenshtein
# =========================

!pip -q install textstat spacy python-Levenshtein deep-translator transliterate
!python -m spacy download en_core_web_sm -q

import os, re, json, math, time, traceback
from pathlib import Path
from dataclasses import asdict
from typing import Optional, Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import textstat
import spacy

import Levenshtein
from transliterate import translit
from deep_translator import GoogleTranslator


# -------------------------
# CONFIG
# -------------------------
CEFR_CSV_PATH = "/content/drive/MyDrive/EduText Analyzer/Cefr/len_cefr.csv"  # <-- поправь под свой путь
ENABLE_CEFR = True
ENABLE_LEV  = True   # если будет долго/лимиты — поставь False и посчитаешь позже отдельным прогоном
LEV_TOP_K   = 2500   # как у тебя
LEV_CACHE_PATH_DEFAULT = None  # зададим при batch-run через OUT_ROOT/cache/...

# -------------------------
# SAVE UTILS
# -------------------------
def ensure_dir(p: str):
    os.makedirs(p, exist_ok=True)

def relpath(path: str, start: str) -> str:
    try:
        return os.path.relpath(path, start)
    except Exception:
        return path

def save_table_bundle(df: pd.DataFrame, out_dir: str, name: str) -> Dict[str, str]:
    ensure_dir(out_dir)
    csv_path  = os.path.join(out_dir, f"{name}.csv")
    xlsx_path = os.path.join(out_dir, f"{name}.xlsx")
    html_path = os.path.join(out_dir, f"{name}.html")
    df.to_csv(csv_path, index=False, encoding="utf-8")
    df.to_excel(xlsx_path, index=False)
    df.to_html(html_path, index=False)
    return {"csv": csv_path, "xlsx": xlsx_path, "html": html_path}

def save_plot(fig, out_dir: str, name: str) -> str:
    ensure_dir(out_dir)
    path = os.path.join(out_dir, f"{name}.png")
    fig.savefig(path, dpi=200, bbox_inches="tight")
    plt.close(fig)
    return path


# -------------------------
# METRICS (by page / by module) — как в твоём CELL 2, только в функциях
# -------------------------
def tokenize_en(text: str) -> List[str]:
    return re.findall(r"[A-Za-z]+(?:'[A-Za-z]+)?", (text or "").lower())

def compute_ttr_family(tokens: List[str], segment_len: int = 100) -> Dict:
    n = len(tokens)
    if n == 0:
        return {"tokens": 0, "types": 0, "ttr": np.nan, "rttr": np.nan, "cttr": np.nan, f"msttr_{segment_len}": np.nan}
    types = len(set(tokens))
    ttr  = types / n
    rttr = types / math.sqrt(n)
    cttr = types / math.sqrt(2 * n)

    n_full = n // segment_len
    if n_full > 0:
        seg_ttrs = []
        for i in range(n_full):
            seg = tokens[i*segment_len:(i+1)*segment_len]
            seg_ttrs.append(len(set(seg)) / segment_len)
        msttr = float(np.mean(seg_ttrs))
    else:
        msttr = np.nan

    return {"tokens": n, "types": types, "ttr": ttr, "rttr": rttr, "cttr": cttr, f"msttr_{segment_len}": msttr}

def safe_div(a, b):
    return a / b if b else np.nan

def compute_textstat_metrics(text: str) -> Dict:
    text = text or ""
    words = textstat.lexicon_count(text)
    sents = textstat.sentence_count(text)
    syll  = textstat.syllable_count(text)
    return {
        "flesch_reading_ease": textstat.flesch_reading_ease(text) if words else np.nan,
        "flesch_kincaid_grade": textstat.flesch_kincaid_grade(text) if words else np.nan,
        "words_total": words,
        "sentences_total": sents,
        "syllables_total": syll,
        "avg_words_per_sentence": safe_div(words, sents),
    }

def compute_metrics_for_text(text: str, segment_len: int = 100) -> Dict:
    m = {}
    m.update(compute_textstat_metrics(text))
    tokens = tokenize_en(text)
    m.update(compute_ttr_family(tokens, segment_len=segment_len))
    return m

def compute_metrics_df_by_page(pages, segment_len=100, min_tokens=50, min_words=50) -> pd.DataFrame:
    rows = []
    for p in pages:
        m = compute_metrics_for_text(p.text_en, segment_len=segment_len)
        rows.append({"page_num": p.page_num, "module_id": p.module_id, **m})
    df = pd.DataFrame(rows).sort_values("page_num")

    df["is_sparse"] = (df["tokens"] < min_tokens) | (df["words_total"] < min_words)

    def sparse_reason(row):
        reasons = []
        if row["tokens"] < min_tokens:
            reasons.append(f"tokens<{min_tokens}")
        if row["words_total"] < min_words:
            reasons.append(f"words<{min_words}")
        return ", ".join(reasons) if reasons else ""

    df["sparse_reason"] = df.apply(sparse_reason, axis=1)
    return df

def compute_metrics_df_by_module(by_page: pd.DataFrame) -> pd.DataFrame:
    # как ты делала: агрегируем по НЕ sparse страницам
    return (
        by_page[~by_page["is_sparse"]]
        .groupby("module_id", dropna=False)
        .mean(numeric_only=True)
        .reset_index()
    )


# -------------------------
# CEFR
# -------------------------
CEFR_ORDER = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6}

def load_cefr_lexicon(csv_path: str, map_c1_to_b1: bool = True) -> Dict[str, str]:
    # грузим csv; поддержим случай с ';'
    with open(csv_path, "r", encoding="utf-8") as f:
        content = f.read()
    if ";" in content and "," not in content:
        content = content.replace(";", ",")
    # читаем через pandas
    from io import StringIO
    df = pd.read_csv(StringIO(content))
    # ожидаем колонки Word и CEFR Level
    word_levels = {}
    for _, row in df.iterrows():
        w = str(row["Word"]).strip().lower()
        lvl = str(row["CEFR Level"]).strip().upper()
        if map_c1_to_b1 and lvl == "C1":
            lvl = "B1"
        if w in word_levels:
            cur = word_levels[w]
            if CEFR_ORDER.get(lvl, 99) < CEFR_ORDER.get(cur, 99):
                word_levels[w] = lvl
        else:
            word_levels[w] = lvl
    return word_levels

def compute_cefr_tables(pages, word_levels: Dict[str, str], nlp) -> Tuple[pd.DataFrame, pd.DataFrame, Dict]:
    """
    Возвращает:
    - cefr_word_table: lemma, level, frequency, pos
    - cefr_by_page: page_num + counts/percents по уровням
    - cefr_summary: общий процент по токенам
    """
    level_names = ["A1", "A2", "B1", "B2", "C1", "C2"]

    lemma_freq = {}
    lemma_pos_counts = {}  # lemma -> Counter(pos)
    page_level_counts = []  # list of dicts per page
    page_token_total = []

    texts = [p.text_en or "" for p in pages]
    # nlp.pipe быстрее
    for p, doc in zip(pages, nlp.pipe(texts, batch_size=16)):
        lvl_counter = dict.fromkeys(level_names, 0)
        token_total = 0

        for tok in doc:
            if not tok.is_alpha or tok.is_stop:
                continue
            lemma = tok.lemma_.lower()
            token_total += 1

            # частоты лемм
            lemma_freq[lemma] = lemma_freq.get(lemma, 0) + 1
            # POS
            if lemma not in lemma_pos_counts:
                lemma_pos_counts[lemma] = {}
            lemma_pos_counts[lemma][tok.pos_] = lemma_pos_counts[lemma].get(tok.pos_, 0) + 1

            lvl = word_levels.get(lemma)
            if lvl in lvl_counter:
                lvl_counter[lvl] += 1

        page_level_counts.append({
            "page_num": p.page_num,
            "module_id": p.module_id,
            **{f"tokens_{lvl}": lvl_counter[lvl] for lvl in level_names},
        })
        page_token_total.append(token_total)

    cefr_by_page = pd.DataFrame(page_level_counts).sort_values("page_num")
    cefr_by_page["tokens_total"] = page_token_total

    for lvl in level_names:
        cefr_by_page[f"pct_{lvl}"] = cefr_by_page.apply(
            lambda r: (r[f"tokens_{lvl}"] / r["tokens_total"]) if r["tokens_total"] else np.nan, axis=1
        )

    # word table
    rows = []
    for lemma, freq in lemma_freq.items():
        lvl = word_levels.get(lemma)
        if lvl not in CEFR_ORDER:
            continue
        # top pos
        pos_counts = lemma_pos_counts.get(lemma, {})
        if pos_counts:
            pos = max(pos_counts.items(), key=lambda x: x[1])[0]
        else:
            pos = "X"
        rows.append({"word": lemma, "level": lvl, "frequency": freq, "pos": pos})

    cefr_word_table = pd.DataFrame(rows)
    if not cefr_word_table.empty:
        cefr_word_table["level_order"] = cefr_word_table["level"].map(CEFR_ORDER)
        cefr_word_table = cefr_word_table.sort_values(["level_order", "frequency"], ascending=[True, False]).drop(columns=["level_order"])

    # summary by tokens
    total_by_level = {lvl: int(cefr_by_page[f"tokens_{lvl}"].sum()) for lvl in level_names} if not cefr_by_page.empty else {lvl: 0 for lvl in level_names}
    total_tokens = int(cefr_by_page["tokens_total"].sum()) if not cefr_by_page.empty else 0
    cefr_summary = {
        "tokens_total": total_tokens,
        "by_level_tokens": total_by_level,
        "by_level_pct": {lvl: (total_by_level[lvl] / total_tokens) if total_tokens else None for lvl in level_names}
    }

    return cefr_word_table, cefr_by_page, cefr_summary


# -------------------------
# LEVENSHTEIN (формальное сходство) с кэшем переводов
# -------------------------
def transliterate_ru_to_en(word_ru: str) -> str:
    try:
        return translit(str(word_ru), "ru", reversed=True).lower()
    except Exception:
        return ""

def load_translation_cache(cache_path: str) -> Dict[str, str]:
    cache = {}
    if cache_path and os.path.exists(cache_path):
        try:
            df = pd.read_csv(cache_path)
            if {"word", "translation_ru"}.issubset(df.columns):
                cache = dict(zip(df["word"].astype(str), df["translation_ru"].astype(str)))
        except Exception:
            pass
    return cache

def save_translation_cache(cache: Dict[str, str], cache_path: str):
    if not cache_path:
        return
    ensure_dir(os.path.dirname(cache_path))
    df = pd.DataFrame({"word": list(cache.keys()), "translation_ru": list(cache.values())})
    df.to_csv(cache_path, index=False, encoding="utf-8")

def compute_levenshtein_tables(pages, nlp, top_k: int, cache_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, Dict]:
    """
    Возвращает:
    - lev_words: word, frequency, translation_ru, translation_en_translit, similarity
    - lev_by_page: page_num, lev_mean_weighted, covered_tokens, total_tokens
    - lev_by_module: module_id, lev_mean_weighted_mean
    - lev_summary: общий mean
    """
    # 1) соберём частоты лемм по страницам (EN слой)
    page_lemma_counts = []
    total_lemma_freq = {}

    texts = [p.text_en or "" for p in pages]
    for p, doc in zip(pages, nlp.pipe(texts, batch_size=16)):
        c = {}
        for tok in doc:
            if not tok.is_alpha or tok.is_stop:
                continue
            lemma = tok.lemma_.lower()
            c[lemma] = c.get(lemma, 0) + 1
            total_lemma_freq[lemma] = total_lemma_freq.get(lemma, 0) + 1
        page_lemma_counts.append((p.page_num, p.module_id, c))

    # 2) top_k лемм для перевода
    vocab_sorted = sorted(total_lemma_freq.items(), key=lambda x: x[1], reverse=True)
    vocab = [w for w, _ in vocab_sorted[:top_k]]

    # 3) кэш переводов
    cache = load_translation_cache(cache_path)
    translator = GoogleTranslator(source="en", target="ru")

    missing = [w for w in vocab if w not in cache]
    # ⚠️ это может быть долго и упираться в лимиты — кэш спасает
    for i, w in enumerate(missing, start=1):
        try:
            cache[w] = translator.translate(w)
        except Exception:
            cache[w] = ""
        if i % 50 == 0:
            save_translation_cache(cache, cache_path)
            # небольшой троттлинг
            time.sleep(0.2)

    save_translation_cache(cache, cache_path)

    # 4) lev_words
    rows = []
    for w in vocab:
        tr_ru = cache.get(w, "")
        tr_en = transliterate_ru_to_en(tr_ru) if tr_ru else ""
        sim = Levenshtein.ratio(w, tr_en) if tr_en else np.nan
        rows.append({"word": w, "frequency": total_lemma_freq.get(w, 0), "translation_ru": tr_ru, "translation_en_translit": tr_en, "similarity": sim})
    lev_words = pd.DataFrame(rows)

    # 5) by_page weighted mean
    sim_map = dict(zip(lev_words["word"], lev_words["similarity"]))

    page_rows = []
    for (page_num, module_id, c) in page_lemma_counts:
        total_tokens = sum(c.values())
        # берём только покрытые слова (у которых есть similarity)
        w_sum = 0.0
        w_den = 0.0
        covered = 0

        for w, f in c.items():
            sim = sim_map.get(w, np.nan)
            if sim is None or (isinstance(sim, float) and np.isnan(sim)):
                continue
            w_sum += sim * f
            w_den += f
            covered += f

        mean_w = (w_sum / w_den) if w_den else np.nan
        page_rows.append({
            "page_num": page_num,
            "module_id": module_id,
            "lev_mean_weighted": mean_w,
            "covered_tokens": covered,
            "total_tokens": total_tokens,
            "coverage": (covered / total_tokens) if total_tokens else np.nan
        })

    lev_by_page = pd.DataFrame(page_rows).sort_values("page_num")

    # 6) by_module (среднее по страницам)
    lev_by_module = (
        lev_by_page.groupby("module_id", dropna=False)
        .agg(lev_mean_weighted_mean=("lev_mean_weighted", "mean"),
             coverage_mean=("coverage", "mean"))
        .reset_index()
    )

    lev_summary = {
        "lev_mean_weighted_overall": float(np.nanmean(lev_by_page["lev_mean_weighted"])) if not lev_by_page.empty else None,
        "coverage_mean_overall": float(np.nanmean(lev_by_page["coverage"])) if not lev_by_page.empty else None,
        "top_k": top_k
    }

    return lev_words, lev_by_page, lev_by_module, lev_summary


# -------------------------
# PLOTS
# -------------------------
def plot_line(df: pd.DataFrame, x: str, y: str, title: str, xlabel: str, ylabel: str):
    fig = plt.figure(figsize=(10, 4))
    plt.plot(df[x], df[y])
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    return fig

def plot_cefr_distribution(cefr_summary: Dict):
    levels = ["A1","A2","B1","B2","C1","C2"]
    vals = [cefr_summary["by_level_pct"].get(lvl) or 0 for lvl in levels]
    fig = plt.figure(figsize=(8, 4))
    plt.bar(levels, vals)
    plt.title("CEFR distribution (by tokens)")
    plt.xlabel("Level")
    plt.ylabel("Share")
    plt.tight_layout()
    return fig


# -------------------------
# ONE BOOK ANALYSIS
# -------------------------
def analyze_one_book(book_row: Dict, out_root: str, nlp, word_levels: Optional[Dict], lev_cache_path: Optional[str]) -> Dict:
    book_id = book_row["book_id"]
    txt_path = book_row["txt_path"]

    book_dir   = os.path.join(out_root, "books", book_id)
    tables_dir = os.path.join(book_dir, "tables")
    plots_dir  = os.path.join(book_dir, "plots")
    ensure_dir(tables_dir); ensure_dir(plots_dir)

    # read text
    with open(txt_path, "r", encoding="utf-8") as f:
        text_raw = f.read()

    # preprocess (твой CELL 1 должен быть уже выполнен)
    pages = preprocess_document(text_raw)

    # metrics
    by_page = compute_metrics_df_by_page(pages)
    by_module = compute_metrics_df_by_module(by_page)

    tables = {}
    plots = {}
    tables["metrics_by_page"]   = save_table_bundle(by_page, tables_dir, "metrics_by_page")
    tables["metrics_by_module"] = save_table_bundle(by_module, tables_dir, "metrics_by_module")

    # plots for metrics
    if "flesch_reading_ease" in by_page.columns:
        fig = plot_line(by_page.dropna(subset=["flesch_reading_ease"]), "page_num", "flesch_reading_ease",
                        "Flesch Reading Ease by page", "Page", "Flesch Reading Ease")
        plots["flesch_reading_ease_by_page"] = save_plot(fig, plots_dir, "flesch_reading_ease_by_page")

    if "flesch_kincaid_grade" in by_page.columns:
        fig = plot_line(by_page.dropna(subset=["flesch_kincaid_grade"]), "page_num", "flesch_kincaid_grade",
                        "Flesch–Kincaid Grade by page", "Page", "FK Grade")
        plots["fk_grade_by_page"] = save_plot(fig, plots_dir, "fk_grade_by_page")

    if "ttr" in by_page.columns:
        fig = plot_line(by_page.dropna(subset=["ttr"]), "page_num", "ttr",
                        "TTR by page", "Page", "TTR")
        plots["ttr_by_page"] = save_plot(fig, plots_dir, "ttr_by_page")

    # CEFR
    cefr_summary = None
    if ENABLE_CEFR and word_levels is not None:
        cefr_word_table, cefr_by_page, cefr_summary = compute_cefr_tables(pages, word_levels, nlp)
        tables["cefr_word_table"] = save_table_bundle(cefr_word_table, tables_dir, "cefr_word_table")
        tables["cefr_by_page"]    = save_table_bundle(cefr_by_page, tables_dir, "cefr_by_page")

        # plot CEFR distribution
        if cefr_summary:
            fig = plot_cefr_distribution(cefr_summary)
            plots["cefr_distribution"] = save_plot(fig, plots_dir, "cefr_distribution")

    # Levenshtein
    lev_summary = None
    if ENABLE_LEV and lev_cache_path:
        lev_words, lev_by_page, lev_by_module, lev_summary = compute_levenshtein_tables(
            pages, nlp, top_k=LEV_TOP_K, cache_path=lev_cache_path
        )
        tables["lev_words"]     = save_table_bundle(lev_words, tables_dir, "lev_words")
        tables["lev_by_page"]   = save_table_bundle(lev_by_page, tables_dir, "lev_by_page")
        tables["lev_by_module"] = save_table_bundle(lev_by_module, tables_dir, "lev_by_module")

        if "lev_mean_weighted" in lev_by_page.columns:
            fig = plot_line(lev_by_page.dropna(subset=["lev_mean_weighted"]), "page_num", "lev_mean_weighted",
                            "Levenshtein similarity (weighted) by page", "Page", "Mean similarity")
            plots["lev_mean_by_page"] = save_plot(fig, plots_dir, "lev_mean_by_page")

    # meta summary
    means = {
        "flesch_reading_ease": float(by_page["flesch_reading_ease"].dropna().mean()) if "flesch_reading_ease" in by_page else None,
        "flesch_kincaid_grade": float(by_page["flesch_kincaid_grade"].dropna().mean()) if "flesch_kincaid_grade" in by_page else None,
        "ttr": float(by_page["ttr"].dropna().mean()) if "ttr" in by_page else None,
        "msttr_100": float(by_page["msttr_100"].dropna().mean()) if "msttr_100" in by_page else None,
        "lev_mean_weighted": (lev_summary.get("lev_mean_weighted_overall") if lev_summary else None),
    }

    meta = {
        "book_id": book_id,
        "title": book_row.get("title"),
        "series": book_row.get("series"),
        "grade": book_row.get("grade"),
        "year": book_row.get("year"),
        "lang": book_row.get("lang"),
        "txt_path": txt_path,
        "pages_total": int(len(pages)),
        "means": means,
        "cefr_summary": cefr_summary,
        "lev_summary": lev_summary,
        "artifacts": {
            "tables": {k: {kk: relpath(vv, out_root) for kk, vv in v.items()} for k, v in tables.items()},
            "plots": {k: relpath(v, out_root) for k, v in plots.items()}
        }
    }

    meta_path = os.path.join(book_dir, "meta.json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    return meta


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m57.1 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [18]:
# =========================
# CELL D — BATCH RUN (22 books) -> catalog_out
# =========================

MANIFEST_PATH = "/content/drive/MyDrive/EduText Analyzer/Учебники TXT/books_manifest.csv"
OUT_ROOT      = "/content/drive/MyDrive/EduText Analyzer/catalog_out"

ensure_dir(OUT_ROOT)

# Levenshtein cache (общий на все книги)
LEV_CACHE_PATH = os.path.join(OUT_ROOT, "cache", "lev_translation_cache.csv")
ensure_dir(os.path.dirname(LEV_CACHE_PATH))

# spaCy
nlp = spacy.load("en_core_web_sm")

# CEFR lexicon
word_levels = None
if ENABLE_CEFR:
    word_levels = load_cefr_lexicon(CEFR_CSV_PATH, map_c1_to_b1=True)
    print("CEFR lexicon loaded:", len(word_levels))

manifest = pd.read_csv(MANIFEST_PATH).replace({np.nan: None})
print("Books:", len(manifest))

index = []
errors = []

for i, row in enumerate(manifest.to_dict("records"), start=1):
    book_id = row.get("book_id")
    print(f"\n[{i}/{len(manifest)}] Processing: {book_id} -> {row.get('txt_path')}")

    try:
        meta = analyze_one_book(
            book_row=row,
            out_root=OUT_ROOT,
            nlp=nlp,
            word_levels=word_levels,
            lev_cache_path=LEV_CACHE_PATH if ENABLE_LEV else None
        )
        index.append(meta)
        print("OK:", book_id)
    except Exception as e:
        err = {"book_id": book_id, "error": str(e), "traceback": traceback.format_exc()}
        errors.append(err)
        print("FAILED:", book_id, "-", e)

# save index/errors
index_path = os.path.join(OUT_ROOT, "index.json")
errors_path = os.path.join(OUT_ROOT, "errors.json")

with open(index_path, "w", encoding="utf-8") as f:
    json.dump(index, f, ensure_ascii=False, indent=2)

with open(errors_path, "w", encoding="utf-8") as f:
    json.dump(errors, f, ensure_ascii=False, indent=2)

print("\nDONE")
print("index.json:", index_path)
print("errors.json:", errors_path)
print("OK:", len(index), "FAILED:", len(errors))

# ZIP для удобной выгрузки
import shutil
zip_path = os.path.join(os.path.dirname(OUT_ROOT), "catalog_out.zip")
if os.path.exists(zip_path):
    os.remove(zip_path)
shutil.make_archive(base_name=zip_path.replace(".zip",""), format="zip", root_dir=OUT_ROOT)
print("ZIP:", zip_path)


CEFR lexicon loaded: 7654
Books: 22

[1/22] Processing: spotlight_spotlight_2 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-2.txt
OK: spotlight_spotlight_2

[2/22] Processing: spotlight_spotlight_3 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-3.txt
OK: spotlight_spotlight_3

[3/22] Processing: spotlight_spotlight_4 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-4.txt
OK: spotlight_spotlight_4

[4/22] Processing: spotlight_spotlight_5 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-5.txt
OK: spotlight_spotlight_5

[5/22] Processing: spotlight_spotlight_6 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-6.txt
OK: spotlight_spotlight_6

[6/22] Processing: spotlight_spotlight_7 -> /content/drive/MyDrive/EduText Analyzer/Учебники TXT/spotlight-txt/spotlight-7.txt
OK: spotlight_spotlight_7

[7/22] Processing: spotlight_spotlight_