In [None]:
#from google.colab import drive
#drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
#import os
#os.chdir('gdrive/My Drive/Colab Notebooks')

# установка библиотек

In [None]:
!pip install torch==1.9.0+cu102
!pip install pymorphy2

In [None]:
!pip install deeppavlov
!python -m deeppavlov install syntax_ru_syntagrus_bert
!python -m deeppavlov install squad_bert
!pip install tensorflow==1.14 
!pip install russian_tagsets

In [None]:
from deeppavlov import build_model, configs
# Build the DeepPavlov "ru_syntagrus_joint_parsing" pipeline (download=True lets
# DeepPavlov fetch the model files). rule_4 below calls this global `model`
# to parse individual segments.
model = build_model("ru_syntagrus_joint_parsing", download=True)

# загрузка подвыборок

In [None]:
import copy
import os
import pickle

import pandas as pd

In [None]:
# Read the names of the training-subset files, one name per line.
# `with` closes the file even if reading fails. Only the line terminator is
# stripped, so a final line without a trailing newline keeps its last
# character (slicing with [:-1] would have cut it off).
with open('train_names.txt', 'r', encoding='utf-8') as f:
    train_names = [line.rstrip('\n') for line in f]

In [None]:
# Read the names of the test-subset files, one name per line.
# `with` closes the file even if reading fails. Only the line terminator is
# stripped, so a final line without a trailing newline keeps its last
# character (slicing with [:-1] would have cut it off).
with open('test_names.txt', 'r', encoding='utf-8') as f:
    test_names = [line.rstrip('\n') for line in f]

In [None]:
# Parser output grouped by kind; every dict is keyed by the pickle file name.
texts = {}
trees = {}
results = {}

for name in test_names:  # or train_names
    # Path to the pickle files produced by the Russian RST parser.
    path = os.path.join(r'corpus/Russian RST parser/pickle', name)
    try:
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # files that come from a trusted source.
        with open(path, 'rb') as f:
            obj = pickle.load(f)
    except Exception as err:
        # Loading stays best-effort (a bad file must not abort the run), but
        # the failure is reported instead of being silently swallowed.
        # Catching Exception rather than everything keeps Ctrl-C working.
        print('Skipped {}: {!r}'.format(name, err))
        continue

    # The kind of object is encoded in the file name.
    if 'tree' in name:
        trees[name] = obj
    elif 'result' in name:
        results[name] = obj
    else:
        texts[name] = obj

In [None]:
# train texts: how many texts / results / trees were loaded
tuple(map(len, (texts, results, trees)))

(79, 79, 79)

In [None]:
# test texts: how many texts / results / trees were loaded
tuple(map(len, (texts, results, trees)))

(30, 30, 30)

# адаптация русского авторазметчика

In [None]:
# Lemmas of the cognitive/speech verbs that rule_3 looks for.
c_verbs = {"хотеть", "подумать", "думать", "сказать", "решить"}

In [None]:
def match_lemmas(tokens: list, lemmas: list) -> dict:
    """Map the text of every token to its lemma.

    ``lemmas`` is a list of per-sentence lemma lists. It is flattened and
    paired with ``tokens`` by position. Pairing stops at the shorter of the
    two sequences, and a token text that occurs several times keeps the lemma
    of its last occurrence.
    """
    flat_lemmas = [lemma for sentence in lemmas for lemma in sentence]
    token_texts = [token.text for token in tokens]
    return {text: lemma for text, lemma in zip(token_texts, flat_lemmas)}

In [None]:
def lemmatize(tree: dict, tokens: list, lemmas: list) -> list:
    """Return the lemmas of the tokens that lie inside the tree node's span.

    A token is inside when ``tree.start <= token.begin`` and
    ``token.end <= tree.end``. Despite the annotations, ``tree`` is read
    through attributes and ``lemmas`` is indexed by token text (the mapping
    built by ``match_lemmas``).
    """
    return [
        lemmas[token.text]
        for token in tokens
        if tree.start <= token.begin and token.end <= tree.end
    ]

In [None]:
def rule_1(tree: dict, tokens: list, lemmas: list) -> bool:
    """Tell whether the node carries the 'attribution' relation.

    ``tokens`` and ``lemmas`` are not used here; they are accepted so that
    every rule can be called with the same arguments.
    """
    relation = tree.relation
    return relation == 'attribution'


