In [None]:
import pandas as pd
import sklearn_crfsuite, re
import numpy as np
import importlib, os
import logging, math
import json, nltk
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [None]:
def ngrams(tokens, gram_len=2):
    """Return every contiguous run of ``gram_len`` tokens as one string.

    The tokens of each window are joined with a single space and the joined
    string is stripped of leading/trailing whitespace. When ``gram_len`` is
    larger than ``len(tokens)`` there is no window and the result is empty.
    """
    window_starts = range(len(tokens) - gram_len + 1)
    return [
        ' '.join(tokens[start:start + gram_len]).strip()
        for start in window_starts
    ]

def clean_tokens(tokens, to_replace=r'[^\w ]+'):
    """Replace every match of ``to_replace`` inside each token with a space.

    Args:
        tokens: iterable of strings.
        to_replace: regular expression of the text to blank out. The default
            matches runs of characters that are neither word characters nor
            spaces. It is a raw string so the backslash reaches the regex
            engine verbatim instead of relying on an invalid string escape.

    Returns:
        A new list with one cleaned string per input token; tokens are not
        stripped or dropped here.
    """
    return [re.sub(to_replace, ' ', token) for token in tokens]

def tokenize(mystr, is_char=False):
    """Split ``mystr`` into whitespace-separated words, or into characters.

    Word splitting is used only when ``is_char`` is literally ``False``; any
    other value (including other falsy values) selects character splitting.
    """
    if is_char is False:
        return mystr.split()
    return list(mystr)

In [None]:
def get_tokens(sentence, min_ngram=1, max_ngram=1, to_replace=r'[^\w ]+', is_char=False):
    """Tokenise ``sentence`` and return its n-grams.

    Word mode (``is_char`` is literally ``False``):
        1. spans that look like markup tags (``<...>``) are replaced by a space,
        2. every match of ``to_replace`` is deleted from the sentence,
        3. the sentence is split on whitespace, and each token is passed
           through ``clean_tokens`` and stripped.
    Character mode (any other ``is_char``): the tokens are the individual
    characters of ``sentence``; no cleaning is applied.

    Args:
        sentence: input text.
        min_ngram: smallest n-gram length to emit.
        max_ngram: largest n-gram length to emit (inclusive).
        to_replace: regular expression of characters to remove in word mode.
            The default is a raw string so the backslash is not treated as an
            (invalid) string escape.
        is_char: selects character mode when not ``False``.

    Returns:
        List of n-gram strings, ordered by n-gram length and then by position.
    """
    if is_char is False:
        sentence = re.sub('<[^<]+?>', ' ', sentence)
        sentence = re.sub(to_replace, '', sentence)
        tokens = [token.strip() for token in clean_tokens(tokenize(sentence), to_replace)]
    else:
        tokens = tokenize(sentence, is_char)

    n_grams = []
    for gram_len in range(min_ngram, max_ngram + 1):
        n_grams += ngrams(tokens, gram_len)
    return n_grams

In [None]:
def word2features(sent, i):
    """Build the feature dict for the token at position ``i`` of ``sent``.

    Each element of ``sent`` is indexed as ``(word, postag, ...)``. The
    features describe the token itself plus its immediate left and right
    neighbours; a missing neighbour is signalled with ``'BOS'`` / ``'EOS'``.
    """
    token = sent[i][0]
    pos = sent[i][1]

    feats = {
        'bias': 1.0,
        'word.lower()': token.lower(),
        'word[-3:]': token[-3:],
        'word[-2:]': token[-2:],
        'word.isupper()': token.isupper(),
        'word.istitle()': token.istitle(),
        'word.isdigit()': token.isdigit(),
        'postag': pos,
        'postag[:2]': pos[:2],
    }

    # Left neighbour, or a begin-of-sentence flag when there is none.
    if i <= 0:
        feats['BOS'] = True
    else:
        prev_token, prev_pos = sent[i - 1][0], sent[i - 1][1]
        feats['-1:word.lower()'] = prev_token.lower()
        feats['-1:word.istitle()'] = prev_token.istitle()
        feats['-1:word.isupper()'] = prev_token.isupper()
        feats['-1:postag'] = prev_pos
        feats['-1:postag[:2]'] = prev_pos[:2]

    # Right neighbour, or an end-of-sentence flag when there is none.
    if i >= len(sent) - 1:
        feats['EOS'] = True
    else:
        next_token, next_pos = sent[i + 1][0], sent[i + 1][1]
        feats['+1:word.lower()'] = next_token.lower()
        feats['+1:word.istitle()'] = next_token.istitle()
        feats['+1:word.isupper()'] = next_token.isupper()
        feats['+1:postag'] = next_pos
        feats['+1:postag[:2]'] = next_pos[:2]

    return feats

