In [5]:
import ast
import re
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn_crfsuite import CRF

In [6]:
# ========= 1) Чтение train =========

def read_train_semicolon(path: str) -> pd.DataFrame:
    """Load the semicolon-separated training file.

    The file must contain the columns ``sample`` and ``annotation``. The
    ``sample`` column is renamed to ``search_query`` and both columns are
    returned as plain Python strings.

    The two text columns are read verbatim:
      * ``dtype=str`` stops pandas from inferring numbers, so a query such as
        ``007`` keeps its leading zeros (annotation offsets are character
        positions, so the text must not change);
      * ``keep_default_na=False`` stops pandas from turning queries such as
        ``null`` / ``NA`` / ``nan`` into missing values, and makes an empty
        cell an empty string instead of the literal ``'nan'``.
        Note: this option applies to every column of the file.

    Args:
        path: Path to the CSV file (``;`` separator, UTF-8).

    Returns:
        DataFrame with at least the string columns ``search_query`` and
        ``annotation``.

    Raises:
        AssertionError: if ``sample`` or ``annotation`` is missing.
    """
    text_columns = ('sample', 'annotation')
    df = pd.read_csv(
        path,
        sep=';',
        encoding='utf-8',
        dtype={col: str for col in text_columns},
        keep_default_na=False,
    )
    assert {'sample', 'annotation'}.issubset(df.columns), "Ожидаются колонки: sample; annotation"
    df = df.rename(columns={'sample': 'search_query'})
    # Defensive: any value that is still missing becomes '' rather than 'nan'.
    for col in ('search_query', 'annotation'):
        df[col] = df[col].fillna('').astype(str)
    return df


# ========= 2) Парсер annotation =========

def normalize_tag(tag: str) -> str:
    """Return ``tag`` stripped of surrounding whitespace and upper-cased.

    Two hyphen-less spellings are repaired: ``IBRAND`` -> ``I-BRAND`` and
    ``BBRAND`` -> ``B-BRAND``. Unknown tags are NOT filtered out here; the
    value is returned as-is after normalization.
    """
    normalized = str(tag).strip().upper()
    # Just in case: patch rare typos where the hyphen was dropped.
    for broken, fixed in (("IBRAND", "I-BRAND"), ("BBRAND", "B-BRAND")):
        normalized = normalized.replace(broken, fixed)
    return normalized

def parse_annotation(cell: str) -> List[Tuple[int, int, str]]:
    """Parse an annotation cell into ``(start, end, tag)`` triples.

    The cell is first evaluated as a Python literal; every 3-element
    list/tuple in it becomes ``(int(start), int(end), normalize_tag(tag))``
    and anything else is skipped. If that path fails for any reason, the
    cell is scanned with a regex for ``(digits, digits, 'tag')`` groups
    instead.

    Returns an empty list for non-string or blank input.
    """
    if not (isinstance(cell, str) and cell.strip()):
        return []

    try:
        parsed = ast.literal_eval(cell)
        return [
            (int(item[0]), int(item[1]), normalize_tag(item[2]))
            for item in parsed
            if isinstance(item, (list, tuple)) and len(item) == 3
        ]
    except Exception:
        # Best-effort fallback for cells that are not valid Python literals.
        matches = re.finditer(r"\(\s*(\d+)\s*,\s*(\d+)\s*,\s*'([^']+)'\s*\)", cell)
        result = []
        for match in matches:
            start, end, tag = match.groups()
            result.append((int(start), int(end), normalize_tag(tag)))
        return result


# ========= 3) Токены с индексами =========

# A token is a maximal run of non-whitespace characters.
_token_re = re.compile(r'\S+')


@dataclass
class Token:
    """A whitespace-delimited token together with its character offsets."""
    text: str
    start: int  # offset of the first character (inclusive)
    end: int    # offset just past the last character (exclusive)


def tokenize_with_offsets(text: str) -> List[Token]:
    """Split ``text`` on whitespace, keeping each token's ``[start, end)`` offsets."""
    found: List[Token] = []
    for match in _token_re.finditer(text):
        begin, stop = match.span()
        found.append(Token(match.group(), begin, stop))
    return found


# ========= 4) BIO по токенам из символьных спанов =========

ALL_ENTITY_TYPES = ["TYPE", "BRAND", "VOLUME", "PERCENT"]


