In [None]:
# Download the Russian spaCy pipeline that is loaded as `nlp` further down.
!python -m spacy download ru_core_news_md

In [None]:
import re
from collections import Counter

import numpy as np
import pandas as pd
import requests
import spacy

In [None]:
# Load the Russian pipeline with the "ner" and "attribute_ruler" components disabled.
nlp = spacy.load("ru_core_news_md", disable=["ner", "attribute_ruler"])

# Предобработка

## Отделение морфологии нивхского

Данный раздел посвящён обработке словарных данных – приведению слов к нивхским основам. Результатом работы всех функций стал словарь основ, к которому можно обращаться в задаче классификации глосс.



---



`{'nv_word': str, 'ru_word': list(str), 'nv_stem': str,
'affixes': dict(str, str), 'metadata': str, 'pos': str}`

In [None]:
# Download the dictionary spreadsheet (CSV export) to final_dictionary.csv.
!curl -L -o final_dictionary.csv 'https://docs.google.com/spreadsheets/d/1z2wStzMEO41N5qKeNlbkfexZyimQiu3aD7P6SYitVvU/export?exportFormat=csv'

In [None]:
# Download the gloss spreadsheet (CSV export) to final_glosses.csv.
!curl -L -o final_glosses.csv 'https://docs.google.com/spreadsheets/d/19045IoPzWSiTvmZC3zQIS1vqRYpwazQPCKTEov2dCcU/export?exportFormat=csv'

In [None]:
# Dictionary table; later cells read its `nv`, `ru`, `pos_tag` and `metadata` columns.
final_df = pd.read_csv('final_dictionary.csv')

In [None]:
# Gloss table; later cells look up the `Morph` column by the value of `Gloss`.
final_glosses = pd.read_csv('final_glosses.csv')

### Таблички

POS-теги получены с помощью spaCy для русского языка.

In [None]:
# Display the dictionary table.
final_df

In [None]:
# Count the non-null values of every column per part-of-speech tag.
final_df.groupby(['pos_tag']).count()

In [None]:
# Display the gloss table.
final_glosses

## Функции


In [None]:
# Morphs listed for the CONV.3.SG gloss, concatenated so they can be dropped into a
# regex character class (every character of every morph becomes a class member).
pattern = ''.join(list(final_glosses[final_glosses['Gloss'] == 'CONV.3.SG']['Morph'])[0].split(', '))
# adverbial suffix: CAUS + CONV.3.SG
adv_aff = re.compile(f'[гӷ]у[{pattern}]$')
# plural suffix
pl_aff = re.compile('([ӻгғкх][оу](ну?)?)$')
# indicative suffix
ind_aff = re.compile('(н?д|т|ӈ)ь?$')
# causative suffix
# NOTE(review): this pattern uses 'ң' while the other patterns in this notebook use 'ӈ' — confirm this is intended.
caus_aff = re.compile('(ңг?|(к|ӄ|г|ӷ)у?)$')
# attributive suffix
atr_aff = re.compile('л?а$')

In [None]:
def sub_aff(word, aff):
    """Remove the affix matched by ``aff`` from ``word``.

    Args:
        word: the string to analyse.
        aff: a regex (compiled or plain string) describing the affix.

    Returns:
        ``(match, stripped_word)`` when ``aff`` is found, where ``stripped_word``
        is ``word`` with every match of ``aff`` removed; otherwise ``(False, word)``.
    """
    found = re.search(aff, word)
    if not found:
        return False, word
    return found, re.sub(aff, '', word)

In [None]:
def cosine(line, stem, stem_dictionary, a=0.238, timeout=10):
    """Check whether a Russian gloss is semantically close to a known stem.

    Every lemma of ``line.ru`` is compared, through the RusVectores similarity
    API, with every single-word translation stored for ``stem``.

    Args:
        line: row object exposing the Russian translation as ``line.ru``.
        stem: key of ``stem_dictionary`` whose translations are compared.
        stem_dictionary: mapping ``stem -> {'ru': set of translations, ...}``.
        a: similarity threshold; a pair must score strictly above it.
        timeout: seconds to wait for each HTTP response, so an unresponsive
            server cannot hang the notebook.

    Returns:
        ``True`` as soon as one pair scores above ``a``, otherwise ``False``.

    Raises:
        requests.RequestException: on network failures (including timeouts).
    """
    url_api = 'https://rusvectores.org/{MODEL}/{word_1}__{word_2}/api/similarity/'
    model = 'ruwikiruscorpora_upos_cbow_300_10_2021'
    # Only single-word translations can be sent to the API. They are stripped
    # because splitting a translation on ';' can leave surrounding whitespace.
    candidates = [ru.strip() for ru in stem_dictionary[stem]['ru'] if len(ru.split()) == 1]
    for token in nlp(line.ru):
        for candidate in candidates:
            url = url_api.format(MODEL=model, word_1=token.lemma_, word_2=candidate)
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                continue
            body = response.content.decode('utf8')
            if body == 'Unknown':
                continue
            try:
                similarity = float(body.split('\t')[0])
            except ValueError:
                # Unexpected payload: treat it like an unknown word and move on.
                continue
            if similarity > a:
                return True
    return False

### Обработка глаголов

In [None]:
def process_verb(line, stem_dictionary, pos):
    """Analyse one verb entry and register its stem.

    Affixes are peeled off the right edge in order: plural, indicative and,
    only when the translation contains 'заставить', causative.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    nv = line.nv.lower().replace('\xad', '')
    traduction = line.ru

    pl, stem = sub_aff(nv, pl_aff)
    ind, stem = sub_aff(stem, ind_aff)
    caus = None
    if 'заставить' in line.ru:
        caus, stem = sub_aff(stem, caus_aff)
        traduction = traduction.replace('заставить ', '')

    translations = traduction.split(';')

    record = stem_dictionary.setdefault(stem, {'ru': set(), 'idx': set(), 'pos': set()})
    record['ru'].update(translations)
    record['idx'].add(pos)
    record['pos'].add(line.pos_tag)

    affixes = dict()
    for gloss, found in (('PL', pl), ('IND', ind), ('CAUS', caus)):
        if found:
            affixes[gloss] = found.group()

    dictionary = {
        'nv_word': nv,
        'ru_word': set(translations),
        'nv_stem': stem,
        'affixes': affixes,
        # a float here means the cell was empty (NaN)
        'metadata': None if isinstance(line.metadata, float) else line.metadata,
        'pos': line.pos_tag,
    }
    return stem_dictionary, dictionary

In [None]:
# Placeholder; `final` is rebuilt from the per-POS lists further down.
final = []

In [None]:
# Verbs are processed first: they create the stem dictionary that the other
# parts of speech are matched against.
stem_dictionary = {}
verb_final = []
verb_rows = final_df[final_df['pos_tag'] == 'VERB']
for pos, line in enumerate(verb_rows.itertuples()):
    stem_dictionary, dictionary = process_verb(line, stem_dictionary, pos)
    verb_final.append(dictionary)

In [None]:
# Number of processed verb entries.
len(verb_final)

### Обработка прилагательных

In [None]:
def check_definition(line, stem_dictionary, stem):
    """Decide whether an adjective's meaning is already covered by ``stem``.

    First looks for a stored translation of the form 'быть X' whose second
    word lemmatises to ``line.ru``. If none is found, falls back to a single
    semantic-similarity check via ``cosine`` with threshold 0.3.

    Args:
        line: row exposing the Russian translation as ``line.ru``.
        stem_dictionary: mapping ``stem -> {'ru': set of translations, ...}``.
        stem: key of ``stem_dictionary`` to compare against.

    Returns:
        ``True`` when the meaning is considered covered, otherwise ``False``.
    """
    for expression in stem_dictionary[stem]['ru']:
        # Translations come from splitting on ';', so tolerate leading spaces.
        words = expression.split()
        if len(words) > 1 and expression.lstrip().startswith('быть'):
            doc = nlp(words[1])
            if len(doc) and doc[0].lemma_ == line.ru:
                return True
    # cosine() does not depend on an individual token, so one call is enough.
    return cosine(line, stem, stem_dictionary, a=0.3)

In [None]:
def process_adj(line, stem_dictionary, pos):
    """Analyse one adjective entry and register its stem.

    Strips the plural suffix, then the attributive suffix ('ла' or 'а') when
    the remainder is an already known stem. The translation is added to the
    stem only if ``check_definition`` does not consider it already covered.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    dictionary = {'nv_word': '', 'ru_word': set(), 'nv_stem': '',
                  'affixes': dict(), 'metadata': '', 'pos': None}

    nv = line.nv.lower().replace('\xad', '')
    dictionary['nv_word'] = nv
    dictionary['pos'] = line.pos_tag
    traduction = line.ru
    dictionary['ru_word'].update(traduction.split(';'))

    # strip PL
    pl, stem = sub_aff(nv, pl_aff)

    atr_affix = None
    # look for ATR and strip it
    if re.search('ла$', stem):
        if stem[:-2] in stem_dictionary and len(stem[:-2]) > 1:
            stem = re.sub('ла$', '', stem)
            atr_affix = 'ла'

        elif stem[:-1] in stem_dictionary:
            stem = re.sub('а$', '', stem)
            atr_affix = 'а'

    elif re.search('а$', stem):
        if stem[:-1] in stem_dictionary:
            stem = re.sub('а$', '', stem)
            atr_affix = 'а'

    dictionary['nv_stem'] = stem

    verb_in_dict = False
    if stem in stem_dictionary:
        verb_in_dict = check_definition(line, stem_dictionary, stem)
    else:
        stem_dictionary[stem] = {'ru': set(), 'idx': set(), 'pos': set()}
    stem_dictionary[stem]['idx'].add(pos)
    stem_dictionary[stem]['pos'].add(line.pos_tag)
    if not verb_in_dict:
        stem_dictionary[stem]['ru'].update(traduction.split(';'))

    if pl:
        dictionary['affixes']['PL'] = pl.group()
    if atr_affix:
        # Stored under its own gloss so that it no longer overwrites 'PL'.
        dictionary['affixes']['ATR'] = atr_affix

    # a float here means the cell was empty (NaN)
    dictionary['metadata'] = None if isinstance(line.metadata, float) else line.metadata

    return stem_dictionary, dictionary

