In [42]:
!pip install pandas reportlab pyarrow



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [43]:
# === КОНФИГ ===

# Входной файл (json / csv / tsv / xlsx / xls / parquet / txt)
INPUT_FILE = "sample_submission.tsv"

# Базовое имя выходных файлов (без расширения)
OUTPUT_BASENAME = "rag_pairs"

# Что генерировать
MAKE_PDF = True
MAKE_TXT = True

# Как соединять ID и текст
PAIR_SEPARATOR = " - "  # будет: "<id> - <text>"

# ---- НАСТРОЙКИ ДЛЯ JSON ----
# имя поля для ID, например "query_id"
JSON_ID_FIELD = "query_id"        # или None, если ID = номер записи
# имя поля для текста, например "question"
JSON_TEXT_FIELD = "question"      # ОБЯЗАТЕЛЬНО указать для JSON
# Если записи лежат по ключу, типа {"data": [ {...}, {...} ]}
JSON_LIST_PATH = None             # например "data" или None

# ---- НАСТРОЙКИ ДЛЯ ТАБЛИЧНЫХ ФОРМАТОВ (csv/xlsx/tsv/parquet) ----
# имя столбца для ID, например "id" или "query_id"
TABLE_ID_COLUMN = "app_name"      # или None, если ID = номер строки
# имя столбца для текста, например "question" / "text" / "answer"
TABLE_TEXT_COLUMN = "labels_str"    # ОБЯЗАТЕЛЬНО указать для таблиц

# ---- НАСТРОЙКИ ДЛЯ TXT ----
# для .txt каждая непустая строка = отдельная запись
TXT_USE_LINE_NUMBER_AS_ID = True  # если False, ID будет пустым
TXT_DEFAULT_ID_LABEL = ""         # только если TXT_USE_LINE_NUMBER_AS_ID = False

# Заголовок PDF (можно None)
TITLE = f"Пары ID - текст из {INPUT_FILE}"


In [44]:
import json
from pathlib import Path

import pandas as pd

from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Preformatted
from reportlab.lib.styles import getSampleStyleSheet


In [45]:
TABULAR_EXTS = {".csv", ".tsv", ".tab", ".xlsx", ".xls", ".parquet"}

def load_tabular_to_dataframe(path: str) -> pd.DataFrame:
    p = Path(path)
    ext = p.suffix.lower()

    if ext == ".csv":
        df = pd.read_csv(p)
    elif ext in {".tsv", ".tab"}:
        df = pd.read_csv(p, sep="\t")
    elif ext in {".xlsx", ".xls"}:
        df = pd.read_excel(p)
    elif ext == ".parquet":
        df = pd.read_parquet(p)
    else:
        raise ValueError(f"Расширение {ext} не относится к табличным форматам")

    if isinstance(df.index, pd.MultiIndex):
        df = df.reset_index()

    return df


In [46]:
def pairs_from_dataframe(
    df: pd.DataFrame,
    id_column: str | None,
    text_column: str,
    separator: str = " - "
) -> list[str]:
    """
    Каждая строка таблицы -> одна пара:
    "<id> - <text>"

    Если id_column = None, то id = номер строки (начинаем с 1).
    """
    if text_column not in df.columns:
        raise KeyError(f"Столбец для текста '{text_column}' не найден в DataFrame")

    pairs = []

    for idx, row in df.iterrows():
        # id
        if id_column is None:
            id_val = str(idx + 1)
        else:
            if id_column not in df.columns:
                raise KeyError(f"Столбец для ID '{id_column}' не найден в DataFrame")
            id_val = row[id_column]

        text_val = row[text_column]

        id_str = "" if id_val is None else str(id_val).strip()
        text_str = "" if text_val is None else str(text_val).strip()

        if not text_str:
            continue  # пустой текст пропускаем

        # Если id пустой, то просто "text"
        if id_str:
            line = f"\"{id_str}{separator}{text_str}\""
        else:
            line = f"\"{text_str}\""

        pairs.append(line)

    return pairs


In [47]:
def pairs_from_json(
    path: str,
    id_field: str | None,
    text_field: str,
    list_path: str | None,
    separator: str = " - "
) -> list[str]:
    with open(path, "r", encoding="utf-8") as f:
        obj = json.load(f)

    # 1) Если указан JSON_LIST_PATH и это dict
    if list_path is not None and isinstance(obj, dict):
        records = obj.get(list_path, [])
    # 2) Если это список словарей
    elif isinstance(obj, list):
        records = obj
    # 3) Если это один словарь — считаем, что это одна запись
    elif isinstance(obj, dict):
        records = [obj]
    else:
        # странный формат — сделаем одну пару "1 - <pretty_json>"
        pretty = json.dumps(obj, ensure_ascii=False)
        return [f"\"1{separator}{pretty}\""]

    pairs = []

    for i, rec in enumerate(records, start=1):
        if not isinstance(rec, dict):
            # если в списке не словарь — просто дампим
            val_str = json.dumps(rec, ensure_ascii=False)
            line = f"\"{i}{separator}{val_str}\""
            pairs.append(line)
            continue

        # ID
        if id_field is None:
            id_val = str(i)
        else:
            id_val = rec.get(id_field)

        # Текст
        text_val = rec.get(text_field)

        if text_val is None:
            continue

        id_str = "" if id_val is None else str(id_val).strip()
        text_str = str(text_val).strip()

        if not text_str:
            continue

        if id_str:
            line = f"\"{id_str}{separator}{text_str}\""
        else:
            line = f"\"{text_str}\""

        pairs.append(line)

    return pairs