def rule_2(tree: dict, tokens: list, lemmas: list) -> bool:
    """Detect an 'elaboration' node whose span contains the lemma 'который'.

    When this returns True the segment is not split off.
    Nodes with any other relation never match.
    """
    if tree.relation != 'elaboration':
        return False
    return 'который' in lemmatize(tree, tokens, lemmas)


def rule_3(tree: dict, tokens: list, lemmas: list) -> bool:
    """Detect a non-elementary node whose span contains a cognitive verb.

    The lemmas inside the node's span are compared with the module-level
    ``c_verbs`` set. When this returns True the segment is not split off.
    Elementary nodes never match.
    """
    if tree.relation == 'elementary':
        return False
    span_lemmas = set(lemmatize(tree, tokens, lemmas))
    return len(c_verbs & span_lemmas) != 0

In [None]:
def rule_4(edu: str) -> bool:
    """Check whether a segment looks like a bare prepositional group.

    The segment is parsed with the module-level ``model``. It matches when the
    token whose ID column is '1' is tagged ADP and no token is tagged VERB.

    Args:
        edu: text of a single segment.

    Returns:
        True when the segment starts with an adposition and contains no VERB
        token; False otherwise, including when the parser returns nothing.
    """
    parses = list(model([edu]))
    if not parses:
        # Nothing came back from the parser; the former loop-based code raised
        # NameError here because its row list was never created.
        return False

    # A single segment is sent, so a single parse is expected. As before, if
    # several come back only the last one is inspected.
    # Rows are split on the trailing '\t_' column plus newline, columns on tabs.
    # Assumes CoNLL-U-like output where column 0 is the token ID and column 3
    # the POS tag. TODO confirm against the DeepPavlov output format.
    rows = [row.split('\t') for row in parses[-1].split('\t_\n')]

    first_is_adp = False
    has_verb = False
    for columns in rows:
        if len(columns) < 4:
            # Blank or truncated row (e.g. after a trailing newline): there is
            # no POS column to look at, so skip it instead of raising IndexError.
            continue
        if columns[3] == 'VERB':
            has_verb = True
        elif columns[3] == 'ADP' and columns[0] == '1':
            first_is_adp = True

    return first_is_adp and not has_verb

In [None]:
def tree_check(tree: dict) -> bool:
    """Run the prepositional-group check on a leaf node.

    Args:
        tree: a tree node or None. It is read through the attributes
            ``left``, ``right`` and ``text``.

    Returns:
        The result of ``rule_4(tree.text)`` when ``tree`` is a node without
        children; False when ``tree`` is None or has at least one child.
    """
    # Identity checks are the idiomatic way to test for None and do not depend
    # on how the node class implements ==/!=.
    if tree is None:
        return False
    if tree.left is None and tree.right is None:
        return rule_4(tree.text)
    return False

def rule_morph(tree: dict, tokens: list, lemmas: list) -> bool:
    """Detect a node that has a prepositional-group leaf as a direct child.

    ``tokens`` and ``lemmas`` are not used; they are accepted so that every
    rule can be called with the same arguments.

    Returns:
        True when ``tree_check`` accepts the left or the right child.
    """
    # `or` short-circuits: the right child is only checked (which means running
    # the syntactic parser) when the left child did not already match.
    return tree_check(tree.left) or tree_check(tree.right)

In [None]:
def conditions_failed(tree: dict, rules: list, tokens: list, lemmas: list) -> bool:
    """Tell whether at least one rule matches the node.

    Rules are tried in order with ``(tree, tokens, lemmas)``; evaluation stops
    at the first rule that returns a truthy value.
    """
    return any(rule(tree, tokens, lemmas) for rule in rules)

In [None]:
def corrected(tree: dict, rules: list, tokens: list, lemmas: list) -> dict:
    """Merge the node's segments when one of the rules matches it.

    If ``conditions_failed`` reports a match, the relation is removed with
    ``delete_relation`` and the node it returns is handed back; otherwise the
    node is returned unchanged.
    """
    rule_matched = conditions_failed(tree, rules, tokens, lemmas)
    return delete_relation(tree) if rule_matched else tree