In [None]:
# Adjective indices continue right after the verb entries.
adj_final = []
adj_rows = final_df[final_df['pos_tag'] == 'ADJ']
for pos, line in enumerate(adj_rows.itertuples(), len(verb_final)):
    stem_dictionary, dictionary = process_adj(line, stem_dictionary, pos)
    adj_final.append(dictionary)

In [None]:
# Number of processed adjective entries.
len(adj_final)

### Обработка существительных

In [None]:
# Word-final patterns for head nouns of compounds; `list_of_patterns` pairs
# each of them with its Russian gloss.
pers_aff = re.compile('(нивх|нивӈ|ниғвӈ)$')
woman_aff = re.compile('(умгу|р̌аӈ[ӄӻӷ])$')
cub_aff = re.compile('нонӄ$')
kid_aff = re.compile('(о[ӻғ]?ла|эӻлӈ)$')
female = re.compile('аньӽ$')
month = re.compile('лоӈ$')
earth = re.compile('миф$')
animal = re.compile('ӈа$')
# Alternation of the morphs listed for the PRON:ANY gloss.
# NOTE(review): this rebinds the module-level name `pattern` that was used for the CONV.3.SG morphs.
pattern = '|'.join(list(final_glosses[final_glosses['Gloss'] == 'PRON:ANY']['Morph'])[0].split(', '))
pron_aff = re.compile(f'({pattern})$')

In [None]:
# Compiled head-noun pattern -> Russian gloss. Despite the name this is a dict;
# `find_complex_word` iterates over its keys in insertion order.
list_of_patterns = {pers_aff: "человек", woman_aff: "женщина", cub_aff: "детёныш",
                    kid_aff: "ребёнок", female: "самка",
                    month: "месяц", earth: "земля", animal: "зверь"}

In [None]:
def find_complex_word(line):
    """Find the first head-noun pattern that ends a compound word.

    Args:
        line: the word (a string) to inspect.

    Returns:
        ``(True, pattern)`` for the first pattern in ``list_of_patterns`` that
        matches ``line`` at a position other than its very start, otherwise
        ``(False, None)``.
    """
    for patt in list_of_patterns:
        res = re.search(patt, line)
        if res is None:
            continue
        # A match at index 0 means the word *is* the head noun, not a compound.
        if res.start() > 0:
            return True, patt
    return False, None

In [None]:
def form_dictionary(line, stem, affixes, dictionary):
    """Fill in the analysis fields of a word entry.

    Args:
        line: row exposing ``metadata``.
        stem: value stored under ``'nv_stem'``.
        affixes: mapping merged into ``dictionary['affixes']``.
        dictionary: the entry to update (modified in place).

    Returns:
        The same ``dictionary`` object.
    """
    dictionary['affixes'].update(affixes)
    dictionary['nv_stem'] = stem
    metadata = line.metadata
    # a float here means the cell was empty (NaN)
    dictionary['metadata'] = None if isinstance(metadata, float) else metadata
    return dictionary

In [None]:
def locative(line, stem_dictionary, dictionary, pos):
    """Try to analyse a noun ending in 'ф' as a locative nominalisation.

    Only applies when the translation mentions 'место' or the metadata
    mentions 'место'/'участок'. The word without its last character must be a
    known verb stem, or reduce to a stem after stripping the attributive suffix.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        dictionary: the word entry being built.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(matched, stem_dictionary, dictionary)``.
    """
    nv = line.nv.lower().replace('\xad', '')
    metadata = str(line.metadata)
    mentions_place = (re.search('(?:место.+|.*место)', line.ru)
                      or 'место' in metadata
                      or 'участок' in metadata)
    if not mentions_place:
        return False, stem_dictionary, dictionary

    base = nv[:-1]
    translations = line.ru.split(';')

    if base in stem_dictionary and 'VERB' in stem_dictionary[base]['pos']:
        dictionary = form_dictionary(line, base, {'NMN:L': 'ф'}, dictionary)
        record = stem_dictionary[base]
        record['ru'].update(translations)
        record['idx'].add(pos)
        record['pos'].add(line.pos_tag)
        return True, stem_dictionary, dictionary

    # attributive + locative?
    atr, stem = sub_aff(base, atr_aff)
    if not atr:
        return False, stem_dictionary, dictionary

    dictionary = form_dictionary(line, stem, {'NMN:L': 'ф'}, dictionary)
    if stem in stem_dictionary:
        dictionary['affixes']['ATR'] = atr.group()
    else:
        stem_dictionary[stem] = {'ru': set(), 'idx': set(), 'pos': set()}
    record = stem_dictionary[stem]
    record['ru'].update(translations)
    record['idx'].add(pos)
    record['pos'].add(line.pos_tag)
    return True, stem_dictionary, dictionary

In [None]:
def _attributive_verb_stem(word, stem_dictionary):
    """Return ``(stem, affix)`` if dropping attributive 'ла'/'а' from ``word`` gives a known verb stem, else ``None``."""
    def is_verb(candidate):
        return candidate in stem_dictionary and 'VERB' in stem_dictionary[candidate]['pos']

    if word.endswith('ла') and len(word[:-2]) > 1 and is_verb(word[:-2]):
        return word[:-2], 'ла'
    if word.endswith('а') and is_verb(word[:-1]):
        return word[:-1], 'а'
    return None


def process_noun(line, stem_dictionary, pos):
    """Analyse one noun entry and register its stem.

    Tries, in order: locative nominalisation (words ending in 'ф'), actant
    nominalisation of a known verb stem (optionally with an attributive
    suffix), and a nominalisation ending in 'д'/'т' (+ 'ь'). Each derivation
    is accepted only if ``cosine`` confirms the semantic link. If nothing
    applies, the word itself becomes the stem.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    dictionary = {'nv_word': '', 'ru_word': set(), 'nv_stem': '',
                  'affixes': dict(), 'metadata': '', 'pos': None}

    nv = line.nv.lower().replace('\xad', '')
    dictionary['nv_word'] = nv
    dictionary['pos'] = line.pos_tag
    traduction = line.ru
    dictionary['ru_word'].update(traduction.split(';'))

    # locative nominalisations
    if nv.endswith('ф'):
        check, stem_dictionary, dictionary = locative(line, stem_dictionary, dictionary, pos)
        if check:
            return stem_dictionary, dictionary

    # actant nominalisation
    nmn_morphs = list(final_glosses[final_glosses['Gloss'] == 'NMN:A']['Morph'])[0].split(', ')
    # The morphs are literal strings (they are also used with str.endswith), so escape them.
    nmn_pattern = re.compile(f"({'|'.join(re.escape(m) for m in nmn_morphs)})$")

    if nv.endswith(tuple(nmn_morphs)):
        nmn_affix = re.search(nmn_pattern, nv).group()
        new_word = nv[:len(nv) - len(nmn_affix)]
        if new_word in stem_dictionary and 'VERB' in stem_dictionary[new_word]['pos']:
            if cosine(line, new_word, stem_dictionary):
                dictionary = form_dictionary(line, new_word, {'NMN:A': nmn_affix}, dictionary)
                stem_dictionary[new_word]['idx'].add(pos)
                return stem_dictionary, dictionary
        else:
            # attributive + nominalisation
            found = _attributive_verb_stem(new_word, stem_dictionary)
            if found:
                verb_stem, atr_affix = found
                if cosine(line, verb_stem, stem_dictionary):
                    dictionary = form_dictionary(line, verb_stem,
                                                 {'NMN:A': nmn_affix, 'ATR': atr_affix},
                                                 dictionary)
                    stem_dictionary[verb_stem]['idx'].add(pos)
                    # Return on every successful analysis so that the fallback
                    # below cannot overwrite the stem that was just found.
                    return stem_dictionary, dictionary

    nmn_p_aff = re.compile('[дт]ь?$')
    ind, stem = sub_aff(nv, nmn_p_aff)
    if ind and stem in stem_dictionary and 'VERB' in stem_dictionary[stem]['pos']:
        if cosine(line, stem, stem_dictionary, a=0.4):
            dictionary = form_dictionary(line, stem,
                                         {'NMN:P': ind.group()}, dictionary)
            stem_dictionary[stem]['idx'].add(pos)
            return stem_dictionary, dictionary

    # Fallback: the word is its own stem. The entry is always completed, even
    # when the stem is already known; in that case only the index is recorded.
    dictionary = form_dictionary(line, nv, {}, dictionary)
    if nv not in stem_dictionary:
        stem_dictionary[nv] = {'ru': set(), 'idx': set(), 'pos': set()}
        stem_dictionary[nv]['ru'].update(line.ru.split(';'))
        stem_dictionary[nv]['pos'].add(line.pos_tag)
    stem_dictionary[nv]['idx'].add(pos)
    return stem_dictionary, dictionary


In [None]:
# Concrete head-noun form -> Russian gloss; looked up with the text matched by
# the head-noun patterns in `fix_complex_words` and `fix_morphology`.
mapping = {'нонӄ': 'детёныш', 'нивх': 'человек',
           'нивӈ': 'человек', 'ниғвӈ': 'человек',
           'умгу': 'женщина', 'р̌аӈӄ': 'женщина',
           'р̌аӈӻ': 'женщина', 'р̌аӈӷ': 'женщина',
           'оӻла': 'ребёнок', 'эӻлӈ': 'ребёнок',
           'оғла': 'ребёнок', 'ола': 'ребёнок',
           'аньӽ': 'самка', 'лоӈ': "месяц",
           'миф': 'земля', 'ӈа': 'зверь'}

In [None]:
def fix_complex_words(line, stem_dictionary, pos):
    """Split a compound noun into a base and its head nouns.

    Head nouns recognised by ``find_complex_word`` are stripped from the right
    edge one by one; the pieces are joined with '-' to form the stem.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    dictionary = {'nv_word': '', 'ru_word': set(), 'nv_stem': '',
                  'affixes': dict(), 'metadata': '', 'pos': None}

    nv = line.nv.lower().replace('\xad', '')
    dictionary['nv_word'] = nv
    traduction = line.ru
    dictionary['ru_word'].update(traduction.split(';'))
    dictionary['pos'] = line.pos_tag

    prefix = nv
    new_stem = []
    affixes = dict()
    while True:
        check, head_pattern = find_complex_word(prefix)
        if not check:
            break
        suffix, prefix = sub_aff(prefix, head_pattern)
        if suffix:
            affixes[suffix.group()] = mapping[suffix.group()]
            new_stem.append(suffix.group())
    if prefix in stem_dictionary:
        # NOTE: this is the stem's own set object, not a copy.
        affixes[prefix] = stem_dictionary[prefix]['ru']
    else:
        affixes[prefix] = '?'
    new_stem.append(prefix)
    # Pieces were collected right-to-left; restore the written order.
    new_stem = '-'.join(new_stem[::-1])
    dictionary = form_dictionary(line, new_stem, affixes, dictionary)
    # Reuse an existing record so that a repeated stem keeps its earlier indices.
    record = stem_dictionary.setdefault(new_stem, {'ru': set(), 'idx': set(), 'pos': set()})
    record['ru'].update(line.ru.split(';'))
    record['idx'].add(pos)
    record['pos'].add(line.pos_tag)
    return stem_dictionary, dictionary