In [None]:
def sent2features(sent):
    """Return one ``word2features`` dict per position of ``sent``, in order."""
    features = []
    for position in range(len(sent)):
        features.append(word2features(sent, position))
    return features

def sent2labels(sent):
    """Return the third field (the label) of every ``(token, postag, label)`` triple."""
    tags = []
    for _token, _postag, label in sent:
        tags.append(label)
    return tags

def sent2tokens(sent):
    """Return the first field (the token) of every ``(token, postag, label)`` triple."""
    words = []
    for token, _postag, _label in sent:
        words.append(token)
    return words

In [None]:
# Load the dataset; it must provide a `text` column and a `raw_label` column.
print("Reading data...")
df = pd.read_csv('data/data.csv')

# Every text cell is coerced to str so later string methods always apply.
text = df['text'].apply(str).tolist()

# A raw label cell can hold several labels separated by a double underscore.
labels = [str(raw).strip().split('__') for raw in df['raw_label']]
print("df head ",df.head())

In [None]:
def _tag_label_spans(tokens, tokens_lc, ner_tags, span, span_lc):
    """Mark every untagged occurrence of ``span`` inside ``tokens``, in place.

    A window matches when it equals ``span`` exactly, or when the lower-cased
    window (``tokens_lc``) equals the lower-cased span (``span_lc``). A
    matching window is tagged 'S' when it is one token long, otherwise
    'B', 'I'..., 'E'.
    """
    span_len = len(span)
    # An empty span would "match" at every offset, and writing a zero-length
    # B/I/E pattern would corrupt the tag list, so there is nothing to mark.
    if span_len == 0:
        return
    for start in range(len(tokens) - span_len + 1):
        end = start + span_len
        if tokens[start:end] != span and tokens_lc[start:end] != span_lc:
            continue
        # The whole window must still be untagged; checking only the first
        # position would let this span overwrite part of an earlier one.
        if any(tag != 'O' for tag in ner_tags[start:end]):
            continue
        if span_len == 1:
            ner_tags[start] = 'S'
        else:
            ner_tags[start:end] = ['B'] + ['I'] * (span_len - 2) + ['E']


print("Creating BIOE tags...")
output_words, output_chars = [], []
for i in range(len(text)):
    # Non-ASCII characters are dropped from the sentence before tokenising.
    corpus = text[i].encode("ascii", "ignore").decode()
    label = labels[i]

    word_tokens = [w for w in get_tokens(corpus, min_ngram=1, max_ngram=1) if len(w) > 0]
    char_tokens = [c for c in get_tokens(corpus, min_ngram=1, max_ngram=1, is_char=True) if len(c) > 0]

    # Words get POS tags from nltk; characters only get a placeholder tag.
    word_pos_tags = [pos for _, pos in nltk.pos_tag(word_tokens)]
    char_pos_tags = ['UNK'] * len(char_tokens)

    # Everything starts outside any entity.
    word_ner_tags = ['O'] * len(word_tokens)
    char_ner_tags = ['O'] * len(char_tokens)

    word_tokens_lc = [w.lower() for w in word_tokens]
    char_tokens_lc = [c.lower() for c in char_tokens]

    for j in range(len(label)):
        # The label is normalised the same way as the sentence and written
        # back into `labels`, as the original loop did.
        label[j] = label[j].encode("ascii", "ignore").decode()

        gpt_word_tokens = [w for w in get_tokens(label[j], min_ngram=1, max_ngram=1) if len(w) > 0]
        gpt_char_tokens = [c for c in get_tokens(label[j], min_ngram=1, max_ngram=1, is_char=True) if len(c) > 0]

        gpt_word_tokens_lc = [w.lower() for w in gpt_word_tokens]
        gpt_char_tokens_lc = [c.lower() for c in gpt_char_tokens]

        _tag_label_spans(word_tokens, word_tokens_lc, word_ner_tags, gpt_word_tokens, gpt_word_tokens_lc)
        _tag_label_spans(char_tokens, char_tokens_lc, char_ner_tags, gpt_char_tokens, gpt_char_tokens_lc)

    # One (token, postag, ner_tag) triple per token, at both granularities.
    output_words.append(list(zip(word_tokens, word_pos_tags, word_ner_tags)))
    output_chars.append(list(zip(char_tokens, char_pos_tags, char_ner_tags)))

