In [1]:
import csv
import pandas as pd
import time
import sklearn
import random
import sqlite3
from estnltk import Text
from estnltk_neural.taggers import StanzaSyntaxTagger
from estnltk.taggers import NerTagger
import json
from estnltk.converters import text_to_json

In [2]:
from estnltk import Layer
from estnltk.taggers import Tagger
from collections import defaultdict
from collections import OrderedDict
import copy

PhrasePatternTagger without NER patterns in ruleset.

In [3]:
def get_ner(ner_layer, word_layer, span):
        nertag = None
        if len(ner_layer) > 0:
            word = word_layer.get(span)
            for n in ner_layer:
                for part in n:
                    if part==word:
                        nertag=n.nertag
        if nertag:
            return nertag
        return 'OTHER'
    
def get_POS(word_layer, span):
    infinite_verb_forms = ['da', 'des', 'ma', 'maks', 'mas', 'mast', 'mata', 'nud', 'tav', 'tud', 'v']
    # if POS is ambiguous, only unique tags are kept, e.g. ['V', 'A', 'A'] -> ['V', 'A']
    pos_list = []
    word = word_layer.get(span)
    for i in range(len(word.morph_analysis['partofspeech'])):
        if word.morph_analysis['partofspeech'][i] == 'V':
            if word.morph_analysis['form'][i] in infinite_verb_forms:
                pos_list.append('V_inf')
            elif word.form[i] == 'neg':
                pos_list.append('V_neg')
            else:
                pos_list.append('V_fin')
        else:
            pos_list.append(word.morph_analysis['partofspeech'][i])
    
    if len(pos_list) > 1:
        char_unique = [char for indx, char in enumerate(pos_list) if char not in pos_list[:indx]]
        if len(char_unique) < 2:
            return char_unique[0]
        return '|'.join(char_unique)
    return pos_list[0]

In [4]:
class PhrasePatternTagger2(Tagger):
    """Tags phrases that match given syntax and part-of-speech pattern rules, and their corresponding patterns.""" 
    
    conf_param = ['rules_file', 'ruleset_map']
    
    def __init__(self, rules_file: str,
                       output_layer='phrase_patterns',
                       morph_analysis_layer='morph_analysis',
                       words_layer='words',
                       syntax_layer='stanza_syntax',
                       ner_layer='ner'):
        
        self.input_layers = [morph_analysis_layer, words_layer, syntax_layer, ner_layer]
        self.output_layer = output_layer
        self.output_attributes = ['extraction_pattern', 'ner_pattern', 'pattern_id', 'score', 'phrase_pattern_id', 'phrase_class']
        self.rules_file = rules_file

        ruleset_map = defaultdict(list)
        
        with open(rules_file, encoding='UTF-8') as csv_file:
            reader = csv.DictReader(csv_file)
            for row in reader:
                info = [row['ID'], row['POS_pattern']]
                ruleset_map[row['tree']].append(info)
                
        self.ruleset_map = ruleset_map

    def _make_layer_template(self):
        layer = Layer(name=self.output_layer,
                      text_object=None,
                      attributes=self.output_attributes,
                      enveloping=self.input_layers[1],
                      ambiguous=True)
        return layer
        
    def _make_layer(self, text, layers, status):
        layer = self._make_layer_template()
        layer.text_object = text
        
        for i in range(len(layers[self.input_layers[2]])): # Iterate over 'stanza_syntax' layer
            pattern_spans = []    
            ids = []

            pattern_spans.append(layers[self.input_layers[2]][i])
            ids.append([layers[self.input_layers[2]][i]['id'], layers[self.input_layers[2]][i]['head']])
                
            for j in range(i + 1, len(layers[self.input_layers[2]])):
                tree = []
                pos = []
                ner = []
                for k in range(len(pattern_spans)):
                    if layers[self.input_layers[2]][j] not in pattern_spans:
                        if layers[self.input_layers[2]][j] in pattern_spans[k]['children'] or pattern_spans[k] in layers[self.input_layers[2]][j]['children'] or layers[self.input_layers[2]][j]['parent_span'] != None and layers[self.input_layers[2]][j]['parent_span'] == pattern_spans[k]['parent_span']:
                            pattern_spans.append(layers[self.input_layers[2]][j])
                            ids.append([layers[self.input_layers[2]][j]['id'], layers[self.input_layers[2]][j]['head']])
                
                # fixing word and head ID values
                ids_for_pattern = copy.deepcopy(ids)
                for k in range(len(ids_for_pattern)):
                    temp = ids_for_pattern[k][0]
                    ids_for_pattern[k][0] = k+1
                    for l in range(len(ids)):
                        if ids[l][1] == temp:
                            ids_for_pattern[l][1] = ids_for_pattern[k][0]
            
                word_ids = [word_id[0] for word_id in ids_for_pattern]
                for k in range(len(ids_for_pattern)):
                    if ids_for_pattern[k][0] == ids_for_pattern[k][1]:
                        ids_for_pattern[k][1] = 0
                    elif ids_for_pattern[k][1] not in word_ids:
                        ids_for_pattern[k][1] = 0
                
                # finding the root of current pattern and setting its deprel value as such
                for k in range(len(pattern_spans)):
                    deprel = pattern_spans[k].deprel
                    if ids_for_pattern[k][1] == 0 and deprel != 'root':
                        deprel = 'root'
                    tree.append([str(ids_for_pattern[k][0]), str(ids_for_pattern[k][1]), deprel])
                    # POS-tag is taken from morph_analysis layer
                    pos.append(get_POS(layers[self.input_layers[1]], pattern_spans[k]))
                    # nertag is taken from ner layer
                    ner.append(get_ner(layers[self.input_layers[-1]], layers[self.input_layers[1]], pattern_spans[k]))                     
                    
                pattern = [" ".join(word_info) for word_info in tree]
                #print(pattern)
                # check if tree pattern exists in ruleset map
                if ",".join(pattern) in self.ruleset_map.keys():
                    #print(pattern, 'yes')
                    pos_pattern = "-".join(pos)
                    ner_pattern = "-".join(ner)
                    # check if POS-sequence and NER-sequence exist in ruleset map with given tree pattern
                    for el in self.ruleset_map[",".join(pattern)]:
                        #print(el[1], pos_pattern, el[2], ner_pattern)
                        if el[1] == pos_pattern:
                            #print(pattern, 'yesyes')
                            # add annotation
                            layer.add_annotation([span.base_span for span in pattern_spans], 
                                                 extraction_pattern=",".join([",".join(pattern), pos_pattern]),
                                                 ner_pattern=ner_pattern,
                                                 pattern_id=el[0],
                                                 score=None,
                                                 phrase_pattern_id=None,
                                                 phrase_class=None)         
                
        return layer
    

