In [6]:
from __future__ import annotations
import re
import ast
import math
import json
import random
import itertools
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Any, Optional
from collections import defaultdict, Counter

import numpy as np
import pandas as pd

try:
    import pymorphy2
    _MORPH = pymorphy2.MorphAnalyzer()
except Exception:
    _MORPH = None

from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer
from sklearn_crfsuite import CRF

In [7]:
# Full BIO tag inventory: 'O' plus the B-/I- variants of BRAND, TYPE, VOLUME and PERCENT.
TAGS = ['O', 'B-BRAND', 'I-BRAND', 'B-TYPE', 'I-TYPE', 'B-VOLUME', 'I-VOLUME', 'B-PERCENT', 'I-PERCENT']

# precompiled regexes
# RE_NUM: whole string is an integer or a decimal with '.' or ',' as the separator.
RE_NUM = re.compile(r'^\d+([.,]\d+)?$')
# RE_UNIT: whole string is exactly one of the listed unit abbreviations (case-insensitive).
RE_UNIT = re.compile(r'(?i)^(мл|л|г|кг|шт|уп|пак|бут|бан|таб|мг|мм|см|м)$')
# RE_NUM_UNIT_STUCK: whole string is a number immediately followed by a unit, e.g. "500мл".
RE_NUM_UNIT_STUCK = re.compile(r'(?i)^\d{1,5}([.,]\d{1,2})?(мл|л|г|кг|шт|уп|пак|бут|бан|таб|мг|мм|см|м)$')
# RE_VOLUME_ANY: a number + optional single whitespace + unit anywhere inside a longer string.
RE_VOLUME_ANY = re.compile(r'(?i)\b\d{1,5}([.,]\d{1,2})?\s?(мл|л|г|кг|шт|уп|пак|бут|бан|таб|мг|мм|см|м)\b')
# RE_PERCENT: a 1-2 digit number (optionally with a fractional part) followed by '%'.
RE_PERCENT = re.compile(r'(?i)\b\d{1,2}([.,]\d{1,2})?\s?%')
# RE_HAS_TM: registered / trademark sign.
RE_HAS_TM = re.compile(r'[®™]')
# RE_LAT / RE_CYR: any single basic Latin / Russian Cyrillic letter.
RE_LAT = re.compile(r'[A-Za-z]')
RE_CYR = re.compile(r'[А-Яа-яЁё]')

In [8]:
def mixed_script(s: str) -> bool:
    """Return True when `s` contains at least one Latin and at least one Cyrillic letter."""
    if RE_LAT.search(s) is None:
        # No Latin letter at all: the Cyrillic check is not needed.
        return False
    return RE_CYR.search(s) is not None

def word_shape(s: str) -> str:
    """
    Collapse a token to its orthographic shape.

    Digits become 'd', upper-case letters 'X', other letters 'x';
    every other character is kept as is.
    """
    def _shape_of(symbol: str) -> str:
        if symbol.isdigit():
            return 'd'
        if symbol.isalpha():
            return 'X' if symbol.isupper() else 'x'
        return symbol

    return ''.join(_shape_of(symbol) for symbol in s)

def safe_lemma(s: str) -> str:
    """
    Return the normal form of `s` from the module-level morphological analyser.

    Falls back to the lower-cased input when the analyser is unavailable
    or when parsing raises.
    """
    if _MORPH:
        try:
            # The first parse is taken as the lemma source.
            return _MORPH.parse(s)[0].normal_form
        except Exception:
            pass
    return s.lower()

def tokenize_with_offsets(text: str) -> List[Tuple[str, int, int]]:
    """
    Simple regex tokenizer that keeps character offsets.

    Emits, in text order, (token, start, end) triples for:
    numbers (optionally with a decimal part and a trailing '%'),
    runs of Latin/Cyrillic letters, and single non-space, non-word symbols.
    Characters matched by none of the alternatives are skipped.
    """
    token_pattern = (
        r'\d+[.,]?\d*%?|'       # numbers and percents
        r'[A-Za-zА-Яа-яЁё]+'   # Latin / Cyrillic words
        r'|[^\s\w]'            # single symbols
    )
    return [
        (hit.group(0), hit.start(), hit.end())
        for hit in re.finditer(token_pattern, text)
    ]

def spans_to_bio(tokens: List[Tuple[str,int,int]], spans: List[Tuple[int,int,str]], tagset: List[str]=TAGS) -> List[str]:
    """
    Convert character spans into one BIO label per token.

    A token takes the entity of the first span (in the given order) that overlaps it
    by at least one character. Consecutive tokens with the same entity form one chunk
    (B- then I-). Labels missing from `tagset` are replaced by 'O'.
    """
    # Reduce span labels to the bare entity name ('B-BRAND' -> 'BRAND', 'O' -> 'O').
    entity_spans = []
    for span_start, span_end, label in spans:
        label = label.strip()
        if label == 'O' or label.startswith('B-') or label.startswith('I-'):
            label = label.split('-')[-1]
        entity_spans.append((span_start, span_end, label))

    def _first_overlapping(tok_start, tok_end):
        """Entity of the first span sharing at least one character with the token, else None."""
        for span_start, span_end, entity in entity_spans:
            if min(tok_end, span_end) - max(tok_start, span_start) > 0:
                return entity
        return None

    labels = []
    previous = None
    for _, tok_start, tok_end in tokens:
        entity = _first_overlapping(tok_start, tok_end)
        if entity is None:
            candidate = 'O'
        elif entity == previous:
            candidate = f'I-{entity}'
        else:
            candidate = f'B-{entity}'
        previous = entity
        # Keep only labels from the allowed tag set.
        labels.append(candidate if candidate in tagset else 'O')
    return labels

