In [None]:
from deeppavlov import train_model, build_model 
from deeppavlov.core.commands.utils import parse_config

# Project root (relative to this notebook) and the base name of the model files.
PROJECT_DIR = '..'
MODEL_NAME = 'model'


# Load the stock DeepPavlov config that is customised below.
model_config = parse_config('ner_collection3_bert')

# Show the dataset location the stock config points at before overriding it.
print(model_config['dataset_reader']['data_path'])

# Read the data from the project's own datasets folder and switch the
# reader's `iobes` option off.
model_config['dataset_reader'].update({
    'data_path': f'{PROJECT_DIR}/datasets/conll/',
    'iobes': False,
})

# Remove the download section of the metadata; the model is also built
# with download=False below.
del model_config['metadata']['download']

model_config['metadata']['variables']['MODEL_PATH'] = f'{PROJECT_DIR}/models/{MODEL_NAME}'

# Sequence-length limits for the first pipeline component.
model_config['chainer']['pipe'][0].update({
    'max_seq_length': 128,
    'max_subword_length': 20,
})

# The second component saves to and loads from the tag dictionary file.
model_config['chainer']['pipe'][1].update({
    'save_path': f'{PROJECT_DIR}/models/tag.dict',
    'load_path': f'{PROJECT_DIR}/models/tag.dict',
})

# The third component saves to and loads from the model path.
model_config['chainer']['pipe'][2].update({
    'save_path': f'{PROJECT_DIR}/models/{MODEL_NAME}',
    'load_path': f'{PROJECT_DIR}/models/{MODEL_NAME}',
})

model_config['train']['batch_size'] = 400


# Make the (so far) first component consume `x_tokens`, then put a
# whitespace tokenizer in front of it that produces `x_tokens` from `x`.
# NOTE: the insert shifts every pipe index above by one, so it must stay
# after all index-based edits.
model_config['chainer']['pipe'][0]['in'] = ['x_tokens']
model_config['chainer']['pipe'].insert(
    0,
    {"id": "ws_tok", "class_name": "split_tokenizer", "in": ["x"], "out": ["x_tokens"]},
)

ner_model = build_model(model_config, download=False)

In [None]:
import re
from pathlib import Path

import pandas as pd

# CSV with the samples to annotate, and the CSV the predictions are written to.
SUBMISSION_PATH = Path(PROJECT_DIR, 'datasets', 'submission_raw.csv')
OUTPUT_PATH = Path(PROJECT_DIR, 'datasets', 'submission.csv')

# Matches one maximal run of non-whitespace characters.
TOKEN_PATTERN = re.compile(r'\S+')

def normalize_tag(tag: str) -> str:
    """Return a cleaned-up version of a predicted tag.

    Non-string values, blank strings and any casing of ``O`` all become
    ``'O'``. Every other value is returned with surrounding whitespace
    removed and is otherwise unchanged (its casing is kept as given).
    """
    stripped = tag.strip() if isinstance(tag, str) else ''
    # '' covers both the non-string and the blank-string case.
    if stripped.upper() in ('', 'O'):
        return 'O'
    return stripped

def is_tag(value) -> bool:
    """Tell whether ``value`` looks like a tag.

    A value counts as a tag when it is a string that, after stripping
    whitespace and upper-casing, is either ``O`` or starts with ``B-`` /
    ``I-`` followed by at least one more character.
    """
    if not isinstance(value, str):
        return False
    label = value.strip().upper()
    if label == 'O':
        return True
    # The length check rejects a bare 'B-' / 'I-' with no type suffix.
    return len(label) >= 3 and label.startswith(('B-', 'I-'))

def looks_like_sequence(seq, predicate) -> bool:
    """Return True when ``seq`` is a non-empty list/tuple whose items all satisfy ``predicate``."""
    if isinstance(seq, (list, tuple)) and seq:
        for element in seq:
            if not predicate(element):
                return False
        return True
    return False

def looks_like_tag_sequence(seq) -> bool:
    """Return True when ``seq`` is a non-empty list/tuple made up only of tag-like strings."""
    return looks_like_sequence(seq, predicate=is_tag)

def looks_like_token_sequence(seq) -> bool:
    """Return True when ``seq`` is a non-empty list/tuple of strings, none of which looks like a tag."""

    def is_plain_token(item) -> bool:
        return isinstance(item, str) and not is_tag(item)

    return looks_like_sequence(seq, is_plain_token)