# Split sentence indices 80/20 with a fixed seed.
print("Creating train test split...")
sentence_ids = range(len(output_words))
train_indices, valid_indices = train_test_split(sentence_ids, test_size=0.2, random_state=0)

# Training skips sentences whose first label is the string 'nan';
# the validation set keeps every sentence.
labelled_train_ids = [idx for idx in train_indices if labels[idx][0] != 'nan']

train_sents_words = [output_words[idx] for idx in labelled_train_ids]
valid_sents_words = [output_words[idx] for idx in valid_indices]

train_sents_chars = [output_chars[idx] for idx in labelled_train_ids]
valid_sents_chars = [output_chars[idx] for idx in valid_indices]

# Turn every tagged sentence into a feature sequence (X) and a label sequence (y).
print("Creating train test data...")

X_train_words = list(map(sent2features, train_sents_words))
y_train_words = list(map(sent2labels, train_sents_words))

X_train_chars = list(map(sent2features, train_sents_chars))
y_train_chars = list(map(sent2labels, train_sents_chars))

X_valid_words = list(map(sent2features, valid_sents_words))
y_valid_words = list(map(sent2labels, valid_sents_words))

X_valid_chars = list(map(sent2features, valid_sents_chars))
y_valid_chars = list(map(sent2labels, valid_sents_chars))

# The word-level and char-level models are trained with identical settings.
crf_settings = dict(
    algorithm='lbfgs',
    c1=0.1,
    c2=0.1,
    max_iterations=100,
    all_possible_transitions=True,
)

print("Building word model...")
word_crf = sklearn_crfsuite.CRF(**crf_settings)
word_crf.fit(X_train_words, y_train_words)

print("Building char model...")
char_crf = sklearn_crfsuite.CRF(**crf_settings)
char_crf.fit(X_train_chars, y_train_chars)


In [None]:
def _collect_spans(tag_seq, sent, joiner):
    """Turn one predicted tag sequence into a list of entity strings.

    'B' and 'S' start a new span (flushing any span already open); 'I' and
    'E' extend the open span; any other tag is ignored. The tokens of each
    span are joined with ``joiner``.

    NOTE(review): a tag such as 'O' does not close the open span, so an
    ill-formed sequence like B, O, I yields one merged span - confirm that
    this is intended.
    """
    spans, current = [], []
    for tag, entry in zip(tag_seq, sent):
        prefix = tag[0]
        if prefix == 'B' or prefix == 'S':
            if current:
                spans.append(joiner.join(current))
            current = [entry[0]]
        elif prefix == 'I' or prefix == 'E':
            current.append(entry[0])
    if current:
        spans.append(joiner.join(current))
    return spans


print("Doing predictions...")
predictions_words = word_crf.predict(X_valid_words)
predictions_chars = char_crf.predict(X_valid_chars)

# Word tokens are re-joined with spaces.
print("Doing word level predictions...")
out_vals_words = [
    _collect_spans(tags, sent, ' ')
    for tags, sent in zip(predictions_words, valid_sents_words)
]

# Characters are concatenated directly.
print("Doing char level predictions...")
out_vals_chars = [
    _collect_spans(tags, sent, '')
    for tags, sent in zip(predictions_chars, valid_sents_chars)
]

print("Merging predictions...")
# Prefer the word-level spans and fall back to the char-level spans when the
# word model found nothing. `out_vals_words[i]` is a list, so the fallback
# condition has to be "empty list"; comparing it with the string 'None' can
# never be true and would disable the fallback entirely.
out_vals = []
for word_vals, char_vals in zip(out_vals_words, out_vals_chars):
    out_vals.append(word_vals if len(word_vals) > 0 else char_vals)

# The first extracted span is the predicted label; 'None' marks "no span found".
pred_labels = [vals[0] if len(vals) > 0 else 'None' for vals in out_vals]

In [None]:
# Ground truth for the validation rows: the first '__'-separated label,
# reduced to ASCII the same way the training labels were.
norm_labels = [str(x).strip().split('__') for x in df.raw_label]
norm_labels = [norm_labels[x] for x in valid_indices]
true_labels = [x[0].encode("ascii", "ignore").decode() for x in norm_labels]

# Rows without a label come out as the string 'nan' (presumably str() of a
# missing raw_label - the training split filters on the same value), while
# "no entity predicted" is encoded as 'None' in pred_labels. Use one sentinel
# so a correct empty prediction is not always scored as an error.
true_labels = ['None' if first_label == 'nan' else first_label for first_label in true_labels]

print(classification_report(true_labels, pred_labels))