In [None]:
def delete_relation(tree: dict) -> dict:
    """Remove the relation at ``tree`` by folding one child into the other.

    For nuclearity 'SN' the left child is folded into a leaf of the right
    subtree and ``tree.right`` is returned. For any other nuclearity the right
    child is folded into a leaf of the left subtree and ``tree.left`` is
    returned. The nodes are modified in place.
    """
    
    if tree.nuclearity == 'SN':
        # The left child is the part being merged away; from now on it is
        # treated as a plain segment.
        attr = tree.left
        attr.relation = 'elementary'
        # Leaf of the right subtree (chosen by left_n_leaf) that receives `attr`.
        n = left_n_leaf(tree.right)
        if (n.start - attr.end) > 1:
            # The spans are more than one character apart, so they cannot be
            # glued into one text: turn the leaf into a 'same-unit' node whose
            # children are `attr` and a copy of the former leaf.
            # NOTE(review): n.start, n.end, n.text and n.nuclearity are not
            # updated in this branch. Confirm that is intended.
            n_copy = copy.deepcopy(n)
            n.right = n_copy 
            n.left = attr
            n.relation = 'same-unit'
        else:
            # The spans touch: stretch the leaf backwards so it covers `attr`.
            n.start = attr.start
            n.text = attr.text + n.text
        return tree.right
            
    else:
        # Mirror case: the right child is merged into a leaf of the left subtree.
        attr = tree.right
        attr.relation = 'elementary'
        # Leaf of the left subtree (chosen by right_n_leaf) that receives `attr`.
        n = right_n_leaf(tree.left)
        if (attr.start - n.end) > 1:
            # Spans more than one character apart: build a 'same-unit' node
            # from a copy of the former leaf and `attr` (same caveat as above).
            n_copy = copy.deepcopy(n)
            n.left = n_copy
            n.right = attr
            n.relation = 'same-unit'
        else:           
            # The spans touch: stretch the leaf forwards so it covers `attr`.
            n.end = attr.end
            n.text += attr.text
        return tree.left

In [None]:
def left_n_leaf(tree: dict) -> dict:
    """Walk down from ``tree`` to a leaf, preferring the left side.

    At every node that has a left child the walk goes to the right child when
    the nuclearity is 'SN' and to the left child otherwise. The first node
    without a left child is returned.
    """
    node = tree
    while node.left:
        node = node.right if node.nuclearity == 'SN' else node.left
    return node

In [None]:
def right_n_leaf(tree: dict) -> dict:
    """Walk down from ``tree`` to a leaf, preferring the right side.

    At every node that has a left child the walk goes to the left child when
    the nuclearity is 'NS' and to the right child otherwise. The first node
    without a left child is returned.

    Args:
        tree: a tree node read through ``left``, ``right`` and ``nuclearity``.

    Returns:
        The leaf node reached by the walk (``tree`` itself if it is a leaf).
    """
    # The docstring and the body now share one indentation level; the previous
    # mix of 2 and 4 spaces made this definition fail with IndentationError.
    node = tree
    while node.left:
        node = node.left if node.nuclearity == 'NS' else node.right
    return node

In [None]:
def segmentation(tree: dict, text: str, segments: list, rules: list, tokens: list, lemmas: list) -> None:
    """Re-segment a tree and collect the resulting segment texts.

    An 'elementary' node contributes ``text[tree.start:tree.end]`` to
    ``segments``. For any other node both children are first passed through
    ``corrected`` (and replaced in place by its result) and then processed
    recursively, left before right. Nothing is returned; ``segments`` and the
    tree itself are modified.
    """
    if tree.relation == 'elementary':
        segments.append(text[tree.start:tree.end])
        return

    # Both children are corrected before either of them is descended into.
    tree.left = corrected(tree.left, rules, tokens, lemmas)
    tree.right = corrected(tree.right, rules, tokens, lemmas)
    segmentation(tree.left, text, segments, rules, tokens, lemmas)
    segmentation(tree.right, text, segments, rules, tokens, lemmas)