In [None]:
def fix_morphology(line, stem_dictionary, pos):
    """Analyse a noun that carries a plural suffix.

    After stripping the plural suffix the remainder is tried as a known stem,
    then as a known stem plus a 'к'/'ӈ' nominaliser, and finally as a compound
    ending in a head noun. The first two are accepted only if ``cosine`` (threshold
    0.2) confirms the link; a word without a plural suffix stays its own stem.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    nv = line.nv.lower().replace('\xad', '')
    dictionary = {'nv_word': nv, 'ru_word': set(line.ru.split(';')), 'nv_stem': '',
                  'affixes': dict(), 'metadata': '', 'pos': line.pos_tag}

    stem = nv
    affixes = {}
    pl, prefix = sub_aff(nv, pl_aff)
    if pl:
        if prefix in stem_dictionary:
            if cosine(line, prefix, stem_dictionary, a=0.2):
                affixes = {'PL': pl.group()}
                stem = prefix
        else:
            nmn_a = re.search('[кӈ]$', prefix)
            shorter = prefix[:nmn_a.start()] if nmn_a else None
            if nmn_a and shorter in stem_dictionary:
                if cosine(line, shorter, stem_dictionary, a=0.2):
                    affixes = {'PL': pl.group(), 'NMN:A': nmn_a.group()}
                    stem = shorter
            else:
                check, head_pattern = find_complex_word(prefix)
                if check:
                    word = re.search(head_pattern, prefix)
                    real_prefix = prefix[:word.start()]
                    new_stem = real_prefix + '-' + word.group()
                    # Either the hyphenated compound or its base must be known.
                    if new_stem in stem_dictionary or real_prefix in stem_dictionary:
                        stem = new_stem
                        affixes[word.group()] = mapping[word.group()]
                        affixes['PL'] = pl.group()

    dictionary = form_dictionary(line, stem, affixes, dictionary)

    if stem not in stem_dictionary:
        stem_dictionary[stem] = {'ru': set(), 'idx': set(), 'pos': set()}
        stem_dictionary[stem]['ru'].update(line.ru.split(';'))
        stem_dictionary[stem]['pos'].add(line.pos_tag)
    stem_dictionary[stem]['idx'].add(pos)

    return stem_dictionary, dictionary

In [None]:
def fix_syllables(line, pos):
    """Build the entry for a multi-word noun, which is kept unanalysed.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        pos: unused; kept so the signature matches the other processors.

    Returns:
        The word entry, with the whole normalised word as its stem.
    """
    nv = line.nv.lower().replace('\xad', '')
    metadata = line.metadata
    return {
        'nv_word': nv,
        'ru_word': set(line.ru.split(';')),
        'nv_stem': nv,
        'affixes': dict(),
        # a float here means the cell was empty (NaN)
        'metadata': None if isinstance(metadata, float) else metadata,
        'pos': line.pos_tag,
    }

In [None]:
noun_final = []
complex_words = []
two_sillables = []
morphology_words = []
# `final` is assembled as verb_final + adj_final + noun_final + ..., so noun
# indices must start after BOTH preceding lists to stay unique.
noun_offset = len(verb_final) + len(adj_final)
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'NOUN'].itertuples(), noun_offset):
    if len(line.nv.split()) > 1:
        # multi-word entries are stored unanalysed
        dictionary = fix_syllables(line, pos)
    elif find_complex_word(line.nv)[0]:
        stem_dictionary, dictionary = fix_complex_words(line, stem_dictionary, pos)
    elif (re.search(pl_aff, line.nv)
          or re.search('(?:[кг][ўу][р̌рт]|[тд]о[хӽ]|ух)', line.nv)
          or re.search(pron_aff, line.nv)):
        # plural suffix, one of the suffix strings above, or a PRON:ANY morph
        stem_dictionary, dictionary = fix_morphology(line, stem_dictionary, pos)
    else:
        stem_dictionary, dictionary = process_noun(line, stem_dictionary, pos)
    noun_final.append(dictionary)

In [None]:
# Number of processed noun entries.
len(noun_final)

### Прочее

In [None]:
def process_pron(line, stem_dictionary, pos):
    """Analyse one pronoun-like entry and register its stem.

    If the word contains 'лу', every occurrence of it and all spaces are
    removed to obtain the stem and the affix is recorded under 'INDEF'.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    nv = line.nv.lower().replace('\xad', '')
    translations = line.ru.split(';')

    affixes = dict()
    if 'лу' in nv:
        stem = nv.replace('лу', '').replace(' ', '')
        affixes['INDEF'] = ' лу'
    else:
        stem = nv

    record = stem_dictionary.setdefault(stem, {'ru': set(), 'idx': set(), 'pos': set()})
    record['ru'].update(translations)
    record['idx'].add(pos)
    record['pos'].add(line.pos_tag)

    dictionary = {
        'nv_word': nv,
        'ru_word': set(translations),
        'nv_stem': stem,
        'affixes': affixes,
        # a float here means the cell was empty (NaN)
        'metadata': None if isinstance(line.metadata, float) else line.metadata,
        'pos': line.pos_tag,
    }
    return stem_dictionary, dictionary

In [None]:
pron_final = []
# `final` is assembled as verb + adj + noun + pron + ..., so pronoun indices
# start after all three preceding lists (not just after the nouns).
pron_offset = sum(len(part) for part in (verb_final, adj_final, noun_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'PRON'].itertuples(), pron_offset):
    stem_dictionary, dictionary = process_pron(line, stem_dictionary, pos)
    pron_final.append(dictionary)

In [None]:
sconj_final = []
# Indices continue after every list that precedes SCONJ in `final`.
sconj_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'SCONJ'].itertuples(), sconj_offset):
    stem_dictionary, dictionary = process_pron(line, stem_dictionary, pos)
    sconj_final.append(dictionary)

In [None]:
def adv(line, stem_dictionary, pos):
    """Analyse an adverb-like entry by stripping known suffixes from its end.

    The indefinite suffix ('лу'/'ло') is always stripped. The remaining
    suffixes (CAUS + CONV.3.SG, DAT, EMPH.3.SG, ABL, PRON:ANY) are stripped, in
    that order, only when what is left is already a key of ``stem_dictionary``.
    Only the matched word-final suffix is removed.

    Args:
        line: row with ``nv``, ``ru``, ``pos_tag`` and ``metadata`` attributes.
        stem_dictionary: mapping ``stem -> {'ru', 'idx', 'pos'}``, updated in place.
        pos: index of this entry, stored in the stem's ``idx`` set.

    Returns:
        ``(stem_dictionary, entry)`` where ``entry`` describes the analysed word.
    """
    dictionary = {'nv_word': '', 'ru_word': set(), 'nv_stem': '',
                  'affixes': dict(), 'metadata': '', 'pos': None}

    nv = line.nv.lower().replace('\xad', '')
    dictionary['nv_word'] = nv
    dictionary['pos'] = line.pos_tag
    traduction = line.ru
    dictionary['ru_word'].update(traduction.split(';'))

    affixes = {}
    stem = nv

    indef = re.search('л[уо]$', stem)
    if indef:
        stem = stem[:indef.start()].replace(' ', '')
        affixes['INDEF'] = indef.group()

    # 'р̌' is two code points (letter + combining mark), so it has to be an
    # alternative of its own: inside a character class it would be split.
    conv = re.search('([гк]у)(р̌|[рт])$', stem)
    if conv and stem[:conv.start()] in stem_dictionary:
        # Store the matched text, not the Match object (which is not JSON-serialisable).
        affixes['CAUS'] = conv.group(1)
        affixes['CONV.3.SG'] = conv.group(2)
        stem = stem[:conv.start()]

    suffixes = (('DAT', '[тд]оӽ$'), ('EMPH.3.SG', 'ра$'),
                ('ABL', 'ух$'), ('PRON:ANY', 'ӿагин$'))
    for gloss, suffix_re in suffixes:
        found = re.search(suffix_re, stem)
        if found and stem[:found.start()] in stem_dictionary:
            affixes[gloss] = found.group()
            stem = stem[:found.start()]

    dictionary = form_dictionary(line, stem, affixes, dictionary)
    if stem not in stem_dictionary:
        stem_dictionary[stem] = {'ru': set(), 'idx': set(), 'pos': set()}

    stem_dictionary[stem]['ru'].update(traduction.split(';'))
    stem_dictionary[stem]['idx'].add(pos)
    stem_dictionary[stem]['pos'].add(line.pos_tag)

    return stem_dictionary, dictionary

In [None]:
adv_final = []
# Indices continue after every list that precedes ADV in `final`.
adv_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                        sconj_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'ADV'].itertuples(), adv_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    adv_final.append(dictionary)

