# Transform from xml to python format

In [1]:
import xml.etree.ElementTree as ET
from collections import defaultdict

import numpy as np
import pandas as pd
from tqdm import tqdm

In [2]:
from natasha import (
    Segmenter,
    MorphVocab,
    
    NewsEmbedding,
    NewsMorphTagger,
    NewsSyntaxParser,

    Doc
)


# Shared Natasha pipeline objects, created once and reused by every cell below.
segmenter = Segmenter()
morph_vocab = MorphVocab()

# A single embedding instance is passed to both the morphology tagger and the syntax parser.
emb = NewsEmbedding()
morph_tagger = NewsMorphTagger(emb)
syntax_parser = NewsSyntaxParser(emb)

In [3]:
# Functions for extracting the markup from the SentiRuEval XML files

def parse_texts(path):
    """Read the review texts from an XML file.

    Args:
        path: file name or file object accepted by ``ET.parse``; the root
            element is expected to contain ``review`` children, each with an
            ``id`` attribute and a ``text`` child element.

    Returns:
        dict mapping each review's ``id`` attribute to the text of its
        ``text`` element.
    """
    reviews = ET.parse(path).getroot().findall('review')
    return {review.get('id'): review.find('text').text for review in reviews}

def parse_aspects(path):
    """Read the aspect annotations of every review from an XML file.

    Args:
        path: file name or file object accepted by ``ET.parse``.

    Returns:
        dict keyed by the review id (as a string). Each value is a dict that
        maps the aspect's position within the review (0, 1, ...) to a dict of
        its attributes ``type``, ``to``, ``term``, ``sentiment``, ``mark``,
        ``from`` and ``category``. Attribute values are the raw strings from
        the XML, or ``None`` when the attribute is absent.
    """
    attribute_names = ('type', 'to', 'term', 'sentiment', 'mark', 'from', 'category')
    aspects_by_review = {}
    for review in ET.parse(path).getroot().findall('review'):
        review_key = str(review.get('id'))
        aspects_by_review[review_key] = {
            position: {name: node.get(name) for name in attribute_names}
            for position, node in enumerate(review.findall('aspects/aspect'))
        }
    return aspects_by_review

def mark_text(path, aspect_type='explicit', binary=True):
    """Label every token of every review by whether it belongs to an aspect term.

    Only aspects whose ``type`` equals ``aspect_type`` and whose ``mark`` is
    ``'Rel'`` are considered. For each token:

    * 0 - the token matches no considered aspect;
    * 1 - the token starts exactly at an aspect's ``from`` offset;
    * 1 (``binary=True``) or 2 (``binary=False``) - the token starts after
      ``from`` and ends at or before ``to``.

    When several aspects match one token, the aspect that comes last in the
    review decides the label.

    Args:
        path: XML file passed to ``parse_aspects`` and ``parse_texts``.
        aspect_type: value of the aspect ``type`` attribute to keep.
        binary: if True, first and following tokens of a term share label 1.

    Returns:
        ``(markup, texts)`` where ``markup`` maps review id to the list of
        per-token labels and ``texts`` maps review id to the review text.

    Raises:
        TypeError / ValueError: if a considered aspect has a missing or
            non-numeric ``from``/``to`` attribute.
    """
    global_aspects = parse_aspects(path)
    texts = parse_texts(path)

    # Label given to tokens that continue (rather than start) an aspect term.
    inside_label = 1 if binary else 2

    markup = dict()
    for id_, aspects in global_aspects.items():
        # Convert the character offsets once per review instead of once per token.
        spans = [
            (int(aspect['from']), int(aspect['to']))
            for aspect in aspects.values()
            if aspect['type'] == aspect_type and aspect['mark'] == 'Rel'
        ]

        labels = []
        # NOTE(review): `tokenize` is neither defined nor imported in the visible
        # part of the notebook; it must yield objects with `.start`/`.stop`
        # character offsets - confirm where it is supposed to come from.
        for token in tokenize(texts[id_]):
            label = 0
            for span_start, span_stop in spans:
                if token.start == span_start:
                    label = 1
                elif token.start > span_start and token.stop <= span_stop:
                    label = inside_label
            labels.append(label)
        markup[id_] = labels

    return markup, texts

