In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import math
import matplotlib.pyplot as plt
from tqdm import tqdm
import Levenshtein

import os


In [None]:
from camel_tools.morphology.database import MorphologyDB
from camel_tools.morphology.analyzer import Analyzer
from camel_tools.tokenizers.word import simple_word_tokenize
from camel_tools.disambig.mle import MLEDisambiguator
from camel_tools.disambig.bert import BERTUnfactoredDisambiguator
from pathlib import Path
# Path to the CALIMA MSA "s31" morphology database file, relative to this notebook.
S31_DB_PATH = Path('../../data/disambig_db/calima-msa-s31.db')
# Open the database; 'a' is the flags argument (presumably "analysis" mode — confirm in the camel_tools docs).
S31_DB = MorphologyDB(S31_DB_PATH, 'a')
# Analyzer over that database, configured with the 'NOAN_ALL' backoff setting and cache_size=100000.
S31_AN = Analyzer(S31_DB, 'NOAN_ALL', cache_size=100000)
# Pretrained MSA BERT disambiguator. top=1000 presumably keeps up to 1000 scored
# analyses per word so the tie-breaking functions in this notebook have candidates
# to choose from — TODO confirm against the camel_tools API.
bert_disambig = BERTUnfactoredDisambiguator.pretrained('msa', top=1000, pretrained_cache = False)
# Swap in the S31 analyzer by overwriting a private attribute of the disambiguator.
# NOTE(review): this relies on camel_tools internals and may break across library versions.
bert_disambig._analyzer = S31_AN

In [None]:
# Load the aligned train / dev / test fragment tables.
frag_train = pd.read_csv('../../data/all_train_aligned.csv')
frag_dev = pd.read_csv('../../data/all_dev_aligned.csv')
frag_test = pd.read_csv('../../data/all_test_aligned.csv')


def _has_text_fragment(frame):
  """Return a boolean mask that is True where column '0' holds a string.

  Rows whose fragment cell is not a string (for example the float NaN that
  pandas produces for an empty CSV cell) cannot be split into tokens later on.
  The check runs on the column itself instead of building a Series for every
  row with ``apply(axis=1)``; ``astype(bool)`` guarantees a boolean mask even
  when the frame has no rows.
  """
  return frame['0'].map(lambda value: isinstance(value, str)).astype(bool)


# Keep only the rows that actually contain a text fragment.
frag_train = frag_train[_has_text_fragment(frag_train)]
frag_dev = frag_dev[_has_text_fragment(frag_dev)]
frag_test = frag_test[_has_text_fragment(frag_test)]

Load the lemma lexicon and define the readability-level lookup


In [None]:
import pickle
# Load the lemma lookup table. get_readability_level calls lemma_db.get(lex) and
# expects a list of entries, each exposing 'level' and 'pos' keys.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file —
# only load a pickle that was produced locally / comes from a trusted source.
with open('../../data/lemma_db/quick_lemma_lookup.pkl', 'rb') as f:
    lemma_db = pickle.load(f)

def get_readability_level(analysis, token, oov_management, default_level = 0):
  """Look up the readability level of the lemma carried by ``analysis``.

  Args:
    analysis: mapping with at least the keys 'lex' (lemma) and 'pos'.
    token: surface token, forwarded to ``oov_management`` only.
    oov_management: callable ``(token, default_level, analysis)`` used when the
      lemma has no (or an empty) entry list in ``lemma_db``.
    default_level: level handed to ``oov_management`` for unknown lemmas.

  Returns:
    The 'level' of the single lexicon entry, or of the entry whose 'pos' is
    most similar (Levenshtein ratio) to the analysis POS when several entries
    exist, or whatever ``oov_management`` returns for an unknown lemma.
  """
  lemma = analysis['lex']
  pos_tag = analysis['pos']
  entries = lemma_db.get(lemma)

  # Lemma missing from the lexicon (or mapped to nothing): use the OOV strategy.
  if not entries:
    return oov_management(token, default_level, analysis)

  if len(entries) == 1:
    return entries[0]['level']

  # Several entries share this lemma: take the one with the closest POS tag.
  # max() keeps the first of equally similar entries, i.e. the earliest wins.
  closest = max(entries, key=lambda entry: Levenshtein.ratio(pos_tag, entry['pos']))
  return closest['level']