def spans_to_token_bio(tokens: List[Token], spans: List[Tuple[int, int, str]]) -> List[str]:
    """Project character-level spans onto tokens as BIO labels.

    Spans whose normalized tag is not ``B-``/``I-`` followed by one of
    TYPE, BRAND, VOLUME, PERCENT are dropped. The B/I prefix of a span's own
    tag is discarded: within each span the first overlapping token gets
    ``B-<ent>`` and every further overlapping token gets ``I-<ent>``.
    Spans are applied in order, so a later span overwrites labels set by an
    earlier one. Tokens touched by no span stay ``"O"``.
    """
    # Keep only well-formed tags and reduce them to the bare entity type.
    valid = []
    for start, end, raw_tag in spans:
        norm = normalize_tag(raw_tag)
        if re.fullmatch(r'[BI]-(TYPE|BRAND|VOLUME|PERCENT)', norm):
            valid.append((start, end, norm.split('-', 1)[1]))

    labels = ["O"] * len(tokens)
    for start, end, ent in valid:
        prefix = "B"
        for idx, tok in enumerate(tokens):
            # Half-open intervals [tok.start, tok.end) and [start, end) overlap.
            if tok.end > start and end > tok.start:
                labels[idx] = f"{prefix}-{ent}"
                prefix = "I"
    return labels


# ========= 5) Фичи для CRF (простые, но рабочие) =========

def shape(word: str) -> str:
    """Map each character to a class symbol.

    Letters become ``X``, digits become ``d``, everything else becomes ``_``;
    the result has the same length as ``word``.
    """
    return ''.join(
        'X' if ch.isalpha() else ('d' if ch.isdigit() else '_')
        for ch in word
    )

def token2features(sent_tokens: List[Token], i: int) -> Dict[str, Any]:
    """Build the CRF feature dict for the token at position ``i``.

    Features cover the token itself (lower-cased form, 2/3-character
    prefixes and suffixes, character shape, a few boolean flags, length)
    plus the lower-cased form and shape of the previous and next token.
    ``BOS`` / ``EOS`` are set when there is no previous / next token.
    """
    word = sent_tokens[i].text
    lowered = word.lower()
    last_index = len(sent_tokens) - 1

    feats = {
        "bias": 1.0,
        "w.lower": lowered,
        "w[-3:]": lowered[-3:],
        "w[-2:]": lowered[-2:],
        "w[:2]": lowered[:2],
        "w[:3]": lowered[:3],
        "shape": shape(word),
        "is_alpha": word.isalpha(),
        "is_digit": word.isdigit(),
        "has_digit": any(ch.isdigit() for ch in word),
        "has_pct": ('%' in word) or ('процент' in lowered),
        "has_comma": (',' in word),
        "has_dot": ('.' in word),
        "len": len(word),
        "is_latin": bool(re.search(r'[A-Za-z]', word)),
        "has_hyphen": '-' in word,
    }

    # Left context.
    if i <= 0:
        feats["BOS"] = True
    else:
        prev_word = sent_tokens[i - 1].text
        feats["-1:w.lower"] = prev_word.lower()
        feats["-1:shape"] = shape(prev_word)

    # Right context.
    if i >= last_index:
        feats["EOS"] = True
    else:
        next_word = sent_tokens[i + 1].text
        feats["+1:w.lower"] = next_word.lower()
        feats["+1:shape"] = shape(next_word)

    return feats

def sent2features(tokens: List[Token]) -> List[Dict[str, Any]]:
    """Return one feature dict per token, in token order."""
    features = []
    for position in range(len(tokens)):
        features.append(token2features(tokens, position))
    return features


# ========= 6) Регулярки чисел (VOLUME/PERCENT) =========
# Простые и понятные: число (., или ,), потом опциональный пробел/дефис, дальше единицы с разными формами.
# Quantity: a number (optional '.' or ',' fraction), then optional
# whitespace and/or a hyphen, then a unit (ml / l / g / kg / pieces, in
# several spellings). Case-insensitive, verbose syntax.
VOLUME_RE = re.compile(
    r"""(?ix)
    (?<!\w)
    (?P<num>\d+(?:[.,]\d+)?)
    \s*[-]?\s*
    (?P<u>
        мл|миллилитр\w*|
        л|литр\w*|
        г|гр|грамм\w*|
        кг|
        шт|штук\w*
    )
    \b
    """
)

# Percentage: a number followed by '%' or by a word starting with "процент".
PERCENT_RE = re.compile(
    r"""(?ix)
    (?<!\w)
    (?:
        (?P<num>\d+(?:[.,]\d+)?)\s*%      # 2.5%
        |
        (?P<numw>\d+(?:[.,]\d+)?)\s*
        (процент\w*)                      # 2,5 процента / процентов
    )
    """
)


def find_rule_spans(text: str) -> List[Tuple[int, int, str]]:
    """Return regex-detected spans: all VOLUME matches, then all PERCENT matches.

    Each span is ``(start, end, tag)`` with character offsets of the whole
    match and the tag ``"B-VOLUME"`` or ``"B-PERCENT"``.
    """
    volume_hits = [(*m.span(), "B-VOLUME") for m in VOLUME_RE.finditer(text)]
    percent_hits = [(*m.span(), "B-PERCENT") for m in PERCENT_RE.finditer(text)]
    return volume_hits + percent_hits


# ========= 7) Корпус для обучения =========

@dataclass
class SentExample:
    """One training example: a tokenized query and its per-token labels."""
    tokens: List[Token]  # tokens of the query, in order
    labels: List[str]    # label strings aligned with `tokens`

def build_corpus(df: pd.DataFrame) -> List[SentExample]:
    """Turn every row of ``df`` into a ``SentExample``.

    Reads the ``search_query`` and ``annotation`` columns: the annotation is
    parsed into character spans, the query is tokenized, and the spans are
    projected onto the tokens as BIO labels.
    """
    corpus: List[SentExample] = []
    for query, annotation in zip(df["search_query"], df["annotation"]):
        spans = parse_annotation(annotation)
        tokens = tokenize_with_offsets(query)
        corpus.append(
            SentExample(tokens=tokens, labels=spans_to_token_bio(tokens, spans))
        )
    return corpus


# ========= 8) Обучение CRF =========

def train_crf(
    train_data: List[SentExample],
    c1: float = 0.1,
    c2: float = 0.1,
    max_iterations: int = 150,
) -> CRF:
    """Fit a linear-chain CRF (L-BFGS) on the given examples.

    Args:
        train_data: Examples providing ``tokens`` (turned into features via
            ``sent2features``) and ``labels`` (the target sequences).
        c1: Value passed to ``CRF(c1=...)`` (L1 regularization coefficient).
        c2: Value passed to ``CRF(c2=...)`` (L2 regularization coefficient).
        max_iterations: Value passed to ``CRF(max_iterations=...)``.

    The defaults reproduce the previously hard-coded settings, so existing
    calls ``train_crf(data)`` behave exactly as before.

    Returns:
        The fitted ``CRF`` model.
    """
    features = [sent2features(example.tokens) for example in train_data]
    targets = [example.labels for example in train_data]
    crf = CRF(
        algorithm="lbfgs",
        c1=c1,
        c2=c2,
        max_iterations=max_iterations,
        all_possible_transitions=True,
    )
    crf.fit(features, targets)
    return crf


# ========= 9) BIO -> символьные спаны =========

def bio_to_spans(tokens: List[Token], labels: List[str]) -> List[Tuple[int, int, str]]:
    """Collapse per-token BIO labels into character spans.

    Consecutive tokens labelled ``B-X`` followed by ``I-X`` form one span
    running from the first token's start to the last token's end. An ``I-X``
    that does not continue an open ``X`` entity starts a new one. ``"O"``,
    ``None`` and labels without a hyphen close the open entity. A label with
    any other prefix leaves the open entity untouched. Every emitted span is
    tagged ``B-<ent>``.
    """
    spans: List[Tuple[int, int, str]] = []
    open_ent = open_start = open_end = None

    def emit(ent, start, end) -> None:
        # Record a span only when all three parts are known.
        if ent is not None and start is not None and end is not None:
            spans.append((start, end, f"B-{ent}"))

    for tok, lab in zip(tokens, labels):
        if lab == "O" or lab is None or '-' not in lab:
            # Outside any entity: close whatever was open.
            emit(open_ent, open_start, open_end)
            open_ent = open_start = open_end = None
            continue

        prefix, ent = lab.split('-', 1)
        if prefix == "I" and open_ent == ent:
            # Continuation of the entity currently being built.
            open_end = tok.end
        elif prefix in ("B", "I"):
            # Explicit begin, or an I- tag with nothing matching to continue.
            emit(open_ent, open_start, open_end)
            open_ent, open_start, open_end = ent, tok.start, tok.end

    emit(open_ent, open_start, open_end)
    return spans


# ========= 10) Инференс одной строки =========

def predict_spans(crf: CRF, text: str) -> List[Tuple[int, int, str]]:
    """Predict entity spans for one query by combining the CRF with regex rules.

    Model spans of type VOLUME or PERCENT are dropped when they overlap a
    rule span of the same type; all rule spans are then added. The result is
    sorted by ``(start, end)``, and a span is merged into the previous one
    when both carry the same tag and it starts at or before the previous end.
    """
    tokens = tokenize_with_offsets(text)
    rule_spans = find_rule_spans(text)
    features = [sent2features(tokens)]
    predicted = crf.predict(features)[0] if tokens else []
    model_spans = bio_to_spans(tokens, predicted)

    # Rules take priority for numeric entities.
    kept = []
    for start, end, tag in model_spans:
        ent = tag.split('-', 1)[1]
        if ent in ("VOLUME", "PERCENT"):
            covered_by_rule = any(
                r_end > start and end > r_start
                for r_start, r_end, r_tag in rule_spans
                if r_tag.endswith(ent)
            )
            if covered_by_rule:
                continue
        kept.append((start, end, f"B-{ent}"))

    combined = sorted(kept + rule_spans, key=lambda span: (span[0], span[1]))

    # Merge a span into its predecessor when the tags match and they touch/overlap.
    merged: List[Tuple[int, int, str]] = []
    for start, end, tag in combined:
        if merged and merged[-1][2] == tag and start <= merged[-1][1]:
            prev_start, prev_end, _ = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end), tag)
        else:
            merged.append((start, end, tag))
    return merged


# ========= 11) F1 по спанам (строгий) =========

def spans_exact_f1(y_true: List[List[Tuple[int,int,str]]],
                   y_pred: List[List[Tuple[int,int,str]]]) -> float:
    """Macro-averaged F1 over entity types with exact span matching.

    For each of TYPE, BRAND, VOLUME and PERCENT, spans are reduced to
    ``(start, end)`` pairs whose tag ends with that type; a prediction counts
    as correct only if both offsets match a gold span of the same example.
    Counts are pooled over all examples per type, and the per-type F1 values
    are averaged. A type with no true positives contributes 0.0, including
    when it is absent from both gold and predictions.

    Args:
        y_true: Gold spans, one list per example.
        y_pred: Predicted spans, one list per example, aligned with ``y_true``.

    Returns:
        The macro-F1 as a Python float.

    Raises:
        ValueError: if ``y_true`` and ``y_pred`` have different lengths
            (``zip`` would otherwise silently ignore the extra examples and
            report a misleading score).
    """
    y_true = list(y_true)
    y_pred = list(y_pred)
    if len(y_true) != len(y_pred):
        raise ValueError(
            f"y_true and y_pred must have the same length, "
            f"got {len(y_true)} and {len(y_pred)}"
        )

    types = ["TYPE", "BRAND", "VOLUME", "PERCENT"]
    f1s = []
    for ent in types:
        tp = fp = fn = 0
        for gold_spans, pred_spans in zip(y_true, y_pred):
            gold = {(s, e) for s, e, tag in gold_spans if tag.endswith(ent)}
            pred = {(s, e) for s, e, tag in pred_spans if tag.endswith(ent)}
            hits = len(gold & pred)
            tp += hits
            fp += len(pred) - hits
            fn += len(gold) - hits
        prec = tp / (tp + fp) if tp + fp else 0.0
        rec = tp / (tp + fn) if tp + fn else 0.0
        f1s.append(2 * prec * rec / (prec + rec) if prec + rec else 0.0)
    return float(np.mean(f1s))


In [7]:
# Location of the semicolon-separated training file.
TRAIN_PATH = ".data/train.csv"

df = read_train_semicolon(TRAIN_PATH)
# Keep only rows whose query string is non-empty.
df = df[df["search_query"].str.len() > 0].reset_index(drop=True)

# Parse the annotation strings into (start, end, tag) spans
df["spans"] = df["annotation"].apply(parse_annotation)

# Split train -> train/valid (15% held out, fixed seed)
tr_df, va_df = train_test_split(df, test_size=0.15, random_state=42, shuffle=True)

# Build the token/label corpora
tr_data = build_corpus(tr_df)
va_data = build_corpus(va_df)

# Train the CRF
crf = train_crf(tr_data)

# Evaluate: exact-match span F1 of the predictions against the parsed gold spans
va_true_spans = va_df["spans"].tolist()
va_pred_spans = []
for text in va_df["search_query"].tolist():
    va_pred_spans.append(predict_spans(crf, text))
f1 = spans_exact_f1(va_true_spans, va_pred_spans)
print(f"Validation macro-F1: {f1:.4f}")


Validation macro-F1: 0.6757