def bio_validate(tags: List[str]) -> List[str]:
    """
    Return a repaired copy of a BIO sequence.

    An I-X that does not continue an open X chunk is rewritten to B-X.
    A tag without a '-' separator becomes 'O'. The input list is not modified.
    """
    fixed = list(tags)
    open_entity = None  # entity of the chunk the previous tag belongs to, if any
    for pos, tag in enumerate(fixed):
        prefix, dash, entity = tag.partition('-')
        if not dash:
            # 'O' itself or a malformed tag: both end the current chunk.
            fixed[pos] = 'O'
            open_entity = None
        elif prefix == 'B':
            open_entity = entity
        elif prefix == 'I':
            if open_entity != entity:
                # Dangling I-: promote it to the start of a new chunk.
                fixed[pos] = f'B-{entity}'
                open_entity = entity
        # Any other prefix is left untouched and does not change the state.
    return fixed

In [9]:
def build_lexicons(train_df: pd.DataFrame) -> Dict[str,set]:
    """
    Build simple brand / type / unit lexicons from training rows.

    Expects the columns 'text' and 'spans' (a string holding a list of tuples).
    Returns {'brand': set, 'type': set, 'unit': set} of lower-cased tokens.
    """
    brands, types, units = set(), set(), set()
    for _, record in train_df.iterrows():
        tokens = tokenize_with_offsets(str(record['text']))
        labels = spans_to_bio(tokens, parse_spans(record['spans']))
        for token, label in zip(tokens, labels):
            lowered = token[0].lower()
            if label.endswith('BRAND'):
                brands.add(lowered)
            if label.endswith('TYPE'):
                types.add(lowered)
            if RE_UNIT.fullmatch(lowered):
                units.add(lowered)
            # Glued forms such as "500мл" also count as evidence for a unit.
            if RE_NUM_UNIT_STUCK.fullmatch(lowered):
                units.add(re.sub(r'^\d+([.,]\d+)?', '', lowered))
    return {'brand': brands, 'type': types, 'unit': units}

def parse_spans(spans_str: str) -> List[Tuple[int,int,str]]:
    """
    Parse a textual list of (start, end, label) tuples.

    Returns [] for non-string input, for text not wrapped in '[' ... ']',
    and for anything that cannot be evaluated into 3-item entries.
    Typographic quotes are replaced by plain ones before evaluation.
    """
    if not isinstance(spans_str, str):
        return []
    candidate = spans_str.strip()
    if not (candidate.startswith('[') and candidate.endswith(']')):
        return []
    for fancy, plain in (("’", "'"), ("‘", "'"), ("“", '"'), ("”", '"')):
        candidate = candidate.replace(fancy, plain)
    try:
        return [
            (int(start), int(end), str(label))
            for start, end, label in ast.literal_eval(candidate)
        ]
    except Exception:
        # Best-effort parser: malformed rows yield no spans.
        return []

In [10]:
@dataclass
class FeatureConfig:
    """Switches that control which optional token features FeatureBuilder emits."""
    # Add a 'lemma' feature for the current token.
    use_lemma: bool = True
    # Number of neighbouring tokens on each side whose word/shape are added as context.
    window: int = 2
    # Add word bigrams formed with the immediately preceding / following token.
    add_context_bigrams: bool = True