In [None]:
adp_final = []
# Indices continue after every list that precedes ADP in `final`.
adp_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                        sconj_final, adv_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'ADP'].itertuples(), adp_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    adp_final.append(dictionary)

In [None]:
cconj_final = []
# Indices continue after every list that precedes CCONJ in `final`.
cconj_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                          sconj_final, adv_final, adp_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'CCONJ'].itertuples(), cconj_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    cconj_final.append(dictionary)

In [None]:
det_final = []
# Indices continue after every list that precedes DET in `final`.
det_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                        sconj_final, adv_final, adp_final, cconj_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'DET'].itertuples(), det_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    det_final.append(dictionary)

In [None]:
intj_final = []
# Indices continue after every list that precedes INTJ in `final`.
intj_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                         sconj_final, adv_final, adp_final, cconj_final,
                                         det_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'INTJ'].itertuples(), intj_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    intj_final.append(dictionary)

In [None]:
num_final = []
# Indices continue after every list that precedes NUM in `final`.
num_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                        sconj_final, adv_final, adp_final, cconj_final,
                                        det_final, intj_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'NUM'].itertuples(), num_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    num_final.append(dictionary)

In [None]:
part_final = []
# Indices continue after every list that precedes PART in `final`.
part_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                         sconj_final, adv_final, adp_final, cconj_final,
                                         det_final, intj_final, num_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'PART'].itertuples(), part_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    part_final.append(dictionary)

In [None]:
propn_final = []
# Indices continue after every list that precedes PROPN in `final`.
propn_offset = sum(len(part) for part in (verb_final, adj_final, noun_final, pron_final,
                                          sconj_final, adv_final, adp_final, cconj_final,
                                          det_final, intj_final, num_final, part_final))
for pos, line in enumerate(final_df[final_df['pos_tag'] == 'PROPN'].itertuples(), propn_offset):
    stem_dictionary, dictionary = adv(line, stem_dictionary, pos)
    propn_final.append(dictionary)

In [None]:
# Concatenate the per-POS entry lists; this order defines each entry's position in `final`.
final = verb_final + adj_final + noun_final + pron_final + sconj_final + adv_final + adp_final + cconj_final + det_final

In [None]:
# Append the remaining per-POS lists.
# NOTE(review): re-running this cell alone appends these lists again — it is not idempotent.
final = final + intj_final + num_final + part_final + propn_final

In [None]:
# Total number of entries.
len(final)

In [None]:
# Inspect the first entry.
final[0]

In [None]:
import json

def set_default(obj):
    """``default`` hook for ``json.dump`` that serialises sets as lists.

    Sets are emitted in sorted order so that the written JSON is identical
    between runs; sets whose members cannot be ordered fall back to
    iteration order.

    Raises:
        TypeError: for any object that is not a set.
    """
    if isinstance(obj, set):
        try:
            return sorted(obj)
        except TypeError:
            return list(obj)
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

# Write all word entries to data.json; sets are converted by `set_default`.
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(final, f, ensure_ascii=False, indent=6, default=set_default)

In [None]:
# Write the stem dictionary to stem.json; sets are converted by `set_default`.
with open('stem.json', 'w', encoding='utf-8') as f:
    json.dump(stem_dictionary, f, ensure_ascii=False, indent=3, default=set_default)

# Данные

In [None]:
import os
import re
import random

In [None]:
# Install gdown, which the next cell uses to fetch a Google Drive folder.
!pip --quiet install gdown

In [None]:
# Download the Google Drive folder into the working directory.
!gdown --folder https://drive.google.com/drive/folders/1YBLW10W3q3-wa5Mx4CA6-P4Qvs5ish9d

In [None]:
# Directory whose files are read as glossed texts below (hardcoded absolute path).
# NOTE(review): presumably the folder fetched by gdown above — confirm.
path = '/content/датасет'

In [None]:
# Paths of all entries in `path` (os.listdir order, which is not guaranteed to be sorted).
texts = ['/'.join([path, x]) for x in os.listdir(path)]

In [None]:
def _empty_sentence():
    """Return a fresh record for one glossed sentence."""
    return {'segmented': '', 'glossed': '', 'translation': None, 'metadata': None}


def parse_glossed_lines(lines):
    """Collect sentence records from the lines of one glossed text file.

    Recognised line types:
      * ``<digits>>``  segmented text, appended to ``'segmented'``;
      * ``<digits><``  glosses, appended to ``'glossed'``;
      * ``<digits>[_<digits>...]=``  translation, which closes the record;
      * any other line containing '#'  metadata for the record being built.

    A record is emitted only when its translation line is reached, so anything
    accumulated after the last translation is dropped.

    Raises:
        AttributeError: if a marker is not followed by a space or tab and text.
    """
    sentences = []
    current = _empty_sentence()
    for string in lines:
        if re.search(r'\d+(?:_\d+)*=', string):
            current['translation'] = re.search(r'(?<=\d=[\t ]).*', string).group()
            sentences.append(current)
            current = _empty_sentence()
            continue
        if re.search(r'\d+>', string):
            substring = re.search(r'(?<=>[ \t]).*', string).group()
            # Continuation lines are appended without any separator.
            current['segmented'] += re.sub(r'\t+', '\t', substring)
            continue
        if re.search(r'\d+<', string):
            substring = re.search(r'(?<=<[ \t]).*', string).group()
            current['glossed'] += re.sub(r'\t+', '\t', substring)
            # No `continue`: a gloss line containing '#' also feeds the metadata.
        if '#' in string:
            string = re.sub(r'(?<=#) *', '', string)
            note = re.search(r'(?<=#).*', string).group()
            if current['metadata'] is None:
                current['metadata'] = note
            else:
                current['metadata'] += '\n' + note
    return sentences


data = []
for text in texts:
    with open(text, 'r', encoding='utf8') as file:
        data.extend(parse_glossed_lines(file))

In [None]:
# Inspect one parsed sentence record.
data[125]

# Few-shot: морфемная сегментация

Для задачи морфемной сегментации выборка была разбита на отдельные слова.

Для обеспечения минимального пересечения между тренировочной и тестовой выборками была написана функция с гиперпараметром. Эмпирически было подобрано минимально возможное пересечение – 25%.



---



In [None]:
# Download the gloss spreadsheet (CSV export) again so this section can run on its own.
!curl -L -o final_glosses.csv 'https://docs.google.com/spreadsheets/d/19045IoPzWSiTvmZC3zQIS1vqRYpwazQPCKTEov2dCcU/export?exportFormat=csv'

In [None]:
# Reload the gloss table without its `Category` column (rebinds `final_glosses`).
final_glosses = pd.read_csv('final_glosses.csv').drop(['Category'], axis=1)

In [None]:
# Display the reloaded gloss table.
final_glosses

In [None]:
# Morph -> gloss lookup. A morph listed under several glosses keeps the gloss
# of the last row that mentions it.
morph_gloss = {
    morph: row.Gloss
    for row in final_glosses.itertuples()
    for morph in row.Morph.split(', ')
}

In [None]:
# Download data.json from Google Drive into the working directory.
!curl -L -o data.json "https://drive.google.com/uc?export=download&id=1lqJPVhTz1F_hfCPj65rfu1s7S52rvwat"

In [None]:
import json

# Load the sentence records; the context manager closes the file even if parsing fails.
# NOTE: this rebinds `data`, replacing anything stored under that name earlier.
with open('/content/data.json', 'r', encoding='utf8') as f:
    data = json.load(f)

In [None]:
# Number of sentence records.
len(data)

In [None]:
# разбиение выборки на слова для few-shot сегментации

# Punctuation and the characters А-Я or ':' that are stripped from every token
# (raw string: the original escapes were invalid in a normal string literal).
punct_re = re.compile(r'["«»,.()?!\[А-Я:\]]+')

all_words = []    # words with the morpheme hyphens removed
all_labels = []   # the same words with the hyphens kept
for sent in data:
    for word in sent['segmented'].split('\t'):
        word = punct_re.sub('', word.lower())
        orig = word.replace('-', '')
        # Skip tokens containing a letter with no letter-like neighbour on either side.
        if re.findall('(?<![ˇ’ʻ‘ʼ\'р̌’‘ӻӿӃӾЧА-яёЁӽӈғӄӷ])[ӷр̌ӻӿа-яёӽӈғӄ](?![р̌’ʻ‘\'ʼӻғӿА-яЁёӽӈа-яˇӄӷ])', orig):
            continue
        # NOTE(review): double quotes were already stripped above, so this check
        # can never match — confirm what it was meant to filter.
        if re.findall('"[ˇ’ʻ‘ʼ\'р̌’‘ӻӿӃӾЧА-яёЁӽӈғӄӷ]{2}"', orig):
            continue
        # `orig` is already lower-cased and cleaned, so no second pass is needed.
        if orig != '':
            all_words.append(orig)
            all_labels.append(word)

In [None]:
# One single-item dict per token: {unsegmented word: hyphen-segmented word}.
dataset = [{key: value} for key, value in zip(all_words, all_labels)]

In [None]:
# Number of word tokens.
len(dataset)

In [None]:
import random

def split_with_overlap_limit(dataset, train_ratio=0.5, max_overlap_ratio=0.2, max_attempts=100, seed=None):
    """Split ``dataset`` into train and test parts with a bounded word overlap.

    The data are reshuffled until the share of distinct test words that also
    occur in the train part is at most ``max_overlap_ratio``.

    Args:
        dataset: list of single-item dicts ``{word: segmentation}``. It is not
            modified; a copy is shuffled.
        train_ratio: fraction of ``dataset`` that goes into the train part.
        max_overlap_ratio: largest allowed ``|train ∩ test| / |test|`` computed
            over distinct words.
        max_attempts: number of shuffles to try before giving up.
        seed: optional seed for a private random generator, for reproducible
            splits; when ``None`` the global ``random`` state is used.

    Returns:
        ``(train, test)`` lists.

    Raises:
        ValueError: if no attempt satisfies the overlap limit.
    """
    rng = random.Random(seed) if seed is not None else random
    items = list(dataset)
    # Size is derived from the argument itself, not from a notebook global.
    train_size = int(len(items) * train_ratio)

    for _ in range(max_attempts):
        rng.shuffle(items)
        train = items[:train_size]
        test = items[train_size:]

        train_set = {list(word)[0] for word in train}
        test_set = {list(word)[0] for word in test}

        # An empty test part has nothing to overlap with.
        overlap_ratio = len(train_set & test_set) / len(test_set) if test_set else 0.0

        if overlap_ratio <= max_overlap_ratio:
            return train, test

    raise ValueError("Не удалось разделить данные с заданным ограничением на пересечение.")