def extract_tokens_and_tags(prediction) -> tuple[list[str], list[str]]:
    """Pull one token list and one tag list out of a nested model output.

    The output is searched depth-first, left to right. The first nested
    list/tuple that looks like a token sequence supplies the tokens, and
    the first one that looks like a tag sequence supplies the tags (each
    passed through ``normalize_tag``).

    Returns:
        ``(tokens, tags)``. ``tokens`` may be empty when no token-like
        sequence was found.

    Raises:
        ValueError: if ``prediction`` is not a list/tuple, or if no tag
            sequence could be found in it.
    """
    if isinstance(prediction, tuple):
        prediction = list(prediction)
    if not isinstance(prediction, list):
        raise ValueError(f'Unexpected model output type: {type(prediction)}')

    found_tokens: list[str] = []
    found_tags: list[str] = []

    # Explicit-stack pre-order walk; children are pushed reversed so the
    # leftmost child is visited first.
    pending = [prediction]
    while pending and not (found_tokens and found_tags):
        current = pending.pop()
        if isinstance(current, tuple):
            current = list(current)
        if not isinstance(current, list):
            continue
        if not found_tokens and looks_like_token_sequence(current):
            found_tokens = [str(entry) for entry in current]
        if not found_tags and looks_like_tag_sequence(current):
            found_tags = [normalize_tag(str(entry)) for entry in current]
        pending.extend(reversed(current))

    if not found_tags:
        raise ValueError(f'Unable to extract tag sequence from model output: {prediction}')
    return found_tokens, found_tags

def compute_annotation(text: str, tokens: list[str], tags: list[str]) -> list[tuple[int, int, str]]:
    """Map per-token tags onto character spans of ``text``.

    Tokens are located in ``text`` one after another, each search starting
    where the previous token ended. If a tagged token cannot be found, or
    no tokens are given, the function falls back to pairing the tags with
    the whitespace-separated chunks of ``text`` by position.

    Args:
        text: the original string the tokens came from.
        tokens: token strings in text order; may be empty.
        tags: one tag per token. When both lists are given they are cut to
            the shorter length. Entries with an empty tag are skipped.

    Returns:
        A list of ``(start, end, tag)`` tuples with ``end`` exclusive;
        empty when ``tags`` is empty.
    """
    if not tags:
        return []
    if tokens:
        effective_len = min(len(tokens), len(tags))
        tokens = tokens[:effective_len]
        tags = tags[:effective_len]

    annotation: list[tuple[int, int, str]] = []
    if tokens:
        cursor = 0
        aligned = True
        for token, tag in zip(tokens, tags):
            token = token or ''
            start = text.find(token, cursor)
            if not tag:
                # Untagged tokens produce no span but must still move the
                # cursor past their real occurrence. Adding len(token) to
                # the cursor ignores any text before the token and lets
                # later searches match inside it.
                if start != -1:
                    cursor = start + len(token)
                continue
            if start == -1:
                aligned = False
                break
            end = start + len(token)
            annotation.append((start, end, tag))
            cursor = end
        if not aligned:
            # Partial alignments are discarded in favour of the fallback.
            annotation = []

    if not annotation:
        # Fallback: pair tags with whitespace-delimited chunks by position;
        # zip stops at the shorter of the two.
        for match, tag in zip(TOKEN_PATTERN.finditer(text), tags):
            if not tag:
                continue
            annotation.append((match.start(), match.end(), tag))
    return annotation

# Run the NER model over every sample in the submission file and store the
# resulting character-span annotations next to the samples.
submission_df = pd.read_csv(SUBMISSION_PATH, sep=';', encoding='utf-8')
annotations = []
for row_idx, sample in enumerate(submission_df['sample'], start=1):
    # Missing cells are read as NaN; treat them as empty text.
    text = '' if pd.isna(sample) else str(sample)
    if not text.strip():
        # Blank text has nothing to tag. Skipping the model here avoids the
        # ValueError that extract_tokens_and_tags raises when it finds no
        # non-empty tag list, which would abort the whole export.
        annotations.append([])
        continue
    model_output = ner_model([text])
    tokens, tags = extract_tokens_and_tags(model_output)
    annotation = compute_annotation(text, tokens, tags)
    # Sanity check: every word is expected to receive exactly one span.
    word_count = len(tokens) if tokens else len(TOKEN_PATTERN.findall(text))
    entity_count = len(annotation)
    if word_count != entity_count:
        print(f'Row {row_idx}: word count {word_count} != annotation entities {entity_count}')
    annotations.append(annotation)

# Annotations are stored as the string form of the list of tuples.
submission_df['annotation'] = [str(ann) for ann in annotations]
submission_df[['sample', 'annotation']].to_csv(OUTPUT_PATH, sep=';', encoding='utf-8', index=False)
print(f'Saved predictions to {OUTPUT_PATH.resolve()}')