In [None]:
def texts_segmentation(rules: list) -> list:
    """Re-segment every parsed text in ``results`` with the given rules.

    Args:
        rules: rule functions, each called as ``rule(tree, tokens, lemmas)``.

    Returns:
        A flat list of segment strings for all texts; the segments of each
        text are followed by a ``'\\n'`` separator entry.
    """
    segments = []
    for annotation in results.values():
        rst_trees = annotation['rst']
        tokens = annotation['tokens']
        lemma_by_token = match_lemmas(tokens, annotation['lemma'])

        # segmentation() rewrites the tree in place (children are replaced and
        # leaves are merged). Work on a copy so that calling this function
        # again with another rule set starts from the untouched parser output
        # instead of a tree already altered by the previous rule set.
        root = copy.deepcopy(rst_trees[0])
        segmentation(root, annotation['text'], segments, rules, tokens, lemma_by_token)

        # For texts b021, b023 and b050 the parser returned two trees instead
        # of one (the second one is a single EDU), so the extra tree has to be
        # appended by hand. This is a workaround for what looks like a parser
        # glitch; any further trees are appended the same way rather than
        # being dropped.
        for extra_tree in rst_trees[1:]:
            segments.append(extra_tree.text)
        segments.append('\n')

    return segments

In [None]:
# All candidate merge rules. Each one is called as rule(tree, tokens, lemmas)
# and returns a bool; the cells below try every non-empty combination of them.
rules = [rule_1, rule_2, rule_3, rule_morph]

In [None]:
# Every non-empty subset of rule indices, listed in binary-counting order:
# [0], [1], [0, 1], [2], [0, 2], [1, 2], [0, 1, 2], [3], ...
# Bit `index` of `mask` decides whether that rule index is in the subset.
new_array = range(len(rules))
power_set = [
    [index for index in new_array if (mask >> index) & 1]
    for mask in range(1, 2 ** len(new_array))
]

In [None]:
%%time
# Segment the corpus with each rule combination and save the result to Excel.
# NOTE(review): only the first 9 combinations are processed; presumably the
# remaining ones are run separately. Confirm.
for s in power_set[:9]:
    current_rules = [rules[i] for i in s]
    print(current_rules)

    # Name the file after the rules used, e.g. 'rule_1__rule_3.xlsx'. The names
    # come from the functions themselves rather than from parsing the printed
    # representation of the list, which only worked while every function name
    # happened to contain 'rule'.
    filename = '__'.join(rule.__name__ for rule in current_rules) + '.xlsx'

    segments = texts_segmentation(current_rules)
    print(filename + ' сегментирован')

    # One segment per row; '\n' entries separate the texts.
    exls = pd.DataFrame(segments)
    exls.to_excel(filename, index=False)
    print(filename + ' записан')

[<function rule_1 at 0x7f3ff7f95320>]
rule_1.xlsx сегментирован
rule_1.xlsx записан
[<function rule_2 at 0x7f3ff7f95200>]
rule_2.xlsx сегментирован
rule_2.xlsx записан
[<function rule_1 at 0x7f3ff7f95320>, <function rule_2 at 0x7f3ff7f95200>]
rule_1__rule_2.xlsx сегментирован
rule_1__rule_2.xlsx записан
[<function rule_3 at 0x7f3ff7f95560>]
rule_3.xlsx сегментирован
rule_3.xlsx записан
[<function rule_1 at 0x7f3ff7f95320>, <function rule_3 at 0x7f3ff7f95560>]
rule_1__rule_3.xlsx сегментирован
rule_1__rule_3.xlsx записан
[<function rule_2 at 0x7f3ff7f95200>, <function rule_3 at 0x7f3ff7f95560>]
rule_2__rule_3.xlsx сегментирован
rule_2__rule_3.xlsx записан
[<function rule_1 at 0x7f3ff7f95320>, <function rule_2 at 0x7f3ff7f95200>, <function rule_3 at 0x7f3ff7f95560>]
rule_1__rule_2__rule_3.xlsx сегментирован
rule_1__rule_2__rule_3.xlsx записан
[<function rule_morph at 0x7f4002b8d830>]
rule_morph.xlsx сегментирован
rule_morph.xlsx записан
[<function rule_1 at 0x7f3ff7f95320>, <function rul