# Split with a cap on the train/test overlap.
train, test = split_with_overlap_limit(dataset, train_ratio=0.3, max_overlap_ratio=0.25)

# NOTE(review): these print both lists in full, which floods the cell output.
print("Обучающая выборка:", train)
print("Тестовая выборка:", test)

In [None]:
# Size of the test part.
len(test)

Данная функция, вдохновлённая метрикой chrF++, обеспечивает поиск релевантных примеров. По максимально совпадающим биграммам выбираются до 10 наиболее подходящих примеров, а также выделяются морфемы, которые, возможно, встречаются в тестовом слове (target word).

In [None]:
def get_ngrams(word, n=2):
    """Return every contiguous character n-gram of ``word``, left to right."""
    windows = []
    for start in range(len(word) - n + 1):
        windows.append(word[start:start + n])
    return windows

def calculate_chrf(true_word, candidate_word, n=2, beta=1):
    """Score how well the n-grams of ``candidate_word`` match ``true_word``.

    Computes the F-beta score of n-gram precision and recall, counting
    repeated n-grams as often as they occur in both words.

    Returns:
        The score, or ``0`` when either word has no n-grams or none are shared.
    """
    reference = get_ngrams(true_word, n)
    hypothesis = get_ngrams(candidate_word, n)

    # Multiset intersection keeps the smaller count of every shared n-gram.
    shared = sum((Counter(reference) & Counter(hypothesis)).values())

    precision = shared / len(hypothesis) if hypothesis else 0
    recall = shared / len(reference) if reference else 0

    if not (precision and recall):
        return 0

    return ((1 + beta**2) * precision * recall) / (recall + beta**2 * precision)

In [None]:
target_word = list(test[148].keys())[0]

In [None]:
test[148]

In [None]:
# Score every training word against the target word
results = []
for word in train:
    cand, segm = next(iter(word.items()))
    chrf_score = calculate_chrf(target_word, cand)
    results.append((cand, segm, chrf_score))

# Best matches first
results = sorted(results, key=lambda item: item[2], reverse=True)

# Keep the 30 nearest words and collect the glosses of their known morphemes
examples = results[:30]
glosses = set()
for word, segm, score in sorted(set(examples), key=lambda item: -item[2]):
    morphemes = segm.split('-')
    for m in morphemes:
        if m not in morph_gloss:
            continue
        glosses.add(f'{m}={morph_gloss[m]}')
    print(f"Слово: {word}, разделение: {segm} chrF++: {score:.3f}")

Системный промпт для Гигачата

In [None]:
# prompt = """### Ты – выдающийся лингвист, специализирующийся в морфологии нивхского языка.

# #### Задача
# Отглосировать слово, то есть выделить составляющие его морфемы и разделить их дефисами ('-'). В качестве примера будет дано несколько уже отглоссированных слов.

# #### Инструкции
# 1. Проанализируй структуру предложенного слова.
# 2. Найди в нем отдельные морфемы (корень, суффиксы, окончания) и раздели их дефисами.
# 4. Не изменяй символы в словах: все символы должны быть точно такими же, как в оригинале. Не удаляй буквы и не вставляй другие.
# 5. Возможные морфемы могут помочь, но не всегда присутствуют в оригинальном слове.
# 6. Слово из твоего ответ без дефисов должны быть идентичным оригинальному.

# #### Формат ответа
# - Результат должен содержать оригинальное слово, но со всеми выявленными морфемами, разделенными дефисами.
# - Тебе необходимо ответить только одним словом, разделенным на дефисы

# #### Пример
# Раздели следующие слово на морфемы: `ӽаудь`

# Другие слова:
# `ғаудь`: `ғау-дь`
# `ӽаугудьғу`: `ӽау-гу-дь-ғу`
# `ӄ’аудь`: `ӄ’ау-дь`
# `ӄаудь`: `ӄау-дь`

# Возможные морфемы:
# - 'ӄ’ау': AUX:NEG
# - 'дь': IND
# - 'ғу': PL

# ----------------
# Твой ответ: `ӽау-дь`"""

In [None]:
prompt = """Ты – выдающийся лингвист, специализирующийся в морфологии нивхского языка.

#### Задача
Отглосировать слово, то есть выделить составляющие его морфемы и разделить их дефисами ('-').

#### Инструкция
1. Проанализируй структуру предложенного слова.
2. Найди в нём отдельные морфемы (корень, аффиксы, окончания) и раздели их дефисами.
3. Морфемы могут отсутствовать в некоторых случаях; учитывай этот факт при анализе.
4. Сохраняй неизменными все символы в словах.

#### Формат ответа
Ответ должен содержать только одно слово, которое будет оригинальным словом, разделённым дефисами согласно найденным морфемам.

#### Пример работы
----------------
Раздели следующие слово на морфемы: `ӽаудь`

Другие слова:
`ғаудь`: `ғау-дь`
`ӽаугудьғу`: `ӽау-гу-дь-ғу`
`ӄ’аудь`: `ӄ’ау-дь`
`ӄаудь`: `ӄау-дь`

Возможные морфемы:
- 'ӄ’ау': AUX:NEG
- 'дь': IND
- 'ғу': PL

Твой ответ: `ӽау-дь`"""

$\mathrm{chrF}{+}{+} = \frac{(1 + \beta^2) \cdot P \cdot R}{\beta^2 \cdot P + R}$

где:

$P$ – precision (доля совпадающих n-грамм в предсказании относительно всех n-грамм в предсказании).

$R$ – recall (доля совпадающих n-грамм в истинной строке относительно всех n-грамм в истинной строке).

$\beta$ – параметр, который регулирует важность recall относительно precision.


In [None]:
def get_ngrams(word, n=2):
    """Collect every window of ``n`` consecutive characters of ``word``."""
    ngrams = []
    for start in range(len(word) - n + 1):
        ngrams.append(word[start:start + n])
    return ngrams

def calculate_chrf(true_word, candidate_word, n=2, beta=1):
    """
    F-beta score over character n-grams of a reference and a candidate word.

    Returns 0 when either word has no n-grams or when they share none.
    """
    true_ngrams = get_ngrams(true_word, n)
    candidate_ngrams = get_ngrams(candidate_word, n)

    # Without n-grams on either side precision or recall is zero
    if not true_ngrams or not candidate_ngrams:
        return 0

    true_counts = Counter(true_ngrams)
    overlap = 0
    for ngram, count in Counter(candidate_ngrams).items():
        # A missing n-gram has count 0 in the reference Counter
        overlap += min(true_counts[ngram], count)

    if overlap == 0:
        return 0

    precision = overlap / len(candidate_ngrams)
    recall = overlap / len(true_ngrams)

    chrf = ((1 + beta**2) * precision * recall) / (recall + beta**2 * precision)
    return chrf

In [None]:
def format_prompt(target_word):
    """
    Build the user prompt asking the model to segment ``target_word``.

    Training words are ranked by ``calculate_chrf`` against the target. The 30
    nearest are used to collect candidate morphemes; the first 5 examples and
    the first 3 morpheme glosses go into the prompt. Reads the module-level
    ``train`` and ``morph_gloss``.

    :param target_word: unsegmented word to be analysed.
    :return: formatted prompt text.
    """
    prompt = """Раздели следующие слово на морфемы: `{0}`

Похожие слова:
{1}

Возможные морфемы:
{2}"""

    # Score every training word against the target
    results = []
    for word in train:
        cand, segm = list(word.items())[0]
        results.append((cand, segm, calculate_chrf(target_word, cand)))

    # Keep the 30 nearest words. dict.fromkeys removes duplicates but keeps
    # the ranking, so the output does not depend on set/hash ordering.
    results.sort(key=lambda x: x[2], reverse=True)
    examples = list(dict.fromkeys(results[:30]))

    glosses = {}  # used as an insertion-ordered set: most similar words first
    final_examples = []
    for word, segm, score in examples:
        for m in segm.split('-'):
            if m in morph_gloss:
                glosses.setdefault(f'{m}={morph_gloss[m]}')
        final_examples.append(f"Слово: {word}, разделение: {segm}")

    # Built once after the loop, so it is defined even when there are no examples
    poss_glosses = '- ' + '\n- '.join(list(glosses)[:3])
    final_examples = '\n'.join(final_examples[:5])
    return prompt.format(target_word, final_examples, poss_glosses)

In [None]:
target_prompt = format_prompt('ӿоӄот')
print(target_prompt)

In [None]:
!pip --quiet install gigachat

In [None]:
from google.colab import userdata
token = userdata.get('Giga_TOKEN')

In [None]:
import uuid

# OAuth endpoint that exchanges the authorization key for an access token
url = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"

payload = 'scope=GIGACHAT_API_PERS'
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Accept': 'application/json',
    # RqUID identifies a single request, so send a fresh UUID4 each time
    'RqUID': str(uuid.uuid4()),
    'Authorization': f'Basic {token}'
}

# NOTE(review): verify=False turns off TLS certificate validation; prefer
# passing the provider's CA bundle via `verify=` if one is available.
response = requests.post(url, headers=headers, data=payload, verify=False)
# Report a failed request by its HTTP status, not by a KeyError below
response.raise_for_status()

access_token = response.json()['access_token']

In [None]:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
url = "https://gigachat.devices.sberbank.ru/api/v1/chat/completions"

