# Build EN->RU CEFR Corpus

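This notebook loads the CEFR-labelled English texts from `data/text/cefr_leveled_texts.csv`. It translates each entry into Russian using the Hugging Face Inference API (model `Helsinki-NLP/opus-mt-en-ru`) and stores the combined dataset as `data/text/en_ru_cefr_corpus.csv`. Translations are appended to the output file in small batches, and rows already present there are skipped, so re-running the notebook resumes an interrupted run.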

## Prerequisites

- Install the required packages (`pandas`, `requests`, optionally `python-dotenv` for local secrets).
- Create a Hugging Face account and generate an [Inference API token](https://huggingface.co/settings/tokens) with access to public models.
- Expose the token to the notebook via the `HF_API_TOKEN` environment variable (for example, run `%env HF_API_TOKEN=hf_xxx` in a Jupyter cell before running the configuration cell).

In [None]:
from __future__ import annotations

import os
import re
import textwrap
import time
from pathlib import Path
from typing import Sequence

import pandas as pd
import requests

SOURCE_CSV = Path("../data/text/cefr_leveled_texts.csv")
OUTPUT_CSV = Path("../data/text/en_ru_cefr_corpus.csv")
MODEL_URL = "https://api-inference.huggingface.co/models/Helsinki-NLP/opus-mt-en-ru"
BATCH_SIZE = 3  # segments per API request, and rows per checkpoint write
REQUEST_TIMEOUT = 60  # seconds allowed per HTTP request
RETRY_LIMIT = 6  # attempts per API request before giving up

# The token is read from the HF_API_TOKEN environment variable (e.g. set it with
# `%env HF_API_TOKEN=hf_xxx` in Jupyter). Surrounding whitespace is ignored.
HF_API_TOKEN = os.environ.get("HF_API_TOKEN", "").strip()
# Optionally override in-place (never commit a real token):
# HF_API_TOKEN = "hf_your_api_token"

# Fail fast here rather than at the first API call. The previous `is None` check
# could never trigger because the token defaulted to an empty string.
if not HF_API_TOKEN:
    raise RuntimeError(
        "Set HF_API_TOKEN environment variable (or assign in this cell) with your Hugging Face Inference token."
    )

if not SOURCE_CSV.exists():
    raise FileNotFoundError(f"Missing source corpus at {SOURCE_CSV.resolve()}")

# Ensure the output directory exists before any checkpoint rows are appended.
OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)

In [11]:
class HuggingFaceTranslator:
    """Thin wrapper around the Hugging Face inference endpoint for EN->RU translation.

    Each :meth:`translate_batch` call sends a list of source strings in one HTTP
    request, retrying transient failures, and returns the translations in input
    order.
    """

    def __init__(self, endpoint: str, token: str, *, timeout: int = 60, sleep: float = 1.0) -> None:
        """Bind the translator to ``endpoint`` and authenticate with ``token``.

        Args:
            endpoint: Full URL of the model's inference endpoint.
            token: Hugging Face API token; surrounding whitespace is ignored.
            timeout: Per-request timeout in seconds, passed to ``requests``.
            sleep: Pause in seconds after every successful request; also the base
                delay of the exponential retry back-off. Negative values become 0.

        Raises:
            ValueError: If ``token`` is empty or whitespace-only.
        """
        # A whitespace-only token would otherwise produce a bare "Bearer " header.
        if not token or not token.strip():
            raise ValueError("Token must be provided for the Hugging Face Inference API.")
        self.endpoint = endpoint
        self.headers = {"Authorization": f"Bearer {token.strip()}"}
        self.timeout = timeout
        self.sleep = max(sleep, 0.0)
        # One session for all requests so requests' connection pooling is reused.
        self._session = requests.Session()

    def translate_batch(self, texts: Sequence[str], *, retries: int = 6) -> list[str]:
        """Translate ``texts`` in a single request, retrying transient failures.

        Network errors and HTTP 503 / 429 responses are retried with exponential
        back-off (see :meth:`_backoff`); any other non-200 status fails at once.

        Args:
            texts: Source strings; the result preserves their order.
            retries: Maximum number of attempts. At least one attempt is always made.

        Returns:
            One translated string per element of ``texts`` (``[]`` for no input).

        Raises:
            RuntimeError: On a non-retryable HTTP status, a non-JSON or malformed
                response body, or when every attempt failed.
        """
        if not texts:
            return []
        payload = {
            "inputs": list(texts),
            "parameters": {"max_length": 1024},
            "options": {"wait_for_model": True},
        }
        attempts = max(retries, 1)
        retryable = {requests.codes.service_unavailable, requests.codes.too_many_requests}
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                response = self._session.post(
                    self.endpoint,
                    headers=self.headers,
                    json=payload,
                    timeout=self.timeout,
                )
            except requests.RequestException as exc:  # network hiccup: back off, then retry
                last_error = exc
            else:
                if response.status_code == requests.codes.ok:
                    try:
                        data = response.json()
                    except ValueError as exc:
                        # Surface as RuntimeError so callers catching it see the failure.
                        raise RuntimeError(
                            f"Translation response is not valid JSON: {response.text[:200]!r}"
                        ) from exc
                    translations = self._extract_translations(data, expected=len(texts))
                    if self.sleep:
                        time.sleep(self.sleep)  # client-side pacing between requests
                    return translations
                if response.status_code not in retryable:
                    try:
                        details = response.json()
                    except ValueError:
                        details = response.text
                    raise RuntimeError(f"Translation failed: {response.status_code} -> {details}")
                # 503 Service Unavailable / 429 Too Many Requests: remember it and retry.
                last_error = RuntimeError(
                    f"Transient HTTP {response.status_code} from translation endpoint"
                )
            # No point sleeping after the final attempt.
            if attempt + 1 < attempts:
                time.sleep(self._backoff(attempt))
        raise RuntimeError("Repeated translation failures") from last_error

    def _backoff(self, attempt: int) -> float:
        """Return the delay in seconds before the retry that follows ``attempt`` (0-based).

        The delay is ``sleep * 2**attempt``, using 1 s when ``sleep`` is 0, and is
        capped at 30 seconds.
        """
        base = self.sleep or 1.0
        return min(base * 2 ** attempt, 30.0)

    @staticmethod
    def _extract_translations(data: object, *, expected: int) -> list[str]:
        """Normalise an inference response into a flat list of translation strings.

        Accepted shapes:

        - ``{"translation_text": ...}``
        - ``[{"translation_text": ...}, ...]``
        - ``[[{...}, ...]]`` (one wrapping list)
        - ``[[{...}], [{...}], ...]`` (one inner list per input; the first
          candidate of each inner list is used).

        Raises:
            RuntimeError: If the payload carries an ``"error"`` key, has an
                unexpected shape, or yields a count different from ``expected``.
        """
        if isinstance(data, dict):
            if "error" in data:
                raise RuntimeError(f"Model error: {data['error']}")
            if "translation_text" in data:
                data = [data]
        if isinstance(data, list) and data and all(isinstance(item, list) for item in data):
            if len(data) == 1:
                data = data[0]
            else:
                # One list per input: keep the first candidate of each, so later
                # inputs are not dropped.
                data = [item[0] if item else None for item in data]
        if not isinstance(data, list):
            raise RuntimeError(f"Unexpected response payload: {data}")
        translations: list[str] = []
        for item in data:
            if isinstance(item, dict) and "translation_text" in item:
                translations.append(item["translation_text"])
            else:
                raise RuntimeError(f"Unexpected item in response: {item}")
        if len(translations) != expected:
            raise RuntimeError(
                f"Expected {expected} translations but received {len(translations)}."
            )
        return translations


MAX_SEGMENT_CHARS = 700


def _split_paragraph(para: str, limit: int = MAX_SEGMENT_CHARS) -> list[str]:
    para = " ".join(para.strip().split())
    if not para:
        return []
    if len(para) <= limit:
        return [para]
    sentences = re.split(r'(?<=[.!?])\s+', para)
    segments: list[str] = []
    buffer = ""
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        if len(sentence) > limit:
            if buffer:
                segments.append(buffer.strip())
                buffer = ""
            for start in range(0, len(sentence), limit):
                chunk = sentence[start : start + limit].strip()
                if chunk:
                    segments.append(chunk)
            continue
        next_len = len(buffer) + len(sentence) + (1 if buffer else 0)
        if next_len <= limit:
            buffer = f"{buffer} {sentence}".strip()
        else:
            if buffer:
                segments.append(buffer.strip())
            buffer = sentence
    if buffer:
        segments.append(buffer.strip())
    return segments


def split_text_for_api(text: str, limit: int = MAX_SEGMENT_CHARS) -> list[str]:
    if not isinstance(text, str):
        text = "" if text is None else str(text)
    text = text.strip()
    if not text:
        return []
    if len(text) <= limit:
        return [text]
    chunks: list[str] = []
    paragraphs = [para for para in re.split(r"\n{2,}", text) if para and para.strip()]
    if not paragraphs:
        paragraphs = [text]
    for paragraph in paragraphs:
        if len(paragraph) <= limit:
            cleaned = " ".join(paragraph.strip().split())
            if cleaned:
                chunks.append(cleaned)
        else:
            pieces = _split_paragraph(paragraph, limit=limit)
            chunks.extend(pieces)
    if not chunks:
        return [text[:limit]]
    return chunks


def translate_text_safe(
    text: str,
    translator: HuggingFaceTranslator,
    *,
    batch_size: int = BATCH_SIZE,
    retries: int = RETRY_LIMIT,
) -> str:
    """Translate ``text`` of arbitrary length while preserving its paragraph breaks.

    The text is split on blank lines into paragraphs. Each paragraph is cut into
    API-sized segments with :func:`split_text_for_api`, and all segments are sent
    through ``translator.translate_batch`` in groups of ``batch_size``. Translated
    segments of one paragraph are re-joined with a single space, and paragraphs
    with a blank line. Sentence chunks of a single paragraph are therefore not
    turned into separate paragraphs.

    Args:
        text: English source text; ``None`` becomes ``""`` and other non-strings
            are converted with ``str``.
        translator: Translator used for the API calls.
        batch_size: Number of segments per API request (must be >= 1).
        retries: Attempts per request, forwarded to ``translate_batch``.

    Returns:
        The translated text, or ``""`` when ``text`` has no content.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. A negative value used to
            make the loop a no-op and silently return an empty translation.
        RuntimeError: Propagated from ``translator.translate_batch``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not isinstance(text, str):
        text = "" if text is None else str(text)

    # One list of segments per non-blank paragraph.
    groups: list[list[str]] = []
    for paragraph in re.split(r"\n\s*\n", text):
        segments = split_text_for_api(paragraph)
        if segments:
            groups.append(segments)
    flat = [segment for group in groups for segment in group]
    if not flat:
        return ""

    translated: list[str] = []
    for start in range(0, len(flat), batch_size):
        translated.extend(
            translator.translate_batch(flat[start : start + batch_size], retries=retries)
        )

    # Reassemble: translate_batch returns exactly one output per input, so the
    # translations line up with `flat` and can be sliced back into groups.
    paragraphs_out: list[str] = []
    offset = 0
    for group in groups:
        paragraphs_out.append(" ".join(translated[offset : offset + len(group)]))
        offset += len(group)
    return "\n\n".join(paragraphs_out)

In [12]:
# Load the labelled source corpus. Each row gets a positional id (its 0-based row
# number in SOURCE_CSV), which is used to match rows already written to OUTPUT_CSV.
source_df = pd.read_csv(SOURCE_CSV).reset_index().rename(columns={"index": "row_id"})
source_df = source_df.rename(columns={"text": "text_en"})
required_columns = {"row_id", "text_en", "label"}
missing = required_columns - set(source_df.columns)
if missing:
    raise ValueError(f"Missing required columns: {sorted(missing)}")
source_df = source_df[["row_id", "text_en", "label"]]
source_df = source_df.sort_values("row_id").reset_index(drop=True)
print(f"Loaded {len(source_df)} labelled texts.")

processed_ids = set()
if OUTPUT_CSV.exists():
    try:
        translated_df = pd.read_csv(OUTPUT_CSV)
    except pd.errors.EmptyDataError:
        # A zero-byte or blank output file holds no rows to resume from.
        translated_df = pd.DataFrame()
    if "row_id" in translated_df.columns:
        # Coerce defensively: malformed ids become NaN and are dropped.
        processed_ids = set(
            pd.to_numeric(translated_df["row_id"], errors="coerce").dropna().astype(int).tolist()
        )
    elif not translated_df.empty:
        print(
            f"Warning: {OUTPUT_CSV} has no 'row_id' column; every row will be "
            "translated again and appended to it."
        )
    remaining_df = source_df[~source_df["row_id"].isin(processed_ids)].copy()
    print(
        f"Found existing translations for {len(processed_ids)} rows; "
        f"{len(remaining_df)} still pending."
    )
else:
    remaining_df = source_df.copy()
    print("No existing translations found; starting from scratch.")

remaining_df.head()

Loaded 1494 labelled texts.
No existing translations found; starting from scratch.


Unnamed: 0,row_id,text_en,label
0,0,Hi!\nI've been meaning to write for ages and f...,B2
1,1,﻿It was not so much how hard people found the ...,B2
2,2,Keith recently came back from a trip to Chicag...,B2
3,3,"The Griffith Observatory is a planetarium, and...",B2
4,4,-LRB- The Hollywood Reporter -RRB- It's offici...,B2


In [None]:
translator = HuggingFaceTranslator(
    MODEL_URL,
    HF_API_TOKEN,
    timeout=REQUEST_TIMEOUT,
    sleep=1.2,  # pause after each successful request; also the retry back-off base
)

if remaining_df.empty:
    print("All rows already translated.")
else:
    # Write the header when the file is missing OR empty. Checking existence alone
    # would append headerless rows to a zero-byte file, and the resume logic could
    # then no longer find the `row_id` column.
    write_header = not OUTPUT_CSV.exists() or OUTPUT_CSV.stat().st_size == 0
    total_pending = len(remaining_df)
    # Process BATCH_SIZE rows at a time and append each group to the CSV, so an
    # interruption loses at most one group of work.
    for start in range(0, total_pending, BATCH_SIZE):
        batch = remaining_df.iloc[start : start + BATCH_SIZE]
        english_texts = batch["text_en"].tolist()
        row_ids = batch["row_id"].tolist()
        translations = []
        for row_id, text in zip(row_ids, english_texts):
            text_value = "" if pd.isna(text) else str(text)
            try:
                translated_text = translate_text_safe(
                    text_value,
                    translator,
                    batch_size=BATCH_SIZE,
                    retries=RETRY_LIMIT,
                )
            except RuntimeError as exc:
                # Report which row failed, then stop; re-running the notebook resumes here.
                print(f"Failed to translate row {row_id}: {exc}")
                raise
            translations.append(translated_text)
        batch_out = pd.DataFrame(
            {
                "row_id": row_ids,
                "text_en": english_texts,
                "text_ru": translations,
                "label": batch["label"].tolist(),
            }
        )
        batch_out.to_csv(OUTPUT_CSV, mode="a", header=write_header, index=False)
        write_header = False
        first_id = batch_out["row_id"].iloc[0]
        last_id = batch_out["row_id"].iloc[-1]
        completed = start + len(batch_out)
        print(
            f"Translated rows {first_id}-{last_id} "
            f"({completed}/{total_pending} in this session)."
        )
        time.sleep(0.3)

print("Done.")


Translated rows 0-2 (3/1494 in this session).
Translated rows 3-5 (6/1494 in this session).
Translated rows 6-8 (9/1494 in this session).
Translated rows 9-11 (12/1494 in this session).
Translated rows 12-14 (15/1494 in this session).
Translated rows 15-17 (18/1494 in this session).
Translated rows 18-20 (21/1494 in this session).


In [None]:
if not OUTPUT_CSV.exists():
    raise FileNotFoundError("Translation output not found; run the previous cell first.")

preview_df = pd.read_csv(OUTPUT_CSV)
print(f"Generated {len(preview_df)} translated rows so far.")
# Jupyter only renders the value of a cell's last top-level expression. The same
# call nested inside an `if` block was evaluated but never displayed.
preview_df.head()