Analysis tiebreaking

In [None]:
def sort_score(list_of_analyses):
  """Return the analyses that share the highest ``score``.

  The result keeps the candidates in their original relative order, which is
  exactly what a stable descending sort followed by an equality filter yields.
  Unlike a ``list.sort`` based implementation, the caller's list is left
  untouched, so analyses owned by the disambiguator are not reordered as a
  hidden side effect.

  Args:
    list_of_analyses: sequence of objects exposing a ``score`` attribute.

  Returns:
    A new list with every element whose score equals the maximum; an empty
    list when the input is empty.
  """
  if not list_of_analyses:
    return []
  highest_score = max(x.score for x in list_of_analyses)
  return [x for x in list_of_analyses if x.score == highest_score]


def sort_lexlogprob(list_of_analyses):
  """Return the analyses that share the highest ``analysis['lex_logprob']``.

  Candidates keep their original relative order (the same order a stable
  descending sort plus equality filter would give) and the caller's list is
  not reordered in place.

  Args:
    list_of_analyses: sequence of objects whose ``analysis`` mapping has a
      'lex_logprob' entry.

  Returns:
    A new list with every element whose 'lex_logprob' equals the maximum; an
    empty list when the input is empty.
  """
  if not list_of_analyses:
    return []
  highest_prob = max(x.analysis['lex_logprob'] for x in list_of_analyses)
  return [x for x in list_of_analyses
          if x.analysis['lex_logprob'] == highest_prob]

def sort_level(list_of_analyses):
  """Return the analyses whose lemma has the lowest readability level.

  Each candidate's level is looked up exactly once (the lookup may involve
  several string-similarity computations), instead of once for sorting and
  again for filtering. Lemmas missing from the lexicon get the level 9999 via
  ``default_level_oov`` so they only win when nothing else is known.
  Candidates keep their original relative order and the caller's list is not
  reordered in place.

  Args:
    list_of_analyses: sequence of objects exposing an ``analysis`` mapping
      accepted by ``get_readability_level``.

  Returns:
    A new list with every element whose level equals the minimum; an empty
    list when the input is empty.
  """
  if not list_of_analyses:
    return []
  levels = [get_readability_level(x.analysis, '', default_level_oov, 9999)
            for x in list_of_analyses]
  lowest_rl = min(levels)
  return [x for x, rl in zip(list_of_analyses, levels) if rl == lowest_rl]


def _first_surviving_analysis(list_of_analyses, *tiebreakers):
  """Narrow the candidates with each tie-breaker in turn and return the winner.

  Every tie-breaker receives the current candidates and returns the subset it
  considers best. The cascade stops as soon as a single candidate is left;
  if candidates are still tied after the last stage, the first one is taken.
  """
  candidates = list_of_analyses
  for narrow in tiebreakers:
    candidates = narrow(candidates)
    if len(candidates) == 1:
      break
  return candidates[0].analysis


def score_then_llp(list_of_analyses):
  """Pick an analysis by score, breaking ties by lemma log-probability."""
  return _first_surviving_analysis(list_of_analyses, sort_score, sort_lexlogprob)


def score_then_level(list_of_analyses):
  """Pick an analysis by score, breaking ties by lowest readability level."""
  return _first_surviving_analysis(list_of_analyses, sort_score, sort_level)


def score_then_llp_then_level(list_of_analyses):
  """Pick an analysis by score, then lemma log-probability, then lowest level."""
  return _first_surviving_analysis(
      list_of_analyses, sort_score, sort_lexlogprob, sort_level)


def score_then_level_then_llp(list_of_analyses):
  """Pick an analysis by score, then lowest level, then lemma log-probability."""
  return _first_surviving_analysis(
      list_of_analyses, sort_score, sort_level, sort_lexlogprob)




OOV Management

In [None]:
# Load two pickled resources for OOV handling.
# NOTE(review): neither mle_model nor freq_backoff is referenced in the visible
# part of this notebook — confirm they are still needed before keeping the loads.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file —
# only load pickles that were produced locally / come from a trusted source.
with open('../../data/levels_db/mle_max_aligned_model.pkl', 'rb') as f:
  mle_model = pickle.load(f)