In [48]:
def pairs_from_txt(
    path: str,
    use_line_number_as_id: bool = True,
    default_id_label: str = "",
    separator: str = " - "
) -> list[str]:
    p = Path(path)
    lines_raw = p.read_text(encoding="utf-8").splitlines()
    pairs = []

    for i, line in enumerate(lines_raw, start=1):
        text_str = line.strip()
        if not text_str:
            continue

        if use_line_number_as_id:
            id_str = str(i)
        else:
            id_str = default_id_label.strip()

        if id_str:
            pair = f"\"{id_str}{separator}{text_str}\""
        else:
            pair = f"\"{text_str}\""

        pairs.append(pair)

    return pairs


In [49]:
def pairs_to_text(pairs: list[str]) -> str:
    return "\n".join(pairs)


In [50]:
def text_to_pdf(text: str, output_path: str, title: str | None = None):
    doc = SimpleDocTemplate(
        output_path,
        pagesize=A4,
        rightMargin=40,
        leftMargin=40,
        topMargin=40,
        bottomMargin=40,
    )

    styles = getSampleStyleSheet()
    story = []

    if title:
        story.append(Paragraph(title, styles["Title"]))

    story.append(Preformatted(text, styles["Code"]))

    doc.build(story)
    print("PDF создан →", output_path)


In [51]:
def convert_any_to_id_text_pairs(
    input_file: str,
    output_basename: str,
    *,
    title: str | None = None,
    make_pdf: bool = True,
    make_txt: bool = True,
    # json config
    json_id_field: str | None = None,
    json_text_field: str | None = None,
    json_list_path: str | None = None,
    # table config
    table_id_column: str | None = None,
    table_text_column: str | None = None,
    # txt config
    txt_use_line_number_as_id: bool = True,
    txt_default_id_label: str = "",
    # common
    separator: str = " - "
):
    p = Path(input_file)
    ext = p.suffix.lower()

    pdf_path = Path(f"{output_basename}.pdf")
    txt_path = Path(f"{output_basename}.txt")

    # Определяем пары в зависимости от формата
    if ext in TABULAR_EXTS:
        df = load_tabular_to_dataframe(input_file)
        pairs = pairs_from_dataframe(
            df,
            id_column=table_id_column,
            text_column=table_text_column,
            separator=separator,
        )
    elif ext == ".json":
        if json_text_field is None:
            raise ValueError("Для JSON необходимо указать JSON_TEXT_FIELD в конфиге")
        pairs = pairs_from_json(
            input_file,
            id_field=json_id_field,
            text_field=json_text_field,
            list_path=json_list_path,
            separator=separator,
        )
    elif ext == ".txt":
        pairs = pairs_from_txt(
            input_file,
            use_line_number_as_id=txt_use_line_number_as_id,
            default_id_label=txt_default_id_label,
            separator=separator,
        )
    else:
        # fallback: просто одна пара "имя файла - весь текст"
        try:
            text_raw = p.read_text(encoding="utf-8").strip()
        except UnicodeDecodeError:
            text_raw = p.read_text(encoding="cp1251").strip()

        if not text_raw:
            pairs = []
        else:
            line = f"\"{p.name}{separator}{text_raw}\""
            pairs = [line]

    text = pairs_to_text(pairs)

    if make_txt:
        txt_path.write_text(text, encoding="utf-8")
        print("TXT создан →", txt_path)

    if make_pdf:
        text_to_pdf(text, str(pdf_path), title=title or f"Пары ID - текст из {p.name}")


In [52]:
convert_any_to_id_text_pairs(
    INPUT_FILE,
    OUTPUT_BASENAME,
    title=TITLE,
    make_pdf=MAKE_PDF,
    make_txt=MAKE_TXT,
    json_id_field=JSON_ID_FIELD,
    json_text_field=JSON_TEXT_FIELD,
    json_list_path=JSON_LIST_PATH,
    table_id_column=TABLE_ID_COLUMN,
    table_text_column=TABLE_TEXT_COLUMN,
    txt_use_line_number_as_id=TXT_USE_LINE_NUMBER_AS_ID,
    txt_default_id_label=TXT_DEFAULT_ID_LABEL,
    separator=PAIR_SEPARATOR,
)


TXT создан → rag_pairs.txt
PDF создан → rag_pairs.pdf
