## Настройка окружения

#### Монтирование Google-Диска

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Установка зависимостей

In [None]:
%%capture --no-stderr
%pip install openpyxl deepdiff pillow nltk tqdm langchain_community langgraph_sdk langchain_openai langgraph langchain_core langsmith ftfy openai httpx==0.27.2

### Импорты

In [None]:
import os
import json

from deepdiff import DeepDiff

from google.colab import userdata

import operator

import uuid

import asyncio

import builtins

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langchain_core.runnables.config import RunnableConfig
from langgraph.constants import Send

from IPython.display import Image, display

from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults

from jinja2 import Template

from pydantic import BaseModel, Field, field_validator
from typing import Optional, Literal, Union
import datetime

from typing import Annotated, Any, Optional, List, Literal, Dict, Type, Tuple
from typing_extensions import TypedDict

from openai import OpenAI

import pandas as pd

import math

from collections import defaultdict

from tqdm import tqdm

import time

import yaml
from pathlib import Path

### Переменные окружения

In [None]:
# Укажите базовый URL для API, если используете альтернативный хост
os.environ['API_BASE_URL'] = None  # Например, "https://api.fireworks.ai/inference/v1"

# Укажите путь к конфигу с промптами
os.environ['PROMPT_CONFIG_PATH'] = "../../data/configs/prompts.yaml""

In [None]:
model_name = "gpt-4.1" # Укажите навзвание модели
main_dir = "drive/MyDrive/LLM_coding_challenge" # Рабочая директория

# Поддиректории с исходными и целевыми данными
init_data_dir = os.path.join(main_dir, "initial_data")
important_data_dir = os.path.join(main_dir, "important_data")

In [None]:
# Инициализация модели через API. Не забудьте задать OPENAI_API_KEY в переменных окружения.
model = ChatOpenAI(
    model=model_name,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    # openai_api_base=os.getenv("API_BASE_URL"), # Раскомментируйте, если нужен нестандартный endpoint
    max_retries=30,
    timeout=90,
)

### Конфиги

In [None]:
def load_prompts_config():
    """
    Загружает YAML-конфиг с промптами из файла, путь к которому задан через переменную окружения PROMPT_CONFIG_PATH
    """
    config_path = Path(os.environ["PROMPT_CONFIG_PATH"])
    if not config_path.exists():
        raise FileNotFoundError(f"Файл промптов не найден по пути {config_path}")
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

PROMPTS_CONFIG = load_prompts_config()

### Исследование и обработка данных

In [None]:
file_path = os.path.join(init_data_dir, "Примеры внесения информации из присылаемых сообщений в таблицы.xlsx")

df = pd.read_excel(file_path)

In [None]:
def extract_structured_data_from_df(df):
    structured_data = []
    marker_text = "Пример сообщения от агронома"
    message_column_index = 0

    i = 0
    while i < len(df):
        # Проверяем, является ли значение в ячейке строкой и начинается ли оно с маркерного текста
        cell_value = df.iloc[i, message_column_index]
        if isinstance(cell_value, str) and cell_value.strip().startswith(marker_text):

            # Нашли маркер. Сообщение агронома в следующей строке, в том же столбце
            if i + 1 < len(df):
                agronomist_message = df.iloc[i + 1, message_column_index]
            else:
                # Если маркер в последней строке, сообщения нет
                agronomist_message = None
                i += 1
                continue # Переходим к следующей итерации внешнего цикла

            # Заголовки таблицы находятся через одну строку от маркера (i+2)
            if i + 2 < len(df):
                header_row = df.iloc[i + 2]
                # Убираем возможные NaN из заголовков, заменяя их на что-то осмысленное или пустую строку
                table_headers = [str(h) if pd.notna(h) else f'Unnamed_{col_idx}' for col_idx, h in enumerate(header_row)]
            else:
                # Если нет места для заголовков, пропускаем этот блок
                i += 1 # Пропускаем строку маркера для следующей итерации
                continue

            # Данные таблицы начинаются со строки i + 3
            data_start_index = i + 3
            current_table_rows = []
            j = data_start_index

            while j < len(df):
                current_row = df.iloc[j]
                # Проверяем, состоит ли вся строка из NaN
                if current_row.isnull().all():
                    # Нашли конец таблицы (строка со всеми NaN)
                    break
                else:
                    # Это строка данных, добавляем её в список
                    current_table_rows.append(current_row.tolist()) # Преобразуем в список для DataFrame
                    j += 1

            # Если мы собрали какие-то строки для таблицы
            if current_table_rows:
                # Создаем DataFrame из собранных строк с правильными заголовками
                table_df = pd.DataFrame(current_table_rows, columns=table_headers)
                table_df = table_df.drop(columns=["Unnamed_0"])
                # Добавляем результат (сообщение и DataFrame таблицы) в общий список
                structured_data.append({
                    "agronomist_message": agronomist_message,
                    "data_table": table_df
                })
            else:
                # Если данных для таблицы не найдено после заголовков (например, сразу NaN строка)
                # Можно добавить запись с пустой таблицей или пропустить
                structured_data.append({
                    "agronomist_message": agronomist_message,
                    "data_table": pd.DataFrame(columns=table_headers) # Пустой DataFrame с заголовками
                })

            # Перемещаем основной индекс i за пределы только что обработанного блока
            i = j + 1 # Начинаем следующий поиск со строки после найденной NaN-строки

        else:
            # Если текущая строка не маркер, просто переходим к следующей
            i += 1

    return structured_data

In [None]:
structured_data = extract_structured_data_from_df(df)

print(f"\nНайдено и обработано {len(structured_data)} блоков сообщений.")
structured_data[0]['data_table']

## Structured Output

#### Schemas
Набор допустимых значений для полей

Используется при декодировании ответа модели с ограничениями (constrained decoding).

Модель генерирует ответ в формате, строго соответствующем Pydantic-схеме,
и не может выйти за рамки заданного множества значений.

In [None]:
department_set = tuple(['1', '10', '11', '12', '14', '16', '17', '18',
                                 '19', '20', '3', '4', '5', '6', '7', '9',
                                 'АО Кропоткинское', 'АОР', 'Восход', 'Рассвет','Кавказ',
                                 'Колхоз Прогресс', 'Мир', 'СП Коломейцево',
                                 'Север', 'ТСК', 'Центр', 'Юг'])