In [None]:
def get_giga_answers(target_word, access_token):
    """
    Ask GigaChat to segment ``target_word`` and return the raw HTTP response.

    Uses the module-level ``url`` (chat completions endpoint) and ``prompt``
    (system prompt); the user message comes from ``format_prompt``.

    :param target_word: unsegmented word to send to the model.
    :param access_token: bearer token for the API.
    :return: ``requests.Response``; the caller parses the JSON body.
    """
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'Bearer {access_token}'}

    payload = {
        "model": "GigaChat",
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": format_prompt(target_word)},
        ],
        "profanity_check": True,
        "max_tokens": 15}

    # `json=` makes requests serialise the body, so the `json` module
    # (imported only much later in this notebook) is not needed here.
    # NOTE(review): verify=False turns off TLS certificate validation.
    response = requests.post(url, headers=headers, json=payload, verify=False)
    return response

In [None]:
import tqdm
import time

In [None]:
def generate_access_token(token):
    """
    Request a new GigaChat access token.

    :param token: authorization key sent as ``Authorization: Basic <token>``.
    :return: value of the ``access_token`` field of the JSON response.
    :raises requests.HTTPError: if the endpoint answers with an error status.
    """
    import uuid  # local import keeps this cell runnable on its own

    url = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
    payload = 'scope=GIGACHAT_API_PERS'

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json',
        # RqUID identifies a single request, so send a fresh UUID4 each time
        'RqUID': str(uuid.uuid4()),
        'Authorization': f'Basic {token}'
    }

    print("\nГенерация нового токена...\n")
    # NOTE(review): verify=False turns off TLS certificate validation
    response = requests.post(url, headers=headers, data=payload, verify=False)
    # Report a failed request by its HTTP status, not by a KeyError below
    response.raise_for_status()
    return response.json()['access_token']

In [None]:
len(test)

In [None]:
all_preds = []

In [None]:
access_token = generate_access_token(token)
last_token_time = time.time()
token_lifetime = 25 * 60
final_data = {}
for word in tqdm.tqdm(test):
    # Re-issue the access token once it is older than `token_lifetime` seconds
    if not (time.time() - last_token_time < token_lifetime):
        access_token = generate_access_token(token)
        last_token_time = time.time()
        print('\nНовый токен\n')

    # Each test item is a one-entry dict {word: gold segmentation}
    word, true_label = next(iter(word.items()))
    response = get_giga_answers(word, access_token)
    pred = response.json()['choices'][0]['message']['content']
    final_data = {'word': word, 'true_label': true_label, 'pred_label': pred}
    all_preds.append(final_data)

In [None]:
import json  # not imported by any earlier cell

# Save the raw predictions; `with` closes the file even if serialisation fails
with open('preds_2_prompt.json', 'w', encoding='utf8') as f:
    json.dump(all_preds, f)

In [None]:
len(all_preds)

In [None]:
preds = pd.DataFrame(all_preds)

In [None]:
preds

In [None]:
to_drop = list(preds[preds['word'].str.contains('(?<![ˇ’ʻ‘ʼ\'р̌’‘ӻӿӃӾЧА-яёЁӽӈғӄӷ])[ӷр̌ӻӿа-яёӽӈғӄ](?![р̌’ʻ‘\'ʼӻғӿА-яЁёӽӈа-яˇӄӷ])')].index)

In [None]:
to_drop.extend(list(preds[preds['word'].str.contains('"[ˇ’ʻ‘ʼ\'р̌’‘ӻӿӃӾЧА-яёЁӽӈғӄӷ]{2}"')].index))

In [None]:
preds = preds.drop(to_drop)

In [None]:
preds

In [None]:
preds[preds['true_label'] == preds['pred_label']]

In [None]:
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(list(preds['true_label']), list(preds['pred_label']))

In [None]:
print(f'Word accuracy: {accuracy*100:.2f}%')

In [None]:
true_data = [x.split('-') for x in list(preds['true_label'])]
true_data[0]

In [None]:
predicted_data = [x.split('-') for x in list(preds['pred_label'])]
predicted_data[0]

In [None]:
def calculate_overall_accuracy(true_data, predicted_data):
    """
    Position-wise morpheme accuracy over a collection of words.

    A predicted morpheme is correct when it equals the gold morpheme at the
    same position. The denominator is the total number of gold morphemes.

    :param true_data: list of gold morpheme lists.
    :param predicted_data: list of predicted morpheme lists, aligned with ``true_data``.
    :return: share of correct morphemes, or 0.0 when there are no gold morphemes.
    """
    hits = 0
    expected = 0

    for gold, guess in zip(true_data, predicted_data):
        expected += len(gold)
        for gold_morph, guess_morph in zip(gold, guess):
            if gold_morph == guess_morph:
                hits += 1

    if expected > 0:
        return hits / expected
    return 0.0


overall_accuracy = calculate_overall_accuracy(true_data, predicted_data)
print(f"Overall Morpheme Accuracy: {overall_accuracy:.2%}")

In [None]:
from collections import Counter

def compute_chrf_morpheme_as_ngram(true_segments, predicted_segments, beta=2):
    """
    F-beta score for one segmented word, treating each morpheme as a unit.

    :param true_segments: gold morphemes of the word.
    :param predicted_segments: predicted morphemes of the word.
    :param beta: weight of recall relative to precision.
    :return: the F-beta score, 0.0 when no morpheme matches.
    """
    reference = Counter(true_segments)
    hypothesis = Counter(predicted_segments)

    # Multiset intersection gives the number of matching morphemes
    matches = sum((reference & hypothesis).values())

    ref_total = sum(reference.values())
    hyp_total = sum(hypothesis.values())

    precision = matches / hyp_total if hyp_total > 0 else 0
    recall = matches / ref_total if ref_total > 0 else 0

    if precision + recall == 0:
        return 0.0
    return (1 + beta**2) * (precision * recall) / ((beta**2 * precision) + recall)

In [None]:
def compute_overall_chrf_morpheme_as_ngram(true_data, predicted_data, beta=2):
    """
    Mean of ``compute_chrf_morpheme_as_ngram`` over aligned gold/predicted words.

    :return: the average score, or 0.0 when there are no word pairs.
    """
    total = 0
    pairs = 0
    for gold, guess in zip(true_data, predicted_data):
        total += compute_chrf_morpheme_as_ngram(gold, guess, beta)
        pairs += 1
    return total / pairs if pairs else 0.0


overall_chrf = compute_overall_chrf_morpheme_as_ngram(true_data, predicted_data)

In [None]:
print(f'Word accuracy: {accuracy:.2%}')
print(f"Morpheme Accuracy: {overall_accuracy:.2%}")
print(f"Overall chrF++ Score for Morpheme-as-ngram Data: {overall_chrf:.2%}")

In [None]:
# Row index -> [word, segmentation] for every one-entry dict in `train`
train_set = {
    idx: list(next(iter(key_dict.items())))
    for idx, key_dict in enumerate(train)
}
# The dict is column-oriented, so transpose to get one row per word
df = pd.DataFrame(train_set, index=['word', 'segmentation']).transpose()
df.to_csv('train.csv')

### Попытка исправить

In [None]:
new_test = [{key.word: key.true_label} for key in preds[preds['true_label'] != preds['pred_label']].itertuples()]

In [None]:
print(list(preds[preds['true_label'] != preds['pred_label']].index))

In [None]:
access_token = generate_access_token(token)
last_token_time = time.time()
token_lifetime = 25 * 60
final_data = {}
for word in tqdm.tqdm(new_test):
    # Re-issue the access token once it is older than `token_lifetime` seconds
    if not (time.time() - last_token_time < token_lifetime):
        access_token = generate_access_token(token)
        last_token_time = time.time()
        print('Новый токен')

    # Each item is a one-entry dict {word: gold segmentation}
    word, true_label = next(iter(word.items()))
    response = get_giga_answers(word, access_token)
    pred = response.json()['choices'][0]['message']['content']
    final_data = {'word': word, 'true_label': true_label, 'pred_label': pred}
    all_preds.append(final_data)

In [None]:
# The first pass appended one prediction per item of `test`; everything after
# that position comes from the retry loop. Using len(test) instead of a
# hard-coded count keeps the cell correct for other split sizes.
# NOTE(review): assumes the first inference loop ran to completion exactly once.
first_pass_count = len(test)
new_preds = all_preds[first_pass_count:]
all_preds = all_preds[:first_pass_count]

In [None]:
len(all_preds)

In [None]:
print(list(preds[preds['true_label'] != preds['pred_label']].index))

In [None]:
# Index labels of the mismatched rows in `preds`; `preds` was built from
# `all_preds` with a default index, so they are positions in that list
retry_indices = list(preds[preds['true_label'] != preds['pred_label']].index)
# enumerate replaces a manual counter named `iter`, which hid the builtin
for position, idx in enumerate(retry_indices):
    all_preds[idx] = new_preds[position]

In [None]:
new_preds = pd.DataFrame(all_preds)
new_preds

In [None]:
to_drop = new_preds[new_preds['word'].str.contains('"[ˇ’ʻ‘ʼ\'р̌’‘ӻӿӃӾЧА-яёЁӽӈғӄӷ]{2}"')].index

In [None]:
new_preds = new_preds.drop(to_drop)

In [None]:
new_preds['true_label'] = new_preds['true_label'].str.replace('"', '')
new_preds['word'] = new_preds['word'].str.replace('"', '')
new_preds['true_label'] = new_preds['true_label'].str.replace('»', '')
new_preds['pred_label'] = new_preds['pred_label'].str.replace('=', '-')
new_preds['word'] = new_preds['word'].str.replace('»', '')

new_preds

In [None]:
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(list(new_preds['true_label']), list(new_preds['pred_label']))
accuracy

In [None]:
new_preds.to_csv('new_preds.csv')

### Анализ ошибок модели (?)

In [None]:
# Rows still wrong after the retry pass. Mask and frame must be the same
# DataFrame: `preds` and `new_preds` had different rows dropped.
new_preds[new_preds['pred_label'] != new_preds['true_label']]

In [None]:
train_set = [list(key)[0] for key in train]