def markup_text_senti(path):
    """Collect, for every review, its text and the sentiment/category spans.

    Args:
        path: XML file passed to ``parse_aspects`` and ``parse_texts``.

    Returns:
        ``defaultdict(dict)`` mapping review id to
        ``{'text': <review text>, 'aspects': <list of spans>}``. Every aspect
        contributes two ``(from, to, label)`` tuples in a row: the first
        labelled with the aspect's sentiment, the second with its category.

    Raises:
        TypeError / ValueError: if an aspect has a missing or non-numeric
            ``from``/``to`` attribute.
    """
    global_aspects = parse_aspects(path)
    texts = parse_texts(path)
    markup = defaultdict(dict)
    for id_, aspects in global_aspects.items():
        spans = []
        for entity in aspects.values():
            start, stop = int(entity['from']), int(entity['to'])
            # parse_aspects stores the polarity under the 'sentiment' key; it
            # never creates a 'polarity' key, so reading that raised KeyError.
            spans.append((start, stop, entity['sentiment']))
            spans.append((start, stop, entity['category']))
        markup[id_]['text'] = texts[id_]
        markup[id_]['aspects'] = spans
    return markup

In [4]:
# Load the raw review texts (review id -> text) of the train and test splits.
senti_texts_train = parse_texts('data/SentiRuEval_car_markup_train.xml')
senti_texts_test = parse_texts('data/SentiRuEval_car_markup_test.xml')

In [5]:
# Merge both splits into one lookup; if an id occurs in both, the test text wins.
senti_texts = {**senti_texts_train,**senti_texts_test}

In [6]:
# Load the aspect annotations (review id -> {position -> attributes}) of both splits.
# Note: this parses the same two XML files a second time.
senti_aspects_train = parse_aspects('data/SentiRuEval_car_markup_train.xml')
senti_aspects_test = parse_aspects('data/SentiRuEval_car_markup_test.xml')
# Merged view; if an id occurs in both splits, the test annotations win.
senti_aspects = {**senti_aspects_train,**senti_aspects_test}

In [7]:
# For every train review keep only the explicit aspects, each as a
# (from, to, term, sentiment, category) tuple with integer character offsets.
spans_senti_train = {
    review_id: [
        (int(asp['from']), int(asp['to']), asp['term'], asp['sentiment'], asp['category'])
        for asp in review_aspects.values()
        if asp['type'] == 'explicit'
    ]
    for review_id, review_aspects in senti_aspects_train.items()
}

In [8]:
# For every test review keep only the explicit aspects, each as a
# (from, to, term, sentiment, category) tuple with integer character offsets.
spans_senti_test = {
    review_id: [
        (int(asp['from']), int(asp['to']), asp['term'], asp['sentiment'], asp['category'])
        for asp in review_aspects.values()
        if asp['type'] == 'explicit'
    ]
    for review_id, review_aspects in senti_aspects_test.items()
}

In [9]:
# Containers for the per-review records (review id -> {'text', 'spans'});
# they are filled by the next cell.
senti_train_data = {}
senti_test_data = {}

In [10]:
# Combine each review's text with its explicit-aspect spans into one record.
senti_train_data.update(
    (review_id, {'text': review_text, 'spans': spans_senti_train[review_id]})
    for review_id, review_text in senti_texts_train.items()
)
senti_test_data.update(
    (review_id, {'text': review_text, 'spans': spans_senti_test[review_id]})
    for review_id, review_text in senti_texts_test.items()
)

In [50]:
# Inspect one raw train review.
senti_texts_train['92845']