operation_set = tuple(['1-я междурядная культивация', '2-я междурядная культивация',
                       'Боронование довсходовое', 'Внесение минеральных удобрений',
                       'Выравнивание зяби', '2-е Выравнивание зяби',
                       'Гербицидная обработка', '1 Гербицидная обработка',
                       '2 Гербицидная обработка', '3 Гербицидная обработка',
                       '4 Гербицидная обработка', 'Дискование', 'Дискование 2-е', 'Дискование 3-е',
                       'Инсектицидная обработка', 'Культивация', 'Пахота',
                       'Подкормка', '2-я подкормка', 'Предпосевная культивация', 'Прикатывание посевов',
                       'Сев', 'Сплошная культивация', 'Уборка', 'Функицидная обработка',
                       'Чизлевание'])

crop_set = tuple(['Вика+Тритикале', 'Горох на зерно', 'Горох товарный', 'Гуар',
                  'Конопля', 'Кориандр', 'Кукуруза кормовая', 'Кукуруза семенная',
                  'Кукуруза товарная', 'Люцерна', 'Многолетние злаковые травы',
                  'Многолетние травы текущего года','Овес', 'Подсолнечник кондитерский',
                  'Подсолнечник семенной','Подсолнечник товарный', 'Просо','Пшеница озимая на зеленый корм',
                  'Пшеница озимая семенная', 'Пшеница озимая товарная', 'Рапс озимый',
                  'Рапс яровой', 'Свекла сахарная', 'Сорго', 'Сорго кормовой',
                  'Сорго-суданковый гибрид', 'Соя семенная', 'Соя товарная',
                  'Чистый пар', 'Чумиза', 'Ячмень озимый', 'Ячмень озимый семенной'])

In [None]:
def create_base_field_works_schemas(
    department_set: Tuple[str],
    operation_set: Tuple[str],
    crop_set: Tuple[str],
):
    class FieldWorkEntry(BaseModel):
        date: Optional[str] = Field(
            None, description="Дата проведения операции (формат: 'мм-дд')"
        )
        department_name: Literal[department_set] = Field(
            ..., description="Название подразделения, в котором проводилась операция"
        )
        operation: Literal[operation_set] = Field(
            ..., description="Название выполненной операции"
        )
        crop: Literal[crop_set] = Field(
            ..., description="Культура, к которой относится операция"
        )
        processed_area_day: int = Field(
            ..., description="Обработанная площадь за день, в гектарах"
        )
        processed_area_total: Optional[int] = Field(
            None, description="Общая обработанная площадь с начала операции, в гектарах"
        )
        yield_kg_day: Optional[int] = Field(
            None, description="Валовая продукция за день, в килограммах"
        )
        yield_kg_total: Optional[int] = Field(
            None, description="Суммарная валовая продукция с начала операции, в килограммах"
        )

        @field_validator('crop', mode='before')
        @classmethod
        def normalize_crop(cls, value):
            if isinstance(value, str):
                return value.replace('\xa0', ' ').strip()
            return value

        # Added for cleaner comparison later
        def model_dump_comparable(self) -> Dict[str, Any]:
            """Dumps model to dict, ensuring consistent handling of None/NaN."""
            dumped = self.model_dump() # Сначала получаем стандартный словарь модели
            for key, value in dumped.items():
                if is_empty(value): # Проверяем значение с помощью нашей функции is_empty
                    dumped[key] = None # Если значение "пустое" (None или NaN), заменяем его на None
            return dumped # Возвращаем словарь с нормализованными пустыми значениями


    class FieldWorkLog(BaseModel):
        entries: List[FieldWorkEntry] = Field(
            ..., description="Список записей о полевых операциях"
        )

    return FieldWorkEntry, FieldWorkLog

In [None]:
FieldWorkEntry, FieldWorkLog = create_base_field_works_schemas(
    department_set,
    operation_set,
    crop_set,
)

### BASELINE

#### Prompts

In [None]:
BASELINE_SYSTEM_PROMPT_TEMPLATE = Template(PROMPTS_CONFIG.get("baseline_system_prompt_template"))

#### Code

Функции для генерации few-shot примеров из JSON файла

In [None]:
def generate_few_shot_text(few_shot_examples: List[Dict[str, Any]]) -> str:
    blocks = []
    for i, example in enumerate(few_shot_examples, start=1):
        input_text = example["input"]
        output_block = example["output"].model_dump_json(indent=4)
        explanation = example.get("explanation")


        example_text = f"""# Example {i}:

INPUT:
<<<
{input_text}
>>>

OUTPUT:
<<<
{output_block}
>>>"""

        if explanation:
            example_text += f"""

EXPLANATION:
{explanation}"""

        blocks.append(example_text)

    return "\n\n---\n\n".join(blocks)