@dataclass
class FeatureBuilder:
    """Turns a tokenised sentence into per-token feature dicts for a CRF."""
    # Lookup sets under the keys 'brand', 'type' and 'unit'.
    lexicons: Dict[str,set] = field(default_factory=lambda: {'brand':set(),'type':set(),'unit':set()})
    # Optional-feature switches (lemma, context window, bigrams).
    cfg: FeatureConfig = field(default_factory=FeatureConfig)

    def token_feats(self, sent: List[Tuple[str,int,int]], i: int) -> Dict[str,Any]:
        """
        Build the feature dict for token `i` of `sent`.

        `sent` is a list of (token, start, end) triples; only the token text is used.
        """
        token, _start, _end = sent[i]
        lowered = token.lower()
        size = len(sent)

        # Orthographic features of the token itself.
        feats: Dict[str, Any] = {'bias': 1.0, 'w': lowered, 'shape': word_shape(token)}
        feats['is_title'] = token.istitle()
        feats['is_upper'] = token.isupper()
        feats['is_digit'] = token.isdigit()
        feats['len'] = len(token)

        # Prefixes and suffixes of length 1..4 of the lower-cased token.
        feats['pre1'] = lowered[:1]
        feats['pre2'] = lowered[:2]
        feats['pre3'] = lowered[:3]
        feats['pre4'] = lowered[:4]
        feats['suf1'] = lowered[-1:]
        feats['suf2'] = lowered[-2:]
        feats['suf3'] = lowered[-3:]
        feats['suf4'] = lowered[-4:]

        # Regex-based indicators.
        feats['has_tm'] = bool(RE_HAS_TM.search(token))
        feats['is_num_unit_stuck'] = bool(RE_NUM_UNIT_STUCK.fullmatch(lowered))
        feats['is_volume_like'] = bool(RE_VOLUME_ANY.search(token))
        feats['is_percent_like'] = bool(RE_PERCENT.search(token))
        feats['has_mixed_script'] = mixed_script(token)

        # Lexicon membership.
        feats['in_brand_lex'] = lowered in self.lexicons['brand']
        feats['in_type_lex'] = lowered in self.lexicons['type']
        feats['in_unit_lex'] = lowered in self.lexicons['unit']

        # Sentence-boundary flags.
        feats['BOS'] = i == 0
        feats['EOS'] = i == size - 1

        if self.cfg.use_lemma:
            feats['lemma'] = safe_lemma(token)

        # Context: word and shape of up to `window` neighbours, left side first.
        reach = self.cfg.window
        for distance in range(1, reach + 1):
            left = i - distance
            if left >= 0:
                neighbour = sent[left][0]
                feats[f'-{distance}:w'] = neighbour.lower()
                feats[f'-{distance}:shape'] = word_shape(neighbour)
        for distance in range(1, reach + 1):
            right = i + distance
            if right < size:
                neighbour = sent[right][0]
                feats[f'+{distance}:w'] = neighbour.lower()
                feats[f'+{distance}:shape'] = word_shape(neighbour)

        # Bigrams with the adjacent tokens.
        if self.cfg.add_context_bigrams and size > 1:
            if i > 0:
                feats['-1_bigram'] = f"{sent[i-1][0].lower()}__{lowered}"
            if i + 1 < size:
                feats['+1_bigram'] = f"{lowered}__{sent[i+1][0].lower()}"
        return feats

    def sent2features(self, sent: List[Tuple[str,int,int]]) -> List[Dict[str,Any]]:
        """Return the feature dicts of all tokens of `sent`, in order."""
        return [self.token_feats(sent, position) for position, _ in enumerate(sent)]

In [11]:
def make_crf(algorithm: str='lbfgs', c1: float=0.1, c2: float=0.1, max_iter: int=200, all_transitions: bool=True) -> CRF:
    """Create an unfitted, non-verbose CRF with the given training settings."""
    settings = {
        'algorithm': algorithm,
        'c1': c1,
        'c2': c2,
        'max_iterations': max_iter,
        'all_possible_transitions': all_transitions,
        'verbose': False,
    }
    return CRF(**settings)

def feature_variant(builder: FeatureBuilder, kind: str) -> FeatureBuilder:
    """
    Return a new FeatureBuilder sharing `builder`'s lexicons, with a copied config
    adjusted for the requested variant:

    A: use_lemma=False, window=1, add_context_bigrams=False
    B: use_lemma=True,  window=1, add_context_bigrams=True
    C: use_lemma=True,  window=2, add_context_bigrams=True

    Any other `kind` yields an unmodified copy of the config.
    """
    # (use_lemma, window, add_context_bigrams) per variant.
    presets = {
        'A': (False, 1, False),
        'B': (True, 1, True),
        'C': (True, 2, True),
    }
    variant = FeatureBuilder(lexicons=builder.lexicons, cfg=FeatureConfig(**vars(builder.cfg)))
    chosen = presets.get(kind)
    if chosen is not None:
        variant.cfg.use_lemma, variant.cfg.window, variant.cfg.add_context_bigrams = chosen
    return variant