'Недавно купил этот автомобиль. Авто отличное! Двигатель 2,5 литра, турбодизель. Прежний хозяин сказал при продаже, что масло не жрет, солярку тоже, летит как угорелая! Так оно и есть. 140 км/ч нормальная крейсерская скорость. Вообще немцы умеют делать автомобили. Дорогу держит отлично, так как достаточно широкая машина. Тормоза все дисковые. Главное передний привод, по сравнению с другими немецкими автомобилями. Такими как мерседес и бмв этого же класса. Места в автомобиле очень много. Не тесно даже, когда сидят пять взрослых человек. Багажное отделение тоже огромно. Влезла стиральная машина. По соотношению цена - качество, отличный автомобиль. Больше никогда не сяду за руль российского автомбиля! Всем рекомендую Ауди. '

In [51]:
# Run the Natasha pipeline on the sample review so its tokens can be inspected.
doc = Doc(senti_texts_train['92845'])
doc.segment(segmenter)            # populates doc.tokens
doc.parse_syntax(syntax_parser)   # presumably fills id / head_id / rel on the tokens
doc.tag_morph(morph_tagger)       # presumably fills pos / feats on the tokens

In [52]:
# Show the annotated tokens of the sample review.
doc.tokens

[DocToken(stop=7, text='Недавно', id='1_1', head_id='1_2', rel='advmod', pos='ADV', feats=<Pos>),
 DocToken(start=8, stop=13, text='купил', id='1_2', head_id='1_0', rel='root', pos='VERB', feats=<Perf,Masc,Ind,Sing,Past,Fin,Act>),
 DocToken(start=14, stop=18, text='этот', id='1_3', head_id='1_4', rel='det', pos='DET', feats=<Acc,Masc,Sing>),
 DocToken(start=19, stop=29, text='автомобиль', id='1_4', head_id='1_2', rel='obj', pos='NOUN', feats=<Inan,Acc,Masc,Sing>),
 DocToken(start=29, stop=30, text='.', id='1_5', head_id='1_2', rel='punct', pos='PUNCT'),
 DocToken(start=31, stop=35, text='Авто', id='2_1', head_id='2_2', rel='nsubj', pos='PROPN', feats=<Inan,Gen,Neut,Sing>),
 DocToken(start=36, stop=44, text='отличное', id='2_2', head_id='2_0', rel='root', pos='ADJ', feats=<Nom,Pos,Neut,Sing>),
 DocToken(start=44, stop=45, text='!', id='2_3', head_id='2_2', rel='punct', pos='PUNCT'),
 DocToken(start=46, stop=55, text='Двигатель', id='3_1', head_id='3_5', rel='nsubj', pos='NOUN', feats=<I

# POS and dependency parsing

In [11]:
# Integer ids of the BIO + sentiment tags: 0 is "outside", each sentiment gets
# an odd id for its B- tag and the following even id for its I- tag.
tag_table = {'O': 0,
             'B-positive': 1,
             'B-neutral': 3,
             'B-negative': 5,
             'I-positive': 2,
             'I-neutral': 4,
             'I-negative': 6}

In [25]:
def get_docs_and_bio(rev_dict):
    """Parse every review with Natasha and derive per-token BIO sentiment ids.

    Args:
        rev_dict: mapping of review id to a record with a ``'text'`` string
            and a ``'spans'`` list; each span is indexed as
            ``span[0]`` = start offset, ``span[1]`` = stop offset,
            ``span[3]`` = sentiment.

    Returns:
        ``(docs, asp_labels, keys)`` - three parallel lists holding, per
        review, the processed ``Doc``, the list of ``tag_table`` ids (one per
        token) and the review id.

    Raises:
        KeyError: if a matching span has a sentiment for which ``tag_table``
            has no ``B-``/``I-`` entry.
    """
    def bio_tag(token, spans):
        # The last matching span decides; spans with sentiment 'both' are skipped.
        tag = 'O'
        for span in spans:
            sentiment = span[3]
            if sentiment == 'both':
                continue
            if token.start == span[0]:
                tag = f'B-{sentiment}'
            elif token.start > span[0] and token.stop <= span[1]:
                tag = f'I-{sentiment}'
        return tag

    docs, asp_labels, keys = [], [], []
    for review_id, review in rev_dict.items():
        keys.append(review_id)

        parsed = Doc(review['text'])
        parsed.segment(segmenter)
        parsed.parse_syntax(syntax_parser)
        parsed.tag_morph(morph_tagger)
        docs.append(parsed)

        asp_labels.append(
            [tag_table[bio_tag(token, review['spans'])] for token in parsed.tokens]
        )
    return docs, asp_labels, keys

In [26]:
# Parse the train reviews and build their per-token aspect label ids.
train_docs, train_asp_labels, train_keys = get_docs_and_bio(senti_train_data)

In [27]:
# Parse the test reviews and build their per-token aspect label ids.
test_docs, test_asp_labels, test_keys = get_docs_and_bio(senti_test_data)

# Use tokenizer

In [30]:
# Part-of-speech tag vocabulary; a tag's position in this list is its integer id
# (looked up with POS_tags.index in tokenize_and_align_labels). '[PAD]' is the last entry.
POS_tags = ['ADJ', 'ADP', 'ADV', 'AUX',
            'CCONJ', 'DET', 'INTJ', 'NOUN',
            'NUM', 'PART', 'PRON', 'PROPN',
            'PUNCT', 'SCONJ', 'SYM', 'VERB', 
            'X', '[PAD]']

In [31]:
# Dependency-relation vocabulary; a relation's position in this list is its integer id
# (looked up with DEP_tags.index in tokenize_and_align_labels). 'root' has id 0.
DEP_tags = 'root, acl, acl:relcl, advcl, advmod, amod, appos, aux, aux:pass, case, cc, ccomp, compound, conj, cop, csubj, csubj:pass, dep, det, discourse, expl, fixed, flat, flat:foreign, flat:name, iobj, list, mark, nmod, nsubj, nsubj:pass, nummod, nummod:entity, nummod:gov, obj, obl, obl:agent, orphan, parataxis, punct, xcomp'.split(', ')

In [32]:
# Checkpoint identifier handed to AutoTokenizer.from_pretrained below.
# NOTE(review): 'rurewiews' looks like a typo but is presumably the checkpoint's
# actual published name - verify before "correcting" it.
model_checkpoint = 'blanchefort/rubert-base-cased-sentiment-rurewiews'

In [33]:
from transformers import AutoTokenizer

# Sub-word tokenizer belonging to the chosen checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [34]:
# Read by tokenize_and_align_labels: True copies a word's label to all of its
# sub-word tokens, False labels only the first sub-word and gives the rest -100.
label_all_tokens = True

In [35]:
def tokenize_and_align_labels(docs, asp_labels):
    """Sub-word tokenize parsed documents and align word-level features to them.

    For every document the token texts are passed to the module-level
    ``tokenizer`` as pre-split words. Five word-level feature sequences are
    then stretched to sub-word length and stored on the tokenizer output under
    the keys ``'tail_ids'``, ``'head_ids'``, ``'pos_tags'``, ``'dep_tags'``
    and ``'asp_tags'``.

    Args:
        docs: processed documents whose ``tokens`` expose ``text``, ``id``,
            ``head_id``, ``pos`` and ``rel``.
        asp_labels: per-document lists of word-level aspect label ids,
            parallel to ``docs``.

    Returns:
        The tokenizer output with the five aligned feature lists added.

    Raises:
        ValueError: if a token's ``pos`` or ``rel`` is missing from
            ``POS_tags`` / ``DEP_tags``.
    """
    def spread_over_subwords(word_values, word_ids):
        # Positions without a word id (special tokens) get -100, which the
        # original notes describe as ignored by the loss function. Repeated
        # sub-words of one word reuse the word's value only when the global
        # `label_all_tokens` flag is set; otherwise they get -100 as well.
        aligned = []
        last_word = None
        for word_idx in word_ids:
            if word_idx is None:
                value = -100
            elif word_idx == last_word and not label_all_tokens:
                value = -100
            else:
                value = word_values[word_idx]
            aligned.append(value)
            last_word = word_idx
        return aligned

    words_per_doc = []
    word_level = {
        'tail_ids': [],
        'head_ids': [],
        'pos_tags': [],
        'dep_tags': [],
        'asp_tags': asp_labels,
    }

    for doc in docs:
        # One row per token, then transpose the rows into five parallel columns.
        rows = [
            (tok.text, tok.id, tok.head_id, POS_tags.index(tok.pos), DEP_tags.index(tok.rel))
            for tok in doc.tokens
        ]
        columns = [list(column) for column in zip(*rows)] or [[] for _ in range(5)]
        words, tails, heads, pos_ids, dep_ids = columns

        words_per_doc.append(words)
        word_level['tail_ids'].append(tails)
        word_level['head_ids'].append(heads)
        word_level['pos_tags'].append(pos_ids)
        word_level['dep_tags'].append(dep_ids)

    tokenized_inputs = tokenizer(
        words_per_doc, truncation=True, is_split_into_words=True
    )

    for feature_name, per_doc_values in word_level.items():
        tokenized_inputs[feature_name] = [
            spread_over_subwords(values, tokenized_inputs.word_ids(batch_index=doc_idx))
            for doc_idx, values in enumerate(per_doc_values)
        ]

    return tokenized_inputs

In [36]:
# Tokenize the train documents and align all word-level features to sub-words.
# The recorded output warns that no maximum length is available, so the
# tokenizer falls back to no truncation despite truncation=True.
train_tokenized = tokenize_and_align_labels(train_docs, train_asp_labels)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [77]:
# Tokenize the test documents and align all word-level features to sub-words.
test_tokenized = tokenize_and_align_labels(test_docs, test_asp_labels)

In [80]:
# Accumulators for the per-document dependency matrices. The cells that fill
# them only append, so re-run this cell first to avoid duplicated entries.
train_dep_matrices = []
test_dep_matrices = []

In [81]:
# Build one square matrix per train document: entry [row, col] holds the
# dependency-tag id of position `col` when the head id of `col` equals the
# token id of `row` (row <- col); all other entries stay 0.
for head_ids, tail_ids, dep_ids in zip(train_tokenized['head_ids'],
                                       train_tokenized['tail_ids'],
                                       train_tokenized['dep_tags']):
    # Drop the first and last positions of each sequence.
    inner_heads = head_ids[1:-1]
    inner_tails = tail_ids[1:-1]
    inner_deps = dep_ids[1:-1]

    size = len(inner_heads)
    dep_matrix = np.zeros((size, size))
    for col, head_id in enumerate(inner_heads):
        for row, tail_id in enumerate(inner_tails):
            if tail_id == head_id:
                dep_matrix[row, col] = inner_deps[col]
    train_dep_matrices.append(dep_matrix)

In [82]:
# Build one square matrix per test document: entry [row, col] holds the
# dependency-tag id of position `col` when the head id of `col` equals the
# token id of `row` (row <- col); all other entries stay 0.
for head_ids, tail_ids, dep_ids in zip(test_tokenized['head_ids'],
                                       test_tokenized['tail_ids'],
                                       test_tokenized['dep_tags']):
    # Drop the first and last positions of each sequence.
    inner_heads = head_ids[1:-1]
    inner_tails = tail_ids[1:-1]
    inner_deps = dep_ids[1:-1]

    size = len(inner_heads)
    dep_matrix = np.zeros((size, size))
    for col, head_id in enumerate(inner_heads):
        for row, tail_id in enumerate(inner_tails):
            if tail_id == head_id:
                dep_matrix[row, col] = inner_deps[col]
    test_dep_matrices.append(dep_matrix)