with open('../../data/freq/freq_token_db.pkl', 'rb') as f:
  freq_backoff = pickle.load(f)

def default_level_oov(word, level, analysis):
  """OOV strategy that returns the supplied default ``level`` unchanged.

  ``word`` and ``analysis`` are accepted only so the signature matches the
  ``oov_management(token, default_level, analysis)`` call made in
  ``get_readability_level``; they are not used.
  """
  return level

def default_level_nounprop(word, level, analysis):
  """OOV strategy that treats proper nouns differently from other words.

  Returns the supplied default ``level`` when the analysis POS is
  'NOUN_PROP', and the fixed level 5 for every other part of speech.
  ``word`` is unused; it is part of the shared OOV-strategy signature.
  """
  is_proper_noun = analysis['pos'] == 'NOUN_PROP'
  return level if is_proper_noun else 5






Inference pipeline

In [None]:
def lexicon_levels_pipeline(fragment, tiebreak, oov_management):
  """Predict one readability level per token of an annotated fragment.

  Args:
    fragment: space-separated items; the text before the first '#' of each
      item is used as the word.
    tiebreak: callable that receives the list of scored analyses of one word
      and returns the chosen analysis mapping.
    oov_management: OOV strategy forwarded to ``get_readability_level``.

  Returns:
    A list with one level per word, where every level of 3 or less is
    replaced by 3.
  """
  words = []
  for item in fragment.split(' '):
    words.append(item.split('#')[0])

  disambiguated = bert_disambig.disambiguate(words)
  chosen = [tiebreak(entry.analyses) for entry in disambiguated]

  levels = []
  for picked, word in zip(chosen, words):
    level = get_readability_level(picked, word, oov_management)
    # max(3, level) keeps `level` only when it is strictly greater than 3.
    levels.append(max(3, level))
  return levels

In [None]:
def get_gt_levels(fragment):
  """Extract the annotated level of every item in a fragment.

  ``fragment`` is split on single spaces; for each item the text between the
  first and second '#' (or up to the end of the item) is converted to ``int``.
  """
  gold_levels = []
  for item in fragment.split(' '):
    level_text = item.split('#')[1]
    gold_levels.append(int(level_text))
  return gold_levels

# Flatten the per-fragment gold levels of the test split into one array,
# in fragment order, so it lines up with the concatenated predictions.
gt_levels = np.concatenate(list(map(get_gt_levels, frag_test['0'])))

In [None]:
# One flat prediction array per tie-breaking strategy, each covering the whole
# test split; the OOV strategy is the same (default_level_oov) for all of them.
all_exps = [
    np.concatenate([
        lexicon_levels_pipeline(fragment, tiebreak, default_level_oov)
        for fragment in frag_test['0']
    ])
    for tiebreak in (
        score_then_llp,
        score_then_level,
        score_then_llp_then_level,
        score_then_level_then_llp,
    )
]

def results_to_csv(result_arr):
  """Summarise each prediction array against the global ``gt_levels``.

  For every array in ``result_arr`` a classification report is computed and
  flattened into one row of 11 numbers: f1-score, precision and recall for the
  classes '3', '4' and '5' (in that order), followed by the overall accuracy
  and the macro-averaged f1-score.

  Returns:
    A list with one 1-D numpy array per entry of ``result_arr``. Despite the
    name, nothing is written to disk.
  """
  class_labels = ('3', '4', '5')
  metric_names = ('f1-score', 'precision', 'recall')

  rows = []
  for predictions in result_arr:
    report = classification_report(gt_levels, predictions, output_dict = True)

    values = [report[label][metric]
              for label in class_labels
              for metric in metric_names]
    values.append(report['accuracy'])
    values.append(report['macro avg']['f1-score'])

    rows.append(np.array(values))

  return rows

# One row per entry of all_exps (same order as the list built above in this cell).
# The original passed `res`, a name that is not defined anywhere in the visible
# notebook and raises NameError on a fresh kernel; the experiment results live in all_exps.
pd.DataFrame(results_to_csv(all_exps), columns= ['f1_3','3_prec','3_recall','f1_4','4_prec','4_recall','f1_5','5_prec','5_recall','accuracy','f1_macro'])