def form_few_shot_str(json_path, chosen_examples):
    """
    Формирует и возвращает в текстовом виде few-shot примеры.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    few_shot_data = []

    for i, example in enumerate(data):
        if i in chosen_examples:
            input = example.get("message")
            explanation = example.get("explanation")
            # print(explanation)

            output_block = FieldWorkLog(entries=[
                    FieldWorkEntry(**entry) for entry in example.get("entries")
                ])

            few_shot_data.append({
                    "input": input,
                    "output": output_block,
                    "explanation": explanation
                })

    few_shot_examples_str = generate_few_shot_text(few_shot_data)

    return few_shot_examples_str

In [None]:
def run_model_on_structured_data_sync(
    structured_data: List[dict],
    excluded_ids: List[int],
    model_output_schema: Type[BaseModel],
    system_prompt_template,
    model,
    few_shot_examples_str: str,
    schema_hints: str,
    temperature: float = 0.1,
    top_p: float = 1.0,
    sleep_time: int = 0
) -> List[dict]:
    """
    Прогоняет LLM-модель по structured_data, исключая примеры из excluded_ids.
    Позволяет управлять параметрами генерации (temperature, top_p).
    """
    results = []

    bound_model = model.with_structured_output(model_output_schema).bind(
        temperature=temperature,
        top_p=top_p,
    )

    for i, entry in enumerate(structured_data):
        if i in excluded_ids:
            continue

        curr_message = entry['input']

        system_prompt = system_prompt_template.render(
            json_schema=json.dumps(model_output_schema.model_json_schema(), indent=2, ensure_ascii=False),
            few_shot_examples=few_shot_examples_str,
            message=curr_message,
            schema_hints=schema_hints,
        )

        try:
            answer = bound_model.invoke(
                [SystemMessage(content=system_prompt)],
            )
        except Exception as e:
            print(f"⚠️ Ошибка при обработке записи {i}: {e}")
            answer = FieldWorkLog(entries=[])

        results.append({
            "input": curr_message,
            "output": answer
        })

        time.sleep(sleep_time) # для того чтобы не превышать rate limit

    return results


In [None]:
async def run_model_on_structured_data_async(
    structured_data: List[dict],
    excluded_ids: List[int],
    model_output_schema: Type[BaseModel],
    system_prompt_template,
    model,
    few_shot_examples_str: str,
    schema_hints: str,
    max_concurrent_tasks: int = 5,
) -> List[dict]:
    """
    Асинхронно прогоняет LLM-модель по structured_data, исключая примеры из excluded_ids.
    """

    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    results = []

    bound_model = model.with_structured_output(model_output_schema)

    async def process_entry(i, entry):
        if i in excluded_ids:
            return None

        curr_message = entry['input']
        system_prompt = system_prompt_template.render(
            json_schema=json.dumps(model_output_schema.model_json_schema(), indent=2, ensure_ascii=False),
            few_shot_examples=few_shot_examples_str,
            message=curr_message,
            schema_hints=schema_hints,
        )

        async with semaphore:
            try:
                answer = await bound_model.ainvoke(
                    [SystemMessage(content=system_prompt)]
                )
            except Exception as e:
                print(f"Произошла ошибка при вызове модели: {e}")
                answer = FieldWorkLog(entries=[])
            return {
                "input": curr_message,
                "output": answer
            }

    tasks = [process_entry(i, entry) for i, entry in enumerate(structured_data)]
    raw_results = await asyncio.gather(*tasks)
    results = [r for r in raw_results if r is not None]
    return results

### Генерация полного тестового датасета

In [None]:
def run_inference_on_xlsx(
    xlsx_path: str,
    model_output_schema: Type,
    system_prompt_template,
    model,
    few_shot_examples_str: str,
    schema_hints: str,
    save_every: int = 10,
    output_json_path: str = "model_outputs.json"
):
    # Загрузка входных сообщений
    df = pd.read_excel(xlsx_path)
    messages = df.iloc[:, 0].dropna().tolist()

    # Подготовка данных в нужный формат
    structured_data = [{"agronomist_message": msg} for msg in messages]

    results = []
    for i, entry in enumerate(tqdm(structured_data, desc="Processing messages")):
        prompt = system_prompt_template.render(
            json_schema=model_output_schema.model_json_schema(),
            few_shot_examples=few_shot_examples_str,
            message=entry["agronomist_message"],
            schema_hints=schema_hints,
        )

        try:
            answer = model.with_structured_output(model_output_schema).invoke(
                [SystemMessage(content=prompt)]
            )

            results.append({
                "input": entry["agronomist_message"],
                "output": answer.model_dump()
            })

        except Exception as e:
            print(f"⚠️ Error on entry {i}: {e}")
            results.append({
                "input": entry["agronomist_message"],
                "output": None,
                "error": str(e)
            })

        # Сохраняем каждые `save_every` итераций
        if (i + 1) % save_every == 0 or (i + 1) == len(structured_data):
            with open(output_json_path, "w", encoding="utf-8") as f:
                json.dump(results, f, indent=2, ensure_ascii=False)
            print(f"✅ Saved {len(results)} results to {output_json_path}")

    return results

### Evaluate

#### Скрипт для автоматической проверки

Подсчёт метрик и валидация ошибок на основе ground truth (эталонных) данных (изначально размечены моделью, затем провалидированы и поправлены вручную):
- **field_accuracy** - средняя точность по всем полям
- **exact_match_accuracy** - полное совпадение для ВСЕХ записей входного сообщения (если хотя бы одна запись несоответствует - для всего сообщения считается ошибкой)
- **precision** , **f1**, **recall** - измеряют количество недостающих/лишних записей
- **total_correctly_matched_entries** - количество записей которое на 100% совпало с ground_truth данными
- **total_true_entries** - количество ground truth (эталонных) записей

После выполнения, функция формирует результирующий JSON файл с подробными метриками и ошибками (конкретно какой пример и в чём ошибка)

In [None]:
def is_empty(value):
    return value is None or (isinstance(value, float) and math.isnan(value))

def evaluate_predictions(ground_truth_data: List[Dict],
                         predicted_data: List[Dict],
                         excluded_ids: List[int],
                         output_json_path: str = "evaluation_results.json"):
    """
    Оценивает предсказания модели структурированных данных агронома.

    Args:
        ground_truth_data: Список словарей с эталонными данными ('message', 'entries').
        predicted_data: Список словарей с предсказанными данными ('output': FieldWorkLog).
        excluded_ids: Список индексов примеров для исключения из оценки.
        output_json_path: Путь для сохранения JSON-файла с результатами.

    Returns:
        Словарь с общими метриками оценки.
    """
    results = {
        "overall_metrics": {
            "total_examples_evaluated": 0,
            "field_accuracy": 0.0,
            "exact_match_accuracy": 0.0,
            "entry_level_metrics": {
                "precision": 0.0,
                "recall": 0.0,
                "f1_score": 0.0,
                "total_true_entries": 0,
                "total_predicted_entries": 0,
                "total_correctly_matched_entries": 0,  # True Positives (TP) for entries
                "total_extra_entries": 0,             # False Positives (FP) for entries
                "total_missing_entries": 0            # False Negatives (FN) for entries
            }
        },
        "per_field_accuracy": defaultdict(lambda: {"correct": 0, "total": 0, "accuracy": 0.0}),
        "mismatched_examples": []
    }

    # Инициализация счетчиков для полей и метрик
    total_fields_count = 0
    correct_fields_count = 0
    exact_matches_count = 0

    total_true_entries = 0
    total_pred_entries = 0
    total_tp_entries = 0  # Matched entries based on content comparison (все поля)
    total_fp_entries = 0  # Extra predicted entries
    total_fn_entries = 0  # Missing ground truth entries

    # Получаем список имен полей из Pydantic модели для инициализации счетчиков
    field_names = list(FieldWorkEntry.model_fields.keys())
    for field in field_names:
        results["per_field_accuracy"][field]  # Инициируем словари

    # Фильтруем данные, исключая id
    valid_indices = [i for i, _ in enumerate(ground_truth_data) if i not in excluded_ids]
    if len(valid_indices) != len(predicted_data):
        print(f"Warning: Number of ground truth items ({len(valid_indices)}) after exclusion "
              f"does not match number of predicted items ({len(predicted_data)}). "
              f"Evaluation will proceed with {min(len(valid_indices), len(predicted_data))} items.")
        # Обрезаем до минимальной длины для zip
        min_len = min(len(valid_indices), len(predicted_data))
        valid_indices = valid_indices[:min_len]
        predicted_data = predicted_data[:min_len]

    results["overall_metrics"]["total_examples_evaluated"] = len(valid_indices)

    for i, pred_index in enumerate(valid_indices):
        true_item = ground_truth_data[pred_index]
        pred_item = predicted_data[i]  # уже синхронизировали длины

        try:
            true_entries_raw = true_item.get('entries', [])
            true_log = FieldWorkLog(entries=[FieldWorkEntry(**entry) for entry in true_entries_raw])
            pred_log = pred_item['output']
            if not isinstance(pred_log, FieldWorkLog):
                # Попробуем собрать FieldWorkLog из dict
                if isinstance(pred_log, dict) and 'entries' in pred_log:
                    try:
                        pred_log = FieldWorkLog(**pred_log)
                    except Exception as parse_error:
                        print(f"Error parsing predicted_data item {i} into FieldWorkLog: {parse_error}")
                        # Считаем, что предсказание пустое
                        pred_log = FieldWorkLog(entries=[])
                else:
                    print(f"Predicted data item {i}['output'] is not a FieldWorkLog instance or dict.")
                    pred_log = FieldWorkLog(entries=[])
        except Exception as e:
            # Ошибка парсинга, фиксируем mismatch и переходим к следующему
            print(f"Error processing item index {pred_index} (prediction index {i}): {e}")
            results["mismatched_examples"].append({
                "example_index": pred_index,
                "input_message": true_item.get('message') or true_item.get("agronomist_message", "N/A"),
                "status": "Parsing Error",
                "error_details": str(e),
                "expected_entries_raw": true_item.get('entries', []),
                "predicted_output_raw": pred_item.get('output', 'N/A')
            })
            continue

        true_entries = true_log.entries
        pred_entries = pred_log.entries

        n_true = len(true_entries)
        n_pred = len(pred_entries)
        total_true_entries += n_true
        total_pred_entries += n_pred

        # Будем фиксировать всю информацию о сравнениях в log_mismatch_details
        log_mismatch_details = {
            "example_index": pred_index,
            "input_message": true_item.get('message') or true_item.get("agronomist_message", "N/A"),
            "status": "Match",  # переопределим ниже, если найдём расхождения
            "expected_entries": [e.model_dump_comparable() for e in true_entries],
            "predicted_entries": [e.model_dump_comparable() for e in pred_entries],
            "entry_comparison": [],
            "entry_count_mismatch": None
        }

        # По умолчанию считаем, что для "точного совпадения" лога нужно,
        # чтобы кол-во записей совпадало и все поля в парах совпадали
        is_exact_match_for_log = (n_true == n_pred and n_true > 0)

        # --- Главная развилка: если кол-во записей совпадает, используем старую логику (порядок важен).
        #     Иначе — «бес порядковое» сопоставление (bag-based).
        if n_true == n_pred:
            # --------------------- СТАРАЯ ЛОГИКА (порядок важен) ---------------------
            if n_true == 0 and n_pred == 0:
                # Если обе записи пустые, это тоже точное совпадение
                exact_matches_count += 1
            else:
                matched_count_for_log = 0

                for j, (true_entry, pred_entry) in enumerate(zip(true_entries, pred_entries)):
                    true_dict = true_entry.model_dump_comparable()
                    pred_dict = pred_entry.model_dump_comparable()
                    entry_comparison_result = {
                        "entry_index": j,
                        "status": "Match",
                        "field_errors": []
                    }
                    all_fields_match_in_entry = True

                    for field_name in field_names:
                        true_value = true_dict.get(field_name)
                        pred_value = pred_dict.get(field_name)

                        # Обновляем счётчик полей
                        results["per_field_accuracy"][field_name]["total"] += 1
                        total_fields_count += 1

                        # Сравниваем
                        if true_value == pred_value or (is_empty(true_value) and is_empty(pred_value)):
                            results["per_field_accuracy"][field_name]["correct"] += 1
                            correct_fields_count += 1
                        else:
                            all_fields_match_in_entry = False
                            is_exact_match_for_log = False
                            entry_comparison_result["status"] = "Field Mismatch"
                            entry_comparison_result["field_errors"].append({
                                "field": field_name,
                                "expected": true_value,
                                "predicted": pred_value
                            })

                    log_mismatch_details["entry_comparison"].append(entry_comparison_result)
                    if all_fields_match_in_entry:
                        matched_count_for_log += 1

                # TP = кол-во записей, где все поля совпали
                tp_for_log = matched_count_for_log
                # FP, FN = 0 в случае, если n_true == n_pred. Иначе ниже доначислим
                fp_for_log = 0
                fn_for_log = 0

                total_tp_entries += tp_for_log
                total_fp_entries += fp_for_log
                total_fn_entries += fn_for_log

                # Если все поля во всех записях совпали (и длины совпали), считаем exact match
                if is_exact_match_for_log:
                    exact_matches_count += 1

                # Если нашлись несоответствия
                if not is_exact_match_for_log:
                    log_mismatch_details["status"] = "Field Mismatch"
                    results["mismatched_examples"].append(log_mismatch_details)

        else:
            # --------------------- НОВАЯ ЛОГИКА (порядок не важен) ---------------------
            log_mismatch_details["status"] = "Length Mismatch"
            log_mismatch_details["entry_count_mismatch"] = {
                "expected_count": n_true,
                "predicted_count": n_pred
            }

            # 1) Составим списки индексов для GT (ground_truth) и Pred (предсказаний)
            unmatched_true = list(range(n_true))
            unmatched_pred = list(range(n_pred))
            # Сопоставленные пары (gt_idx, pred_idx)
            matched_pairs = []

            # Функция для подсчёта кол-ва совпавших полей
            def count_matching_fields(e1: FieldWorkEntry, e2: FieldWorkEntry) -> int:
                d1 = e1.model_dump_comparable()
                d2 = e2.model_dump_comparable()
                count = 0
                for fn in field_names:
                    v1 = d1.get(fn)
                    v2 = d2.get(fn)
                    # Совпадение: либо равенство, либо оба "пустые"
                    if v1 == v2 or (is_empty(v1) and is_empty(v2)):
                        count += 1
                return count

            # 2) Сначала связываем полностью идентичные записи (все поля совпадают)
            full_matches = []
            to_remove_true = []
            to_remove_pred = []
            for gt_i in unmatched_true:
                for pr_i in unmatched_pred:
                    if count_matching_fields(true_entries[gt_i], pred_entries[pr_i]) == len(field_names):
                        full_matches.append((gt_i, pr_i))
                        to_remove_true.append(gt_i)
                        to_remove_pred.append(pr_i)
                        # Сразу же выходим из внутреннего цикла, т.к. один pred не может быть использован дважды
                        break

            # Удалим из списков эти совпавшие пары
            for gt_i, pr_i in full_matches:
                matched_pairs.append((gt_i, pr_i))
            for x in to_remove_true:
                if x in unmatched_true:
                    unmatched_true.remove(x)
            for x in to_remove_pred:
                if x in unmatched_pred:
                    unmatched_pred.remove(x)

            # 3) Пока остались неиспользованные записи, сопоставляем самые "похожие" (по кол-ву совпавших полей)
            while unmatched_true and unmatched_pred:
                best_gt, best_pr = None, None
                best_match_count = -1
                for gt_i in unmatched_true:
                    for pr_i in unmatched_pred:
                        match_count = count_matching_fields(true_entries[gt_i], pred_entries[pr_i])
                        if match_count > best_match_count:
                            best_match_count = match_count
                            best_gt = gt_i
                            best_pr = pr_i

                # Если самая похожая пара вовсе не совпадает ни по одному полю — смысла дальше нет
                if best_match_count <= 0:
                    break

                # Иначе считаем эту пару сопоставленной
                matched_pairs.append((best_gt, best_pr))
                unmatched_true.remove(best_gt)
                unmatched_pred.remove(best_pr)

            # Далее формируем сравнение для каждой сопоставленной пары
            matched_count_for_log = 0
            for j, (gt_idx, pr_idx) in enumerate(matched_pairs):
                true_entry = true_entries[gt_idx]
                pred_entry = pred_entries[pr_idx]
                true_dict = true_entry.model_dump_comparable()
                pred_dict = pred_entry.model_dump_comparable()

                entry_comparison_result = {
                    "entry_index": j,  # индекс "пары" для отчёта
                    "status": "Match",
                    "field_errors": []
                }
                all_fields_match_in_entry = True

                for field_name in field_names:
                    true_value = true_dict.get(field_name)
                    pred_value = pred_dict.get(field_name)

                    # Обновляем счетчик общего количества полей
                    results["per_field_accuracy"][field_name]["total"] += 1
                    total_fields_count += 1

                    # Сравниваем значения
                    if true_value == pred_value or (is_empty(true_value) and is_empty(pred_value)):
                        results["per_field_accuracy"][field_name]["correct"] += 1
                        correct_fields_count += 1
                    else:
                        all_fields_match_in_entry = False
                        entry_comparison_result["status"] = "Field Mismatch"
                        entry_comparison_result["field_errors"].append({
                            "field": field_name,
                            "expected": true_value,
                            "predicted": pred_value
                        })

                log_mismatch_details["entry_comparison"].append(entry_comparison_result)
                if all_fields_match_in_entry:
                    matched_count_for_log += 1

            # TP — это сколько пар совпало по всем полям
            tp_for_log = matched_count_for_log
            # FP — это оставшиеся unmatched_pred
            fp_for_log = len(unmatched_pred)
            # FN — это оставшиеся unmatched_true
            fn_for_log = len(unmatched_true)

            total_tp_entries += tp_for_log
            total_fp_entries += fp_for_log
            total_fn_entries += fn_for_log

            # Отметим в отчёте "Extra" и "Missing" записи
            for leftover_gt in unmatched_true:
                log_mismatch_details["entry_comparison"].append({
                    "entry_index": leftover_gt,
                    "status": "Missing",
                    "field_errors": []
                })
            for leftover_pr in unmatched_pred:
                log_mismatch_details["entry_comparison"].append({
                    "entry_index": leftover_pr,
                    "status": "Extra",
                    "field_errors": []
                })

            # Проверка на "полное совпадение": если все записи совпали и все поля внутри них совпали
            # и их кол-во при этом в сумме совпадало — теоретически это редкая ситуация,
            # ведь мы сюда попадаем только если n_true != n_pred. Но если вдруг...
            if tp_for_log == n_true == n_pred:
                # Это значит все записи идентичны
                is_exact_match_for_log = True
            else:
                is_exact_match_for_log = False

            if not is_exact_match_for_log:
                # Если были расхождения, запишем в mismatched
                # Статус уже "Length Mismatch", но если были расхождения в полях, пусть так и будет.
                log_mismatch_details["status"] = "Length Mismatch"  # или "Field Mismatch", но оставим как есть
                results["mismatched_examples"].append(log_mismatch_details)
            else:
                # Точное совпадение
                exact_matches_count += 1

        # --- конец развилки по кол-ву записей ---

    # --- Расчёт итоговых метрик ---
    num_evaluated = results["overall_metrics"]["total_examples_evaluated"]

    # Field Accuracy
    if total_fields_count > 0:
        results["overall_metrics"]["field_accuracy"] = round((correct_fields_count / total_fields_count) * 100, 2)
    else:
        results["overall_metrics"]["field_accuracy"] = 100.0 if num_evaluated > 0 else 0.0

    # Exact Match Accuracy
    if num_evaluated > 0:
        results["overall_metrics"]["exact_match_accuracy"] = round((exact_matches_count / num_evaluated) * 100, 2)

    # Per-Field Accuracy
    for field in field_names:
        field_stats = results["per_field_accuracy"][field]
        if field_stats["total"] > 0:
            field_stats["accuracy"] = round((field_stats["correct"] / field_stats["total"]) * 100, 2)
        else:
            field_stats["accuracy"] = 100.0  # или "N/A"

    # Entry Level Precision, Recall, F1
    entry_metrics = results["overall_metrics"]["entry_level_metrics"]
    entry_metrics["total_true_entries"] = total_true_entries
    entry_metrics["total_predicted_entries"] = total_pred_entries
    entry_metrics["total_correctly_matched_entries"] = total_tp_entries
    entry_metrics["total_extra_entries"] = total_fp_entries
    entry_metrics["total_missing_entries"] = total_fn_entries

    # Precision = TP / (TP + FP)
    precision_denom = total_tp_entries + total_fp_entries
    if precision_denom > 0:
        precision = total_tp_entries / precision_denom
        entry_metrics["precision"] = round(precision * 100, 2)
    elif total_pred_entries == 0 and total_true_entries == 0:
        entry_metrics["precision"] = 100.0
    else:
        entry_metrics["precision"] = 0.0

    # Recall = TP / (TP + FN) == TP / total_true_entries
    recall_denom = total_tp_entries + total_fn_entries
    if recall_denom > 0:
        recall = total_tp_entries / recall_denom
        entry_metrics["recall"] = round(recall * 100, 2)
    elif total_true_entries == 0:
        entry_metrics["recall"] = 100.0
    else:
        entry_metrics["recall"] = 0.0

    # F1
    precision_val = entry_metrics["precision"] / 100
    recall_val = entry_metrics["recall"] / 100
    if (precision_val + recall_val) > 0:
        f1 = 2 * (precision_val * recall_val) / (precision_val + recall_val)
        entry_metrics["f1_score"] = round(f1 * 100, 2)
    elif precision_val == 1.0 and recall_val == 1.0:
        entry_metrics["f1_score"] = 100.0
    else:
        entry_metrics["f1_score"] = 0.0

    # Сохраняем результаты в JSON
    try:
        # Преобразуем defaultdict в обычный dict для сериализации
        results["per_field_accuracy"] = dict(results["per_field_accuracy"])
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=4)
        print(f"Результаты оценки сохранены в: {output_json_path}")
    except IOError as e:
        print(f"Ошибка при сохранении JSON файла: {e}")
    except TypeError as e:
        print(f"Ошибка сериализации JSON: {e}. Проверьте типы данных в результатах.")

    return results["overall_metrics"]

#### Подсказки по заполнению схемы

In [None]:
schema_hints = PROMPTS_CONFIG.get("baseline_schema_hints")

In [None]:
def convert_ground_truth_format(new_format_data: List[dict]) -> List[dict]:
    """
    Преобразует новый формат ground truth (input/output) в старый формат (message/entries).

    Args:
        new_format_data: список словарей с ключами 'input' и 'output'.

    Returns:
        Список словарей с ключами 'message' и 'entries'.
    """
    converted = []
    for item in new_format_data:
        message = item.get("input", "")
        entries = item.get("output", {}).get("entries", [])
        converted.append({
            "message": message,
            "entries": entries
        })
    return converted

In [None]:
def evaluate_model_on_dataset(
    important_data_dir: str,
    main_dir: str,
    model,
    FieldWorkLog,
    generate_few_shot_text,
    run_model_on_structured_data_sync,
    BASELINE_SYSTEM_PROMPT_TEMPLATE,
    schema_hints,
    evaluate_predictions,
    convert_ground_truth_format,
    test_filename: str = "test_dataset.json",
    few_shot_filename: str = "new_few_shots_auto_mode_data.json",
    output_filename: str = "deepseek_v3_preza_evaluation_results.json",
    verbose: bool = True
):
    # Загрузка тестового датасета
    test_dataset_json_path = os.path.join(important_data_dir, test_filename)
    with open(test_dataset_json_path, "r", encoding="utf-8") as f:
        ground_truth = json.load(f)

    # Загрузка few-shot примеров
    few_shot_json_path = os.path.join(important_data_dir, few_shot_filename)
    with open(few_shot_json_path, "r", encoding="utf-8") as f:
        few_shot_json_data = json.load(f)

    # Преобразование output в сериализуемую структуру
    serializable_results = [
        {
            "input": ex["input"],
            "output": FieldWorkLog(**ex["output"]),
            "explanation": ex.get("explanation", "")
        }
        for ex in few_shot_json_data
    ]

    # Генерация текста для few-shot примеров
    few_shot_examples_str = generate_few_shot_text(serializable_results)

    # Запуск модели
    results = run_model_on_structured_data_sync(
        ground_truth,
        [],
        FieldWorkLog,
        BASELINE_SYSTEM_PROMPT_TEMPLATE,
        model,
        few_shot_examples_str,
        schema_hints,
        # temperature=0.1,
        # sleep_time=5,
    )

    # Сохранение и вычисление метрик
    eval_save_path = os.path.join(main_dir, output_filename)
    evaluation_results = evaluate_predictions(
        convert_ground_truth_format(ground_truth),
        results,
        [],
        eval_save_path
    )

    if verbose:
        print("\nОбщие метрики:")
        print(json.dumps(evaluation_results, indent=4, ensure_ascii=False))

    return evaluation_results

In [None]:
evaluation = evaluate_model_on_dataset(
    important_data_dir=important_data_dir,
    main_dir=main_dir,
    model=model,
    FieldWorkLog=FieldWorkLog,
    generate_few_shot_text=generate_few_shot_text,
    run_model_on_structured_data_sync=run_model_on_structured_data_sync,
    BASELINE_SYSTEM_PROMPT_TEMPLATE=BASELINE_SYSTEM_PROMPT_TEMPLATE,
    schema_hints=schema_hints,
    evaluate_predictions=evaluate_predictions,
    convert_ground_truth_format=convert_ground_truth_format
)

Результаты оценки сохранены в: drive/MyDrive/LLM_coding_challenge/deepseek_v3_preza_evaluation_results.json

Общие метрики:
{
    "total_examples_evaluated": 106,
    "field_accuracy": 99.9,
    "exact_match_accuracy": 97.17,
    "entry_level_metrics": {
        "precision": 100.0,
        "recall": 100.0,
        "f1_score": 100.0,
        "total_true_entries": 363,
        "total_predicted_entries": 363,
        "total_correctly_matched_entries": 360,
        "total_extra_entries": 0,
        "total_missing_entries": 0
    }
}


### Определение спама
Является ли пришедшее на вход сообщение записями агронома, которые нужно обработать или нет.

In [None]:
SPAM_FILTER_SYSTEM_PROMPT_TEMPLATE = Template(PROMPTS_CONFIG.get("spam_filter_system_prompt_template"))
spam_filter_few_shot_examples_str = generate_few_shot_text(PROMPTS_CONFIG.get("spam_filter_few_shot_examples_str"))

#### Схема для Structured Output

In [None]:
class MessageClassification(BaseModel):
    explanation: str = Field(
        ...,
        description="Short explanation in Russian why the message should or should not be processed."
    )
    message_type: Literal["field_report", "non_report"] = Field(
        ...,
        description="Message classification result: 'field_report' — if it's a pure fieldwork report, 'non_report' — if it's a question, discussion, planning, or irrelevant content."
    )

#### Функция для классификации в асинхронном режиме

In [None]:
async def classify_message_async(
    message: str,
    model_output_schema: Type,
    few_shot_examples_classification,
    system_prompt_template,
    model,):
    prompt = system_prompt_template.render(
        json_schema=model_output_schema.model_json_schema(),
        few_shot_examples_str=few_shot_examples_classification,
        message=message)

    answer = await model.with_structured_output(model_output_schema).ainvoke(
        [SystemMessage(content=prompt)]
    )
    return answer

def classify_message_sync(
    message: str,
    model_output_schema: Type,
    few_shot_examples_classification,
    system_prompt_template,
    model,):
    prompt = system_prompt_template.render(
        json_schema=model_output_schema.model_json_schema(),
        few_shot_examples_str=few_shot_examples_classification,
        message=message)

    answer = model.with_structured_output(model_output_schema).invoke(
        [SystemMessage(content=prompt)]
    )
    return answer

#### Проверим на тестовых данных (в идеале не должно помечать как спам, потому что это действительные записи агрономов)

In [None]:
def classify_messages(
    dataset_path: str,
    model,
    MessageClassification,
    classify_message_sync,
    few_shot_examples_class_str: str,
    system_prompt_template,
    sleep_seconds: int = 5,
    verbose: bool = True
) -> List[Dict]:
    """
    Классифицирует сообщения из JSON-файла и выводит сводку.

    :param dataset_path: путь к JSON-файлу с сообщениями
    :param model: модель для классификации
    :param MessageClassification: Pydantic-схема результата
    :param classify_message_sync: функция запуска модели
    :param few_shot_examples_class_str: few-shot примеры в виде строки
    :param system_prompt_template: шаблон системного промпта (Template)
    :param sleep_seconds: пауза между запросами
    :param verbose: выводить ли статистику после классификации
    :return: список классифицированных результатов
    """
    with open(dataset_path, "r", encoding="utf-8") as f:
        test_data = json.load(f)

    field_report_count = 0
    non_report_count = 0
    classified_results = []

    for item in test_data:
        message_text = item["input"]

        result = classify_message_sync(
            message=message_text,
            model_output_schema=MessageClassification,
            few_shot_examples_classification=few_shot_examples_class_str,
            system_prompt_template=system_prompt_template,
            model=model,
        )

        classified_results.append({
            "input": message_text,
            "classification": result.message_type,
            "explanation": result.explanation,
        })

        if result.message_type == "field_report":
            field_report_count += 1
        else:
            non_report_count += 1

        if sleep_seconds:
            time.sleep(sleep_seconds)

    if verbose:
        print("\n📊 Классификация завершена:")
        print(f"  - Всего сообщений:         {len(test_data)}")
        print(f"  - field_report:            {field_report_count}")
        print(f"  - non_report (спам):       {non_report_count}")

    return classified_results

In [None]:
results = classify_messages(
    dataset_path=os.path.join(important_data_dir, "test_dataset.json"),
    model=model,
    MessageClassification=MessageClassification,
    classify_message_sync=classify_message_sync,
    few_shot_examples_class_str=few_shot_examples_class_str,
    system_prompt_template=SPAM_FILTER_SYSTEM_PROMPT_TEMPLATE,
)


📊 Классификация завершена:
  - Всего сообщений:         106
  - field_report:            106
  - non_report (спам):       0


### Режим работы с обратной связью

Модель может дать обратную связь в случае если посчитала что агроном неправильно отписал в сообщении

#### Схема для Structured Output

In [None]:
def create_annotated_field_work_log_schema(
    department_set: Tuple[str],
    operation_set: Tuple[str],
    crop_set: Tuple[str],
):
    # --- Аннотированные типы для department_name ---
    class DepartmentValid(BaseModel):
        status: Literal['valid']
        value: Literal[department_set] = Field(..., description="Корректное название подразделения")

    class DepartmentPredict(BaseModel):
        status: Literal['predict']
        value: Literal[department_set] = Field(..., description="Наиболее вероятное название подразделения")
        explanation: str = Field(..., description="Почему выбрано это значение")

    class DepartmentRaw(BaseModel):
        status: Literal['raw']
        value: str = Field(..., description="Произвольное значение подразделения")
        explanation: str = Field(..., description="Почему сохранено исходное значение")

    DepartmentNameAnnotated = Union[DepartmentValid, DepartmentPredict, DepartmentRaw]

    # --- Аннотированные типы для operation ---
    class OperationValid(BaseModel):
        status: Literal['valid']
        value: Literal[operation_set] = Field(..., description="Корректное название операции")

    class OperationPredict(BaseModel):
        status: Literal['predict']
        value: Literal[operation_set] = Field(..., description="Наиболее вероятное название операции")
        explanation: str = Field(..., description="Почему выбрано это значение")

    class OperationRaw(BaseModel):
        status: Literal['raw']
        value: str = Field(..., description="Произвольное значение операции")
        explanation: str = Field(..., description="Почему сохранено исходное значение")

    OperationAnnotated = Union[OperationValid, OperationPredict, OperationRaw]

    # --- Аннотированные типы для crop ---
    class CropValid(BaseModel):
        status: Literal['valid']
        value: Literal[crop_set] = Field(..., description="Корректное название культуры")

    class CropPredict(BaseModel):
        status: Literal['predict']
        value: Literal[crop_set] = Field(..., description="Наиболее вероятное название культуры")
        explanation: str = Field(..., description="Почему выбрано это значение")

    class CropRaw(BaseModel):
        status: Literal['raw']
        value: str = Field(..., description="Произвольное значение культуры")
        explanation: str = Field(..., description="Почему сохранено исходное значение")

    CropAnnotated = Union[CropValid, CropPredict, CropRaw]

    # --- Основная запись ---
    class FieldWorkEntryAnnotated(BaseModel):
        date: Optional[str] = Field(
            None, description="Дата проведения операции (формат: 'мм-дд')"
        )

        department_name: DepartmentNameAnnotated = Field(..., description="Название подразделения с аннотацией")
        operation: OperationAnnotated = Field(..., description="Название операции с аннотацией")
        crop: CropAnnotated = Field(..., description="Название культуры с аннотацией")

        processed_area_day: int = Field(..., description="Обработанная площадь за день, в гектарах")
        processed_area_total: Optional[int] = Field(..., description="Общая обработанная площадь с начала операции, в гектарах")
        yield_kg_day: Optional[int] = Field(None, description="Валовая продукция за день, в килограммах")
        yield_kg_total: Optional[int] = Field(None, description="Суммарная валовая продукция с начала операции, в килограммах")

        def model_dump_comparable(self) -> Dict[str, Any]:
            dumped = self.model_dump()
            for key, value in dumped.items():
                if value in [None, '', [], {}, float('nan')]:
                    dumped[key] = None
            return dumped

    # --- Список записей ---
    class FieldWorkLogAnnotated(BaseModel):
        entries: List[FieldWorkEntryAnnotated] = Field(
            ..., description="Список записей о полевых операциях с аннотированными полями"
        )

    return FieldWorkEntryAnnotated, FieldWorkLogAnnotated

In [None]:
FieldWorkEntryAnnotated, FieldWorkLogAnnotated = create_annotated_field_work_log_schema(
    department_set=department_set,
    operation_set=operation_set,
    crop_set=crop_set,
)

#### few shots

#### Prompt

In [None]:
MODE_DEMO_SYSTEM_PROMPT_TEMPLATE = Template(PROMPTS_CONFIG.get("mode_demo_system_prompt_template"))

In [None]:
demo_mode_schema_hints = PROMPTS_CONFIG.get("demo_mode_schema_hints")

In [None]:
def convert_annotated_to_strict(log_annotated: FieldWorkLogAnnotated) -> FieldWorkLog:
    # Переводит формат из режима с обратной связью в строгий формат с чёткими полями
    converted_entries: List[FieldWorkEntry] = []

    for entry in log_annotated.entries:
        # Если хотя бы одно из трёх полей помечено как raw — пропускаем
        if (
            entry.operation.status == "raw"
            or entry.department_name.status == "raw"
            or entry.crop.status == "raw"
        ):
            continue  # не включаем такую запись в результат

        converted_entries.append(FieldWorkEntry(
            date=entry.date,
            department_name=entry.department_name.value,
            operation=entry.operation.value,
            crop=entry.crop.value,
            processed_area_day=entry.processed_area_day,
            processed_area_total=entry.processed_area_total,
            yield_kg_day=entry.yield_kg_day,
            yield_kg_total=entry.yield_kg_total
        ))

    return FieldWorkLog(entries=converted_entries)

In [None]:
def run_demo_mode_evaluation(
    important_data_dir: str,
    main_dir: str,
    model,
    FieldWorkLogAnnotated,
    generate_few_shot_text: Callable,
    run_model_on_structured_data: Callable,
    MODE_DEMO_SYSTEM_PROMPT_TEMPLATE,
    demo_mode_schema_hints: Dict,
    convert_annotated_to_strict: Callable,
    evaluate_predictions: Callable,
    convert_ground_truth_format: Callable,
    test_filename: str = "test_dataset.json",
    few_shot_filename: str = "emo_few_shot_examples.json",
    raw_output_filename: str = "demo_mode_iter_2_evaluation_results.json",
    final_output_filename: str = "demo_mode_iter_2_evaluation_results.json",
    temperature: float = 0.1,
    sleep_time: int = 3,
    verbose: bool = True
) -> Dict:
    """
    Запускает оценку модели в demo-режиме (annotated schema).

    :return: словарь с метриками
    """
    # Загружаем few-shot примеры
    few_shot_path = os.path.join(important_data_dir, few_shot_filename)
    with open(few_shot_path, "r", encoding="utf-8") as f:
        few_shot_data = json.load(f)

    for item in few_shot_data:
        item["output"] = FieldWorkLogAnnotated(**item["output"])

    few_shot_examples_str = generate_few_shot_text(few_shot_data)

    # Загружаем тестовый датасет
    test_path = os.path.join(important_data_dir, test_filename)
    with open(test_path, "r", encoding="utf-8") as f:
        ground_truth = json.load(f)

    # Запускаем модель
    results = run_model_on_structured_data(
        ground_truth,
        [],
        FieldWorkLogAnnotated,
        MODE_DEMO_SYSTEM_PROMPT_TEMPLATE,
        model,
        few_shot_examples_str,
        demo_mode_schema_hints,
        temperature=temperature,
        sleep_time=sleep_time,
    )

    # Сохраняем "сырые" результаты
    serializable_results = [
        {
            "input": res["input"],
            "output": res["output"].model_dump(),
            "explanation": res.get("explanation", "")
        }
        for res in results
    ]
    raw_save_path = os.path.join(main_dir, raw_output_filename)
    with open(raw_save_path, "w", encoding="utf-8") as f:
        json.dump(serializable_results, f, ensure_ascii=False, indent=2)

    # Преобразуем результаты в strict-схему
    for res in results:
        res["output"] = convert_annotated_to_strict(res["output"])

    # Оцениваем и сохраняем метрики
    eval_save_path = os.path.join(main_dir, final_output_filename)
    evaluation_results = evaluate_predictions(
        convert_ground_truth_format(ground_truth),
        results,
        [],
        eval_save_path
    )

    if verbose:
        print("\nОбщие метрики:")
        print(json.dumps(evaluation_results, indent=4, ensure_ascii=False))

    return evaluation_results

In [None]:
metrics = run_demo_mode_evaluation(
    important_data_dir=important_data_dir,
    main_dir=main_dir,
    model=model,
    FieldWorkLogAnnotated=FieldWorkLogAnnotated,
    generate_few_shot_text=generate_few_shot_text,
    run_model_on_structured_data=run_model_on_structured_data,
    MODE_DEMO_SYSTEM_PROMPT_TEMPLATE=MODE_DEMO_SYSTEM_PROMPT_TEMPLATE,
    demo_mode_schema_hints=demo_mode_schema_hints,
    convert_annotated_to_strict=convert_annotated_to_strict,
    evaluate_predictions=evaluate_predictions,
    convert_ground_truth_format=convert_ground_truth_format,
)

Результаты оценки сохранены в: drive/MyDrive/LLM_coding_challenge/deepseek_demo_mode_iter_2_evaluation_results.json

Общие метрики:
{
    "total_examples_evaluated": 106,
    "field_accuracy": 99.72,
    "exact_match_accuracy": 95.28,
    "entry_level_metrics": {
        "precision": 98.89,
        "recall": 100.0,
        "f1_score": 99.44,
        "total_true_entries": 363,
        "total_predicted_entries": 367,
        "total_correctly_matched_entries": 355,
        "total_extra_entries": 4,
        "total_missing_entries": 0
    }
}