In [5]:
pattern_tagger = PhrasePatternTagger2(rules_file='indicator_patterns_ner_tree_pos_updated.csv')
pattern_tagger

name,output layer,output attributes,input layers
PhrasePatternTagger2,phrase_patterns,"('extraction_pattern', 'ner_pattern', 'pattern_id', 'score', 'phrase_pattern_id', 'phrase_class')","('morph_analysis', 'words', 'stanza_syntax', 'ner')"

0,1
rules_file,indicator_patterns_ner_tree_pos_updated.csv
ruleset_map,"defaultdict(<class 'list'>, {'string': [['int64', 'string']], '1 2 nmod,2 0 root ..., type: <class 'collections.defaultdict'>, length: 7"


In [6]:
stanza_tagger = StanzaSyntaxTagger(input_type='morph_analysis', input_morph_layer='morph_analysis',
                                   add_parent_and_children=True)

Downloading resources index: 20.1kB [00:00, ?B/s]


In [7]:
ner_tagger = NerTagger()

In [8]:
con = sqlite3.connect('correct_noun_phrases.db')

In [11]:
cur = con.cursor()
cur.execute("SELECT * FROM correct_phrase_patterns")
rows = cur.fetchall()

In [12]:
tagged_phrases = defaultdict(list)

random.shuffle(rows)

start = time.time()
tagged_total = 0

for row in rows:
    text = Text(row[8]).tag_layer('morph_analysis')
    ner_tagger.tag(text)
    stanza_tagger.tag(text)
    pattern_tagger.tag(text)
    if len(text.phrase_patterns) > 0:
        tagged_total+=len(text.phrase_patterns)
        for pattern in text.phrase_patterns:
            # at the moment, up to 100 samples for each pattern are kept and later saved in database
            if len(tagged_phrases[pattern['extraction_pattern'][0]]) < 100:
                tagged_phrases[pattern['extraction_pattern'][0]].append(text)
                
print(f"{len(rows)} fraasi märgendamiseks kulus PhrasePatternTaggeril {time.time()-start} sekundit")
print(f"Tagger leidis {tagged_total} fraasi")

21492 fraasi märgendamiseks kulus PhrasePatternTaggeril 807.0359604358673 sekundit
Tagger leidis 21770 fraasi


In [13]:
con.close()

In [14]:
con = sqlite3.connect("tagged_noun_phrases2.db")
cur = con.cursor()
cur.execute('pragma encoding=UTF8')
cur.execute("CREATE TABLE tagged_phrases(ID INTEGER PRIMARY KEY, extraction_pattern TEXT, ner_pattern TEXT, pattern_id INTEGER, raw_text TEXT, parent_phrase TEXT)")

<sqlite3.Cursor at 0x1f3f24f54c0>

In [15]:
for el in tagged_phrases:
    for i in range(len(tagged_phrases[el])):
        phrase_json = text_to_json(tagged_phrases[el][i])
        for phrase in tagged_phrases[el][i].phrase_patterns:
            p_lemmas = []
            for span in phrase:
                morph_word = tagged_phrases[el][i].morph_analysis.get(span)
                # first lemma is always chosen
                p_lemmas.append(morph_word.lemma[0])
            raw_text = ' '.join([l for l in p_lemmas])
            cur.execute("""INSERT INTO tagged_phrases
                                    (extraction_pattern, ner_pattern, pattern_id, raw_text, parent_phrase)
                                    VALUES (?, ?, ?, ?, ?);""", (phrase['extraction_pattern'][0], phrase['ner_pattern'][0], phrase['pattern_id'][0], raw_text, phrase_json))
    
            con.commit()

con.close()