In [12]:
class StackedCRF:
    """
    Two-level (stacked) CRF sequence tagger.

    Level 1: three base CRFs ('A', 'B', 'C'), one per feature variant, each trained once
    per K-fold split; their out-of-fold marginals become extra token features.
    Level 2: a meta-CRF trained on the meta builder's token features plus those marginals.
    At inference the per-fold base models of each variant are averaged.
    """

    def __init__(self, tags: List[str]=TAGS, n_splits: int=5, random_state: int=42):
        """Store the tag inventory and K-fold settings; nothing is trained here."""
        self.tags = tags
        self.n_splits = n_splits
        self.random_state = random_state
        # Base feature builders / per-fold base models, keyed by variant name.
        self.base_builders = {}
        self.base_models = {}
        self.meta_builder = None
        self.meta_model = None

    def _require_fitted(self) -> None:
        """Raise a clear error when prediction is attempted before fit()."""
        if self.meta_model is None or self.meta_builder is None:
            raise RuntimeError("StackedCRF is not fitted yet: call fit() before predicting.")

    def _probs_as_features(self, probs_seq: List[Dict[str,float]], prefix: str) -> List[Dict[str,float]]:
        """
        Turn per-token CRF marginals into features of the form {'<prefix>:<TAG>': prob}.

        Every tag of self.tags gets a key; tags missing from a marginal dict get 0.0.
        """
        feats_seq = []
        for p in probs_seq:
            feats_seq.append({f'{prefix}:{tag}': float(p.get(tag, 0.0)) for tag in self.tags})
        return feats_seq

    def _merge_feature_dicts(self, base_feats: List[Dict[str,Any]], *others: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
        """Merge, token by token, `base_feats` with every block in `others` (later blocks win)."""
        out = []
        for i in range(len(base_feats)):
            m = dict(base_feats[i])
            for block in others:
                m.update(block[i])
            out.append(m)
        return out

    def fit(self, texts: List[str], spans_list: List[List[Tuple[int,int,str]]]):
        """
        Train the base CRFs with K-fold out-of-fold predictions, then the meta-CRF.

        Raises ValueError when `texts` and `spans_list` differ in length and
        RuntimeError when the out-of-fold bookkeeping is inconsistent.
        Returns self.
        """
        # zip() below would silently truncate on a mismatch and fail much later.
        if len(texts) != len(spans_list):
            raise ValueError(
                f"texts and spans_list must have the same length, "
                f"got {len(texts)} and {len(spans_list)}"
            )

        # 1) tokenisation and BIO labels
        sents = [tokenize_with_offsets(t) for t in texts]
        y = [spans_to_bio(s, spans) for s,spans in zip(sents, spans_list)]

        # 2) lexicons
        # NOTE(review): lexicons are built from ALL training rows before the K-fold split,
        # so lexicon features of a validation fold are derived from its own labels.
        tmp_df = pd.DataFrame({'text':texts, 'spans':[str(s) for s in spans_list]})
        lex = build_lexicons(tmp_df)

        # 3) builders A/B/C
        base_builder = FeatureBuilder(lexicons=lex)
        self.base_builders = {
            'A': feature_variant(base_builder, 'A'),
            'B': feature_variant(base_builder, 'B'),
            'C': feature_variant(base_builder, 'C'),
        }
        self.meta_builder = FeatureBuilder(
            lexicons=lex,
            cfg=FeatureConfig(use_lemma=True, window=2, add_context_bigrams=True)
        )

        # 4) out-of-fold predictions for A/B/C
        kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        # Per sentence: a list with one marginal dict per token.
        oof_probs = {k: [None] * len(sents) for k in 'ABC'}
        self.base_models = {k: [] for k in 'ABC'}

        idxs = np.arange(len(sents))
        for fold, (tr, va) in enumerate(kf.split(idxs), start=1):
            # feature preparation
            X_tr = {k: [self.base_builders[k].sent2features(sents[i]) for i in tr] for k in 'ABC'}
            X_va = {k: [self.base_builders[k].sent2features(sents[i]) for i in va] for k in 'ABC'}
            y_tr = [y[i] for i in tr]

            # train the three base CRFs
            models_fold = {}
            for k in 'ABC':
                crf = make_crf(c1=0.05 if k=='A' else 0.1,
                               c2=0.1 if k!='C' else 0.2,
                               max_iter=200)
                crf.fit(X_tr[k], y_tr)
                models_fold[k] = crf

                # marginals on the validation part:
                # probs is List[List[Dict[tag, prob]]] - per sentence -> per token
                probs = crf.predict_marginals(X_va[k])

                # Put each sequence back at its original sentence index.
                for pos, i_sent in enumerate(va):
                    oof_probs[k][i_sent] = probs[pos]

            # keep the fold models (averaged at inference)
            for k in 'ABC':
                self.base_models[k].append(models_fold[k])

        # sanity check: every sentence must have received marginals
        for k in 'ABC':
            for i, seq in enumerate(oof_probs[k]):
                if seq is None:
                    raise RuntimeError(f"OOF probs for model {k} is None at sentence {i}. "
                                       "Проверьте KFold/раскладку.")

        # 5) meta features from the OOF marginals
        X_meta = []
        for i in range(len(sents)):
            base_feats = self.meta_builder.sent2features(sents[i])  # meta-layer token features

            # token counts must agree across all blocks
            n_tokens = len(base_feats)
            if not (len(oof_probs['A'][i]) == len(oof_probs['B'][i]) == len(oof_probs['C'][i]) == n_tokens):
                raise RuntimeError(f"Token-length mismatch at sentence {i}: "
                                   f"{len(oof_probs['A'][i])}/{len(oof_probs['B'][i])}/"
                                   f"{len(oof_probs['C'][i])} vs {n_tokens}")

            pa = self._probs_as_features(oof_probs['A'][i], 'A')
            pb = self._probs_as_features(oof_probs['B'][i], 'B')
            pc = self._probs_as_features(oof_probs['C'][i], 'C')

            mix = self._merge_feature_dicts(base_feats, pa, pb, pc)
            X_meta.append(mix)

        # 6) train the meta-CRF
        self.meta_model = make_crf(c1=0.05, c2=0.2, max_iter=300)
        self.meta_model.fit(X_meta, y)

        self._train_sents = sents
        self._train_y = y
        return self

    def _avg_predict_marginals(self, builder: FeatureBuilder, models: List[CRF], sent: List[Tuple[str,int,int]]) -> List[Dict[str,float]]:
        """
        Average the per-token marginals of `models` (one per fold) for one sentence.

        Per token the summed probabilities are renormalised to sum to 1
        (the divisor falls back to 1.0 when the sum is zero).
        """
        X = builder.sent2features(sent)
        probs_list = [m.predict_marginals_single(X) for m in models]
        out = []
        for t in range(len(X)):
            acc = defaultdict(float)
            for probs in probs_list:
                for tag, p in probs[t].items():
                    acc[tag] += p
            # normalisation
            s = sum(acc.values()) or 1.0
            out.append({k: v/s for k,v in acc.items()})
        return out

    def predict(self, texts: List[str]) -> List[List[str]]:
        """
        Predict one validated BIO tag sequence per text (one tag per token).

        A text that yields no tokens gets an empty tag list without calling the CRFs.
        Raises RuntimeError when the model has not been fitted.
        """
        self._require_fitted()
        res = []
        for text in texts:
            sent = tokenize_with_offsets(text)
            if not sent:
                # Nothing to tag; avoid feeding empty sequences to the taggers.
                res.append([])
                continue
            # base marginals, averaged over folds
            pa = self._avg_predict_marginals(self.base_builders['A'], self.base_models['A'], sent)
            pb = self._avg_predict_marginals(self.base_builders['B'], self.base_models['B'], sent)
            pc = self._avg_predict_marginals(self.base_builders['C'], self.base_models['C'], sent)
            # meta features
            base_feats = self.meta_builder.sent2features(sent)
            fa = self._probs_as_features(pa, 'A')
            fb = self._probs_as_features(pb, 'B')
            fc = self._probs_as_features(pc, 'C')
            X_meta = self._merge_feature_dicts(base_feats, fa, fb, fc)
            # prediction
            tags = self.meta_model.predict_single(X_meta)
            tags = bio_validate(tags)
            res.append(tags)
        return res

    def predict_spans(self, texts: List[str]) -> List[List[Tuple[int,int,str]]]:
        """
        Return, per text, a list of (start, end, LABEL) character spans.

        'O' is NOT dropped: runs of 'O' tokens are aggregated into spans as well.
        LABEL is flat (no B-/I- prefix). Consecutive tokens with the same flat label
        are merged into one span regardless of B-/I- prefixes.
        """
        out_all = []
        tag_seqs = self.predict(texts)  # BIO sequences: ['B-TYPE','I-TYPE','O',...]
        for text, tags in zip(texts, tag_seqs):
            sent = tokenize_with_offsets(text)  # [(tok, start, end), ...]
            spans = []
            cur_label = None
            cur_start = None
            prev_end = None

            for (w, a, b), t in zip(sent, tags):
                # flat label: 'B-TYPE'/'I-TYPE' -> 'TYPE', 'O' -> 'O'
                if t == 'O':
                    flat = 'O'
                else:
                    _, flat = t.split('-', 1)

                if cur_label is None:
                    # start of the first segment
                    cur_label = flat
                    cur_start = a
                    prev_end = b
                elif flat == cur_label:
                    # extend the current segment
                    prev_end = b
                else:
                    # close the previous segment and open a new one
                    spans.append((cur_start, prev_end, cur_label))
                    cur_label = flat
                    cur_start = a
                    prev_end = b

            # flush the last open segment
            if cur_label is not None:
                spans.append((cur_start, prev_end, cur_label))

            out_all.append(spans)
        return out_all

In [13]:
from typing import List, Tuple, Optional

# A character span: (start offset, end offset, label).
Span = Tuple[int, int, str]

def _words_by_spaces(text: str) -> List[Tuple[int,int]]:
    """Разбить всю строку на «слова» как непрерывные группы непробельных символов."""
    words = []
    i, n = 0, len(text)
    while i < n:
        while i < n and text[i].isspace():
            i += 1
        if i >= n: break
        j = i
        while j < n and not text[j].isspace():
            j += 1
        words.append((i, j))
        i = j
    return words

def _label_for_word(word: Tuple[int,int], spans: List[Span]) -> Optional[str]:
    """Вернуть базовую метку ('TYPE','BRAND','VOLUME','PERCENT') для слова по пересечению со спанами, иначе None."""
    wa, wb = word
    best_lab, best_ol = None, 0
    for a, b, lab in spans:
        # пересечение длинной >0
        ol = max(0, min(wb, b) - max(wa, a))
        if ol > best_ol:
            best_ol = ol
            best_lab = lab
    return best_lab if best_ol > 0 else None

def spans_to_bio_splits(text: str, spans: List[Span]) -> List[Span]:
    """
    Re-express spans as exactly one (start, end, tag) tuple per whitespace-separated word.

    Words outside any entity get 'O'; inside entities, consecutive words with the same
    base label are tagged B- then I-.
    """
    # Strip B-/I- prefixes so that only base labels remain.
    base_spans: List[Span] = []
    for start, end, label in sorted(spans, key=lambda item: (item[0], item[1])):
        cleaned = label.upper().strip()
        if cleaned.startswith('B-') or cleaned.startswith('I-'):
            cleaned = cleaned.split('-', 1)[1]
        base_spans.append((start, end, cleaned))

    tagged: List[Span] = []
    previous = None
    for word_start, word_end in _words_by_spaces(text):
        base = _label_for_word((word_start, word_end), base_spans)  # None means outside
        if base is None or base == 'O':
            tagged.append((word_start, word_end, 'O'))
            previous = None
            continue
        marker = 'I-' if previous == base else 'B-'
        tagged.append((word_start, word_end, marker + base))
        previous = base
    return tagged

In [14]:
def load_train_semicol_csv(path: str) -> Tuple[List[str], List[List[Tuple[int,int,str]]]]:
    """
    Load a ';'-separated file with two columns: text;spans.

    The file is read without a header row, and the first row is dropped from both results.
    Returns (texts, parsed spans per text).
    """
    frame = pd.read_csv(path, sep=';', header=None, names=['text','spans'], encoding='utf-8', engine='python')
    all_texts = frame['text'].astype(str).tolist()
    all_spans = [parse_spans(raw) for raw in frame['spans']]
    # Skip the first row of the file.
    return all_texts[1:], all_spans[1:]

In [15]:
# Location of the ';'-separated training file and the loaded (texts, spans) pair.
path = "data_base/dataset_all.csv"   # change this to your own path
texts, spans = load_train_semicol_csv(path)

In [16]:
# Train the stacked CRF with 7 folds and a fixed K-fold seed.
model = StackedCRF(n_splits=7, random_state=13)
model.fit(texts, spans)

<__main__.StackedCRF at 0x7f7e3ad2bca0>

In [17]:
# CRF-only submission: predict spans for every sample, convert them to per-word BIO
# tuples and write the result next to the base submission file.
version = "unichtozhenie_petuhov_v14"
sub = pd.read_csv("submissions/sub_base.csv", sep=';')
preds = []
for t in sub["sample"]:
    # predict_spans takes a list of texts, so each sample is wrapped in a one-item list.
    pred_spans = model.predict_spans([t])
    converted = spans_to_bio_splits(t, pred_spans[0])
    preds.append(converted)
sub["annotation"] = preds
sub.to_csv(f"submissions/sub_{version}.csv", sep=';', index=False)

In [31]:
import joblib

In [35]:
# Persist the fitted model to disk.
# NOTE(review): the class is defined in this notebook, so loading presumably needs the same class definition available - confirm.
joblib.dump(model, "model_crf/StackedCRF.joblib")

['model_crf/StackedCRF.joblib']

In [32]:
model

<__main__.StackedCRF at 0x7f7e3ad2bca0>

In [None]:
# Hand-picked smoke-test inputs: gibberish, percents with and without a space,
# volumes glued to the number, the '№' sign and a Latin brand name.
test_samples = [
    "sdafas",
    "йогурт питьевой 2 % valio 500 мл",
    "молоко агуша 3,2% 1л",
    "сыр ламбер 200г",
    "кефир 1 % бутылка 900мл",
    "джин №1",
    "№1 газета",
    "молоко 1,5 %",
    "молоко 2%",
    "сок яблочный 2л",
    "крекер sladoya"
]
# Token-level BIO tags and aggregated character spans for the same inputs.
pred_tags = model.predict(test_samples)
pred_spans = model.predict_spans(test_samples)


In [73]:
# Convert the predicted spans of every smoke-test sample to per-word BIO tuples.
# The prints are commented out, so running this cell as is shows nothing.
for t, tags, sp in zip(test_samples, pred_tags, pred_spans):
    converted = spans_to_bio_splits(t, sp)
    # print("TEXT:", t)
    # print("SPANS:", sp)
    # print("CONVERTED:", converted)
    # print()

TEXT: sdafas
SPANS: [(0, 6, 'O')]
CONVERTED: [(0, 6, 'O')]

TEXT: йогурт питьевой 2 % valio 500 мл
SPANS: [(0, 6, 'TYPE'), (7, 15, 'O'), (16, 19, 'PERCENT'), (20, 25, 'BRAND'), (26, 32, 'VOLUME')]
CONVERTED: [(0, 6, 'B-TYPE'), (7, 15, 'O'), (16, 17, 'B-PERCENT'), (18, 19, 'I-PERCENT'), (20, 25, 'B-BRAND'), (26, 29, 'B-VOLUME'), (30, 32, 'I-VOLUME')]

TEXT: молоко агуша 3,2% 1л
SPANS: [(0, 6, 'TYPE'), (7, 17, 'BRAND'), (18, 20, 'VOLUME')]
CONVERTED: [(0, 6, 'B-TYPE'), (7, 12, 'B-BRAND'), (13, 17, 'I-BRAND'), (18, 20, 'B-VOLUME')]

TEXT: сыр ламбер 200г
SPANS: [(0, 10, 'TYPE'), (11, 15, 'VOLUME')]
CONVERTED: [(0, 3, 'B-TYPE'), (4, 10, 'I-TYPE'), (11, 15, 'B-VOLUME')]

TEXT: кефир 1 % бутылка 900мл
SPANS: [(0, 5, 'TYPE'), (6, 9, 'PERCENT'), (10, 17, 'TYPE'), (18, 23, 'VOLUME')]
CONVERTED: [(0, 5, 'B-TYPE'), (6, 7, 'B-PERCENT'), (8, 9, 'I-PERCENT'), (10, 17, 'B-TYPE'), (18, 23, 'B-VOLUME')]

TEXT: джин №1
SPANS: [(0, 4, 'TYPE'), (5, 7, 'BRAND')]
CONVERTED: [(0, 4, 'B-TYPE'), (5, 7, 'B-BRAN

In [18]:
import unicodedata
import re
from typing import List, Tuple
import spacy
import pandas as pd
from tqdm import tqdm
from ast import literal_eval

def _is_punct(ch: str) -> bool:
    # Любой символ категории Unicode "P" — пунктуация,
    # но % исключаем из удаления
    return unicodedata.category(ch).startswith("P") and ch != "%"

def _strip_punct(s: str) -> str:
    """Return `s` with every character for which _is_punct is true removed."""
    kept = []
    for symbol in s:
        if _is_punct(symbol):
            continue
        kept.append(symbol)
    return "".join(kept)

def predict_with_punct(nlp, s: str) -> List[List[str]]:
    """
    Run `nlp` on a punctuation-free, lower-cased copy of `s` and map entities back.

    `nlp` is called with the cleaned text and must return an object whose `.ents`
    items expose `start_char`, `end_char` and `label_`.

    Returns a list of [original_fragment_with_punctuation, label] pairs, where the
    fragment spans from the first to the last whitespace token touched by the entity.
    Returns [] when nothing is left after stripping punctuation (all but '%').
    """
    # Whitespace-delimited tokens of the original string with their offsets.
    orig_tokens: List[Tuple[int,int,str]] = [
        (m.start(), m.end(), m.group(0)) for m in re.finditer(r"\S+", s)
    ]

    clean_pieces = []
    clean_spans = []   # (start, end, index into orig_tokens) in clean-text coordinates
    clean_cursor = 0

    for i, (st, en, tok) in enumerate(orig_tokens):
        # Lower-case BEFORE measuring: str.lower() can change the length of a string,
        # and the offsets below must refer to the exact text passed to `nlp`.
        clean_tok = _strip_punct(tok).lower()
        if not clean_tok:
            continue
        if clean_pieces:
            clean_cursor += 1  # the single space that joins the pieces
        clean_start = clean_cursor
        clean_pieces.append(clean_tok)
        clean_cursor += len(clean_tok)
        clean_spans.append((clean_start, clean_cursor, i))

    clean_text = " ".join(clean_pieces)
    if not clean_text.strip():
        return []

    doc = nlp(clean_text)

    results: List[List[str]] = []
    for ent in doc.ents:
        ent_start, ent_end = ent.start_char, ent.end_char
        # Original tokens whose clean span intersects the entity.
        covered = [
            idx for cst, cen, idx in clean_spans
            if not (cen <= ent_start or cst >= ent_end)
        ]
        if not covered:
            continue
        i0, i1 = min(covered), max(covered)
        start0 = orig_tokens[i0][0]
        end1 = orig_tokens[i1][1]
        results.append([s[start0:end1], ent.label_])

    return results

def _split_into_tokens(text):
    """Разбивает текст на токены по пробелам"""
    if not text:
        return []
    
    tokens = []
    start = 0
    for i, char in enumerate(text):
        if char == ' ':
            if start < i:
                tokens.append((start, i))
            start = i + 1
    
    if start < len(text):
        tokens.append((start, len(text)))
    
    return tokens

def _tokenize_text(text):
    """Tokenise `text` and return (lower-cased token text, start, end) triples."""
    return [
        (text[begin:finish].lower(), begin, finish)
        for begin, finish in _split_into_tokens(text)
    ]

def convert_model2_to_model1(text, model2_results):
    """
    Convert [fragment, label] entity pairs into per-token (start, end, tag) tuples.

    Each entity fragment is split on whitespace and searched for, case-insensitively,
    as a token sequence in `text`; every occurrence is tagged B-<label> / I-<label>.
    Tokens not covered by any entity get 'O'. Malformed entity entries are skipped.
    Returns [] for non-string or token-less text.
    """
    if not isinstance(text, str):
        return []

    text_tokens = _tokenize_text(text)
    if not text_tokens:
        return []

    # No entities at all: everything is 'O'.
    if not model2_results:
        return [(start, end, 'O') for _, start, end in text_tokens]

    surface = [token for token, _, _ in text_tokens]
    total = len(surface)
    tags = ['O'] * total

    for entity in model2_results:
        if not isinstance(entity, (list, tuple)) or len(entity) < 2:
            continue
        entity_text, entity_type = entity[0], entity[1]
        if not (isinstance(entity_text, str) and isinstance(entity_type, str)):
            continue

        needle = [piece.lower() for piece in entity_text.split()]
        width = len(needle)
        if width == 0:
            continue

        # Slide over the text tokens looking for the entity's token sequence.
        pos = 0
        while pos + width <= total:
            if surface[pos:pos + width] == needle:
                tags[pos] = 'B-' + entity_type
                for inner in range(pos + 1, pos + width):
                    tags[inner] = 'I-' + entity_type
                # Continue after the matched occurrence.
                pos += width
            else:
                pos += 1

    return [(start, end, tag) for (_, start, end), tag in zip(text_tokens, tags)]


def make_submission(df, spicy_col: str):
    """
    Fill df['annotation'] from the stringified entity lists in `spicy_col`.

    Each cell of `spicy_col` is parsed with literal_eval and converted together with
    the matching 'sample' text. `df` is modified in place; the returned frame holds
    only the 'sample' and 'annotation' columns.
    """
    samples = df['sample'].tolist()
    raw_entities = df[spicy_col].tolist()
    annotations = []
    for sample, raw in zip(samples, raw_entities):
        annotations.append(convert_model2_to_model1(sample, literal_eval(raw)))
    df['annotation'] = annotations
    return df[['sample', 'annotation']]

In [19]:
import spacy

In [20]:
sub = pd.read_csv("submissions/sub_base.csv", sep=';')

In [21]:
# Load the two spaCy pipelines used by the ensemble cell (paths are relative to the working directory).
model_best = spacy.load("../model/model-best-best")
model_ner_all_word = spacy.load("model_ner_all_word/model-best")

  self._model.load_state_dict(torch.load(filelike, map_location=device))


In [22]:
# Name suffix of the submission file written by the ensemble cell (the f-prefix has no placeholders).
version = f"unichtozhenie_petuhov_v15"

In [23]:
# Ensemble submission: keep the CRF answer unless both spaCy models agree with each
# other on a different annotation, in which case their shared answer is used.
preds = []
sub = pd.read_csv("submissions/sub_base.csv", sep=';')
for v in tqdm(sub['sample']):
    pred_best = predict_with_punct(model_best, v)
    pred_best = convert_model2_to_model1(v, pred_best)

    # predict_spans already runs the tagger; a separate model.predict call would only
    # repeat the work and overwrite the notebook-level `pred_tags`.
    pred_spans = model.predict_spans([v])
    preds_crf = spans_to_bio_splits(v, pred_spans[0])

    result = preds_crf
    if preds_crf != pred_best:
        # Disagreement: ask the second spaCy model to break the tie.
        pred_one = predict_with_punct(model_ner_all_word, v)
        pred_one = convert_model2_to_model1(v, pred_one)

        if pred_one == pred_best:
            result = pred_best
    preds.append(result)

sub["annotation"] = preds
sub[["sample", "annotation"]].to_csv(f"submissions/sub_{version}.csv", sep=';', index=False)

100%|██████████| 5000/5000 [01:18<00:00, 64.00it/s] 


In [179]:
sub

Unnamed: 0,sample,annotation
0,форма для выпечки,"[(0, 5, B-TYPE), (6, 9, O), (10, 17, O)]"
1,фарш свиной,"[(0, 4, B-TYPE), (5, 11, I-TYPE)]"
2,сок ананасовый без сахара,"[(0, 3, B-TYPE), (4, 14, I-TYPE), (15, 18, O),..."
3,еринги,"[(0, 6, B-TYPE)]"
4,молооко,"[(0, 7, B-TYPE)]"
...,...,...
4995,milkywa,"[(0, 7, B-BRAND)]"
4996,очиститель для унитаза,"[(0, 10, B-TYPE), (11, 14, O), (15, 22, O)]"
4997,арбузные,"[(0, 8, B-TYPE)]"
4998,кашы,"[(0, 4, B-TYPE)]"


In [161]:
# Write the 'sample' and 'annotation' columns of the current `sub` frame to the versioned submission file.
sub[["sample", "annotation"]].to_csv(f"submissions/sub_{version}.csv", sep=';', index=False)

In [132]:
preds_crf, pred_best, pred_one

([(0, 6, 'B-TYPE')], [(0, 6, 'B-TYPE')], [(0, 4, 'B-TYPE')])

In [123]:
# NOTE(review): preds_1 / preds_2 are only assigned in the comparison cell that appears
# further down in this file, so this cell fails on a fresh top-to-bottom run.
sub["preds_1"] = preds_1
sub["preds_2"] = preds_2

In [130]:
result

[(0, 6, 'B-TYPE')]

In [25]:
# Side-by-side comparison: CRF annotation vs. the first spaCy model's annotation
# for every sample of the v15 submission file.
preds_1 = []
preds_2 = []
sub = pd.read_csv("submissions/sub_unichtozhenie_petuhov_v15.csv", sep=';')
for v in tqdm(sub['sample']):
    pred_best = predict_with_punct(model_best, v)
    pred_best = convert_model2_to_model1(v, pred_best)

    # predict_spans already runs the tagger; a separate model.predict call would only
    # repeat the work and overwrite the notebook-level `pred_tags`.
    pred_spans = model.predict_spans([v])
    preds_crf = spans_to_bio_splits(v, pred_spans[0])

    preds_1.append(preds_crf)
    preds_2.append(pred_best)

sub["annotation_crf"] = preds_1
sub["annotation_spacy"] = preds_2

100%|██████████| 5000/5000 [00:49<00:00, 100.82it/s]


In [28]:
# Save only the rows where the CRF and spaCy annotations differ, for manual inspection.
sub[sub["annotation_crf"] != sub["annotation_spacy"]].to_csv("test.csv", sep=";", index=False)