In [None]:
errors = {'segm_error': 0,
          'deleted_symb': 0,
          'added_symb': 0,
          'difficult_words': 0,
          'preprocess_error': 0,
          'other': 0}

# '[A-z]' would also match the punctuation between 'Z' and 'a' ([ \ ] ^ _ `)
latin_re = re.compile('[A-Za-z]+')
# Set membership is O(1); `train_set` itself may be a long list
train_vocab = set(train_set)

for word in new_preds[new_preds['pred_label'] != new_preds['true_label']].itertuples():
    real_word = word.word
    pred_word = word.pred_label.replace('-', '')

    # Latin letters in the source word: counted as a preprocessing error
    if latin_re.search(real_word):
        errors['preprocess_error'] += 1
        continue

    if real_word == pred_word:
        # Same characters, so only the hyphen placement is wrong
        errors['segm_error'] += 1
    elif len(real_word) > len(pred_word):
        # The model dropped characters
        errors['deleted_symb'] += 1
    elif len(real_word) < len(pred_word):
        # The model inserted characters
        errors['added_symb'] += 1
    elif real_word not in train_vocab:
        # Same length, different characters, word not in the training set
        errors['difficult_words'] += 1
    else:
        # Same length, different characters: a character was substituted
        errors['other'] += 1

In [None]:
[float(f'{x / sum(errors.values()):.2f}') for x in errors.values()]

In [None]:
import matplotlib.pyplot as plt

# Data for the pie chart
# NOTE(review): labels are matched to `errors` by position; the 4th key of
# `errors` is 'difficult_words' — confirm that 'Разметка' is the intended label.
labels = ['Сегментация', 'Удаление', 'Вставка', 'Разметка', 'Предобработка', 'Прочее']
sizes = [float(f'{x / sum(errors.values()):.2f}') for x in errors.values()]  # share of each error type, rounded to 2 decimals
colors = ['#607196', '#CCD7C5', '#C7A27C', '#EFD2CB', '#c2c2f0', '#EE9480', '#285943']  # palette (7 colours listed for 6 values)
explode = (0.1, 0, 0, 0, 0, 0)  # offset the first segment

# Create the figure
plt.figure(figsize=(8, 8))  # figure size
plt.style.use('ggplot')

# Draw the pie chart
plt.pie(
    sizes,
    explode=explode, # pull out the first segment
    colors=colors,    # segment colours
    autopct='%1.1f%%',  # print percentages
    shadow=False,      # no shadow
    startangle=90,    # starting rotation angle
    textprops={'fontsize': 9}, # font size of the text
    pctdistance=1.09,
    wedgeprops=dict(width=0.3)
)

# Keep the chart circular
plt.axis('equal')

# Title and legend
plt.title('Типы ошибок в процентах', fontsize=16, fontweight='bold')
plt.legend(labels, title="Категории", loc="best")

# Show the chart
plt.show()

## Глоссирование как задача классификации

In [None]:
import re
import tqdm
import time
import requests
import pandas as pd
import numpy as np
import random
from collections import Counter

In [None]:
!curl -L -o final_glosses.csv 'https://docs.google.com/spreadsheets/d/19045IoPzWSiTvmZC3zQIS1vqRYpwazQPCKTEov2dCcU/export?exportFormat=csv'

In [None]:
final_glosses = pd.read_csv('final_glosses.csv').drop(['Category'], axis=1)

In [None]:
!curl -L -o data.json "https://drive.google.com/uc?export=download&id=1UK3M9yhRbG59dxKUpo67DROx5KK84CVe"

In [None]:
!curl -L -o stem.json "https://drive.google.com/uc?export=download&id=1EBpjmHoy0Tj6kC1eWaQSV7LvjSrp0kc0"

In [None]:
import json

# Load the stem dictionary; `with` closes the file even if parsing fails
with open('/content/stem.json', 'r', encoding='utf8') as f:
    stems = json.load(f)

In [None]:
# Map every morpheme variant to the list of glosses it can express
morph_gloss = {}
for key in final_glosses.itertuples():
    # One row may list several comma-separated variants of the morpheme
    morphemes = key.Morph.split(', ')
    for morph in morphemes:
        morph_gloss.setdefault(morph, []).append(key.Gloss)

In [None]:
import json

# Load the glossed corpus; `with` closes the file even if parsing fails
with open('/content/data.json', 'r', encoding='utf8') as f:
    data = json.load(f)

In [None]:
gold_segmented = [x['segmented'] for x in data]
gold_labels = [x['glossed'] for x in data]
gold_translation = [x['translation'] for x in data]

In [None]:
gold_segmented[7]

In [None]:
gold_labels[7]

In [None]:
gold_translation[7]

In [None]:
dataset = [{'segm': x, 'label': y, 'translation': z} for x, y, z in zip(gold_segmented, gold_labels, gold_translation)]

In [None]:
random.seed(72)

random.shuffle(dataset)

In [None]:
len(dataset)

In [None]:
train, test = dataset[:200], dataset[200:]

In [None]:
print(train[25]['label'].replace('\t', ' '))

In [None]:
system_prompt = """Ты – выдающийся лингвист, специализирующийся в морфологии нивхского языка.

#### Задача
Определить соответствие уже выделенных морфем их грамматическим значениям.

#### Инструкция
1. Каждой части словоформы соотнеси русскую лемму или сооответствующую глоссу из предоставленного списка.
2. Для определения глоссы леммы используй перевод.
3. Сохраняй неизменными все символы в словах.

#### Формат ответа
Ответ должен содержать строку, с таким же количеством слов, как и в поданном на разбор предложении.

#### Пример работы
----------------
Определи глоссы в следующем предложении:
Иф сидь лаға ғар в ытык ғе, в ымык ғе сык ны т’а дь.

Перевод:
Что он ни требовал, мать с отцом всё делали.

Список возможных глосс:
- 'ғар': RES, IND:EMPH, CONV:SUBJ
- 'в': REC
- 'ғе': COM
- 'т’а': USIT.1.PL, USIT.1.SG, USIT.2.PL, USIT.3.PL, PROH, COORD.1.PL
- 'дь': NMN:P, IND
----------------
Твой ответ: Иф=он сидь=что лаға=требовать ғар=CONV:SUBJ в=REC ытык=отец ғе=COM в=REC ымык=мать ғе=COM сык=весь ны=делать т’а=USIT.3.PL дь=IND"""

In [None]:
def generate_access_token(token):
    """
    Request a new GigaChat access token.

    :param token: authorization key sent as ``Authorization: Basic <token>``.
    :return: value of the ``access_token`` field of the JSON response.
    :raises requests.HTTPError: if the endpoint answers with an error status.
    """
    import uuid  # local import keeps this cell runnable on its own

    url = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
    payload = 'scope=GIGACHAT_API_PERS'

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json',
        # RqUID identifies a single request, so send a fresh UUID4 each time
        'RqUID': str(uuid.uuid4()),
        'Authorization': f'Basic {token}'
    }

    print("\nГенерация нового токена...\n")
    # NOTE(review): verify=False turns off TLS certificate validation
    response = requests.post(url, headers=headers, data=payload, verify=False)
    # Report a failed request by its HTTP status, not by a KeyError below
    response.raise_for_status()
    return response.json()['access_token']

In [None]:
train[25]

In [None]:
def process_sentence(dataset):
    """
    Convert raw corpus records into a structured per-token representation.

    Each input record has tab-separated ``'segm'`` (hyphen-segmented words) and
    ``'label'`` (hyphen-separated glosses) strings plus a ``'translation'``.

    :param dataset: iterable of such records.
    :return: list of dicts with ``text``, ``label_tokens``, ``translation`` and
        ``tokens`` (one dict per word: ``token``, ``morphemes``, ``labels``).
    """
    def _structure(entry):
        # Words and their gloss strings are aligned by position
        words = entry['segm'].split('\t')
        glosses = entry['label'].split('\t')

        token_records = []
        for word, gloss in zip(words, glosses):
            token_records.append({
                "token": word,
                "morphemes": word.split('-'),
                "labels": gloss.split('-'),
            })

        return {
            "text": " ".join(words),
            "label_tokens": " ".join(glosses),
            "translation": entry['translation'],
            "tokens": token_records,
        }

    return [_structure(entry) for entry in dataset]

In [None]:
train_data = process_sentence(train)
test_data = process_sentence(test)

In [None]:
def weighted_retrieval(target_sentence, corpus, token_weight=0.5, morpheme_weight=0.8, top_n=3):
    """
    Return the ``top_n`` corpus sentences most similar to ``target_sentence``.

    Similarity is a weighted sum of shared whole tokens and shared morphemes,
    both counted as multisets. Ties keep their corpus order.

    :param target_sentence: structured sentence (dict with a ``"tokens"`` list).
    :param corpus: list of structured sentences to search.
    :param token_weight: weight of each shared token.
    :param morpheme_weight: weight of each shared morpheme.
    :param top_n: number of sentences to return.
    """
    def _tokens_and_morphemes(sentence):
        # Flatten a sentence into its token strings and all of its morphemes
        tokens = [item["token"] for item in sentence["tokens"]]
        morphemes = []
        for item in sentence["tokens"]:
            morphemes.extend(item["morphemes"])
        return tokens, morphemes

    def _shared(left, right):
        # Size of the multiset intersection
        return sum((Counter(left) & Counter(right)).values())

    target_tokens, target_morphemes = _tokens_and_morphemes(target_sentence)

    ranked = []
    for sentence in corpus:
        sentence_tokens, sentence_morphemes = _tokens_and_morphemes(sentence)
        total_score = (
            _shared(target_tokens, sentence_tokens) * token_weight +
            _shared(target_morphemes, sentence_morphemes) * morpheme_weight
        )
        ranked.append((total_score, sentence))

    # Stable sort: equal scores keep their corpus order
    ranked = sorted(ranked, key=lambda pair: pair[0], reverse=True)
    return [sentence for _, sentence in ranked[:top_n]]

In [None]:
test_data[25]['text']

In [None]:
print('\n\n'.join(['\n'.join([x['text'], x['label_tokens']]) for x in weighted_retrieval(test_data[25], train_data, top_n=2)]) )

In [None]:
for x in weighted_retrieval(test_data[25], train_data, top_n=2):
    print(x['text'], end='\n')
    print(x['label_tokens'], end='\n\n')

In [None]:
test_data[3]

In [None]:
def make_label(sent):
    """
    Build the ``morpheme=gloss`` reference string for a structured sentence.

    Dots and commas are removed from ``sent['text']``; hyphens in both the text
    and ``sent['label_tokens']`` become spaces, and the resulting morphemes and
    glosses are paired by position.

    :param sent: dict with ``'text'`` and ``'label_tokens'`` strings.
    :return: space-separated ``morpheme=gloss`` pairs.
    """
    # Raw-string pattern: '\.' and '\,' in a plain literal are invalid escapes
    text = re.sub(r'[.,]+', '', sent['text']).replace('-', ' ').split()
    label_tokens = sent['label_tokens'].replace('-', ' ').split()
    final_label = ' '.join([f'{x}={y}' for x, y in zip(text, label_tokens)])
    return final_label

In [None]:
make_label(test_data[3])

In [None]:
"""
Необходимая для ретрива словарных данных функция
"""

def extract_stems(tokens):
    """
    Return the distinct stems of ``tokens`` in order of first occurrence.

    The first morpheme of a token is taken as its stem; tokens with an empty
    morpheme list are skipped.

    :param tokens: list of dicts with a ``"morphemes"`` list.
    :return: list of unique stems.
    """
    # A dict is used as an insertion-ordered set, so the result (and any prompt
    # built from it) does not depend on string hash ordering
    seen = {}
    for token in tokens:
        if token["morphemes"]:
            seen.setdefault(token["morphemes"][0])
    return list(seen)


def retrieve_from_stem_dictionary(target_sentence, stem_dictionary):
    """
    Look up the stems of ``target_sentence`` in ``stem_dictionary``.

    :param target_sentence: structured sentence (dict with a ``"tokens"`` list).
    :param stem_dictionary: mapping stem -> entry whose ``'ru'`` value is a list
        of translations.
    :return: list of ``"stem: translation; translation"`` strings for the stems
        found in the dictionary.
    """
    return [
        f"{stem}: {'; '.join(stem_dictionary[stem]['ru'])}"
        for stem in extract_stems(target_sentence["tokens"])
        if stem in stem_dictionary
    ]

In [None]:
def format_segm_prompt(sent, random_sample=False):
    """
    Build the user prompt asking the model to gloss the sentence ``sent``.

    The prompt holds the segmented sentence, its translation, two glossed
    examples from ``train_data``, the candidate glosses of the sentence's own
    morphemes (from ``morph_gloss``) and dictionary translations of its stems
    (from ``stems``).

    :param sent: structured sentence to gloss.
    :param random_sample: if truthy, take two random examples instead of the
        two most similar ones.
    :return: formatted prompt text.
    """
    prompt = """Определи глоссы в следующем предложении:
{0}

Перевод:
{1}

Похожие примеры:
{2}

Список возможных глосс:
{3}

Русские основы слов:
{4}
"""
    segm_sent = sent['text'].replace('-', ' ')
    translation = sent['translation']

    if random_sample:
        top_sent = random.sample(train_data, 2)
    else:
        top_sent = weighted_retrieval(sent, train_data, top_n=2)

    # The loop variable must not be called `sent`: the target sentence is
    # still needed for the gloss list and the stem lookup below
    format_top = '\n\n'.join(
        f"{example['text']}\n{make_label(example)}" for example in top_sent
    )

    # Candidate glosses for the morphemes of this sentence (not of a fixed
    # test item); each morpheme is listed once, in order of appearance
    gloss_lines = {}
    for token in sent['tokens']:
        for m in token['morphemes']:
            if m in morph_gloss:
                gloss_lines.setdefault(f"{m}: {', '.join(morph_gloss[m])}")
    array = '\n'.join(gloss_lines)

    rus_stem = '\n'.join(retrieve_from_stem_dictionary(sent, stems))
    return prompt.format(segm_sent, translation, format_top, array, rus_stem)

In [None]:
print(format_segm_prompt(test_data[25], random_sample=True))

In [None]:
url = "https://gigachat.devices.sberbank.ru/api/v1/chat/completions"

In [None]:
def get_giga_answers_segmented(sentence, access_token, random_sample=False):
    """
    Ask GigaChat to gloss ``sentence`` and return the raw HTTP response.

    Uses the module-level ``url`` and ``system_prompt``; the user message is
    built by ``format_segm_prompt``.

    :param sentence: structured sentence to gloss.
    :param access_token: bearer token for the API.
    :param random_sample: forwarded to ``format_segm_prompt``.
    :return: ``requests.Response``; the caller parses the JSON body.
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }

    # A falsy flag selects the same branch as the default in format_segm_prompt
    user_message = {'role': 'user',
                    'content': format_segm_prompt(sentence, random_sample)}

    body = {
        "model": "GigaChat",
        "messages": [{"role": "system", "content": system_prompt}, user_message],
        "profanity_check": True,
        "max_tokens": 200,
    }

    return requests.post(url, headers=request_headers, data=json.dumps(body), verify=False)

In [None]:
from google.colab import userdata
token = userdata.get('Giga_TOKEN')

In [None]:
all_preds = []

In [None]:
access_token = generate_access_token(token)
last_token_time = time.time()
token_lifetime = 25 * 60
final_data = {}

for sent in tqdm.tqdm(test_data):
    # Re-issue the access token once it is older than `token_lifetime` seconds
    if not (time.time() - last_token_time < token_lifetime):
        access_token = generate_access_token(token)
        last_token_time = time.time()
        print('\nНовый токен\n')

    segment, true_label = sent['text'], sent['label_tokens']

    # Examples in the prompt are retrieved by similarity
    response = get_giga_answers_segmented(sent, access_token)
    pred = response.json()['choices'][0]['message']['content']
    final_data = {'sent': segment, 'true_label': make_label(sent), 'pred_label': pred}
    all_preds.append(final_data)

In [None]:
random_sample_preds = []

In [None]:
access_token = generate_access_token(token)
last_token_time = time.time()
token_lifetime = 25 * 60
final_data = {}

for sent in tqdm.tqdm(test_data):
    # Re-issue the access token once it is older than `token_lifetime` seconds
    if not (time.time() - last_token_time < token_lifetime):
        access_token = generate_access_token(token)
        last_token_time = time.time()
        print('\nНовый токен\n')

    segment, true_label = sent['text'], sent['label_tokens']

    # Baseline: examples in the prompt are sampled at random
    response = get_giga_answers_segmented(sent, access_token, random_sample=True)
    pred = response.json()['choices'][0]['message']['content']
    final_data = {'sent': segment, 'true_label': make_label(sent), 'pred_label': pred}
    random_sample_preds.append(final_data)

In [None]:
all_preds[100]

In [None]:
!pip install sacrebleu

In [None]:
import sacrebleu

In [None]:
from sacrebleu.metrics import CHRF

In [None]:
def compute_chrf(true_label, pred_label):
    """
    Compute sacrebleu's chrF for one prediction against one reference.

    :param true_label: reference string (str).
    :param pred_label: predicted string (str).
    :return: chrF score (float).
    """
    hypotheses = [pred_label]
    references = [[true_label]]
    return CHRF().corpus_score(hypotheses, references).score

In [None]:
compute_chrf(all_preds[100]['true_label'], all_preds[100]['pred_label'])

In [None]:
# Average sentence-level chrF over the retrieval-based predictions
final_metric = sum(
    compute_chrf(pred['true_label'], pred['pred_label']) for pred in all_preds
)

print(f'ChrF metric for segmentation: {final_metric/len(all_preds):.2f}')

In [None]:
all_preds[317]

In [None]:
random_sample_preds[317]

In [None]:
compute_chrf(all_preds[317]['true_label'], all_preds[317]['pred_label'])

In [None]:
compute_chrf(random_sample_preds[317]['true_label'], random_sample_preds[317]['pred_label'])

In [None]:
# Average sentence-level chrF over the random-example baseline predictions
final_metric = 0
for pred in random_sample_preds:
    final_metric += compute_chrf(pred['true_label'], pred['pred_label'])

# Divide by the number of baseline predictions that were summed above
print(f'ChrF metric for segmentation: {final_metric/len(random_sample_preds):.2f}')

In [None]:
from collections import Counter

def compute_morpheme_metrics(true_labels, pred_labels):
    """
    Micro-averaged precision, recall and F1 over the units of label strings.

    Every label is split on whitespace and then on ``'-'``; the resulting
    units of gold and predicted labels are compared as multisets.

    :param true_labels: list of gold label strings.
    :param pred_labels: list of predicted label strings, aligned with the gold ones.
    :return: ``(precision, recall, f_score)``, each rounded to 3 decimals.
    """
    def _units(label):
        # Whitespace separates tokens, '-' separates units inside a token
        return [piece for token in label.split() for piece in token.split('-')]

    matched = 0
    gold_total = 0
    pred_total = 0

    for gold_label, pred_label in zip(true_labels, pred_labels):
        gold_units = _units(gold_label)
        pred_units = _units(pred_label)

        matched += sum((Counter(gold_units) & Counter(pred_units)).values())
        gold_total += len(gold_units)
        pred_total += len(pred_units)

    precision = matched / pred_total if pred_total > 0 else 0
    recall = matched / gold_total if gold_total > 0 else 0

    denominator = precision + recall
    f_score = 2 * precision * recall / denominator if denominator > 0 else 0

    return round(precision, 3), round(recall, 3), round(f_score, 3)

In [None]:
true_all_preds = [x['true_label'] for x in all_preds]
pred_all_preds = [x['pred_label'] for x in all_preds]
compute_morpheme_metrics(true_all_preds, pred_all_preds)

In [None]:
true_labels = [x['true_label'] for x in random_sample_preds]
pred_labels = [x['pred_label'] for x in random_sample_preds]
compute_morpheme_metrics(true_labels, pred_labels)