In [11]:
import datetime
import os
import string

import nltk
import pandas as pd

from nltk import BigramCollocationFinder, word_tokenize, pos_tag
from nltk.corpus.reader import BNCCorpusReader
from nltk.collocations import BigramAssocMeasures

In [2]:
# Download the averaged-perceptron PoS tagger model; the pos_tag calls in the
# cells below rely on it. Nothing is fetched when the package is already
# installed locally.
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/netherwulf/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [3]:
# read British National Corpus

# directory holding the BNC XML texts, relative to the current working directory
bnc_texts_dir = os.path.join('..', 'storage', 'bnc', 'raw_data', 'BNC', 'Texts')

# fileids is a regular expression over paths relative to `root`: a directory
# named with a single letter A-K, one sub-directory level, then an .xml file
bnc_reader = BNCCorpusReader(root=bnc_texts_dir, fileids=r'[A-K]/\w*/\w*\.xml')

In [4]:
# association measures that can be used to score bigram candidates

bigram_measures = BigramAssocMeasures()

# short measure name -> scoring function accepted by finder.score_ngrams
measure_dict = dict(
    pmi=bigram_measures.pmi,
    dice=bigram_measures.dice,
    chi2=bigram_measures.chi_sq,
)

In [5]:
# initialize finder and preprocess BNC corpus

# treat each sentence as another document (find MWEs inside sentences)
# NOTE: this reads every sentence returned by the corpus reader, so expect
# this cell to take a long time on the full corpus
finder = BigramCollocationFinder.from_documents(bnc_reader.sents())

# alternative, kept for reference:
# treat whole corpus as a long sentence (find MWEs inside and BETWEEN sentences)
# finder = BigramCollocationFinder.from_words(bnc_reader.words())


In [6]:
def get_curr_time():
    """Return the current time of day as an ``HH:MM:SS`` string (used in progress messages)."""
    now = datetime.datetime.now()
    return now.strftime("%H:%M:%S")

In [7]:
def get_pos_tag(nltk_pos_tag):
    """Translate an NLTK PoS tag into a coarse word-class label.

    Only the first character of the tag is inspected: 'J' gives 'ADJ',
    'N' gives 'NOUN' and 'V' gives 'VERB'. Every other tag gives None.
    """
    label_by_initial = {'J': 'ADJ', 'N': 'NOUN', 'V': 'VERB'}
    return label_by_initial.get(nltk_pos_tag[0])

In [8]:
# check word correctness
def check_word_correctness(word) -> bool:
    """Decide whether a token may be part of an exported MWE.

    A token is rejected when it is empty, contains a punctuation character
    (ASCII punctuation or one of the extra quote/dash characters below),
    contains a digit, starts with an upper-case letter, or is not tagged as
    an adjective, noun or verb when tagged on its own.
    """
    if len(word) == 0:
        return False

    # ASCII punctuation plus typographic quotes and dashes
    rejected_chars = set(string.punctuation) | {'`', '~', '‘', '—', '\'', '’'}
    for char in word:
        if char in rejected_chars or char.isdigit():
            return False

    # capitalised tokens are skipped
    if word[0].isupper():
        return False

    # tag the word in isolation and keep only J*, N* and V* tags
    isolated_tag = pos_tag([word])[0][1]
    return isolated_tag[0] in ['J', 'N', 'V']

In [9]:
# get number of occurrences for MWE
def get_mwe_freq(mwe, freq_list):
    """Look up how often ``mwe`` occurs.

    ``freq_list`` is a mapping keyed by MWE; a missing key raises KeyError.
    """
    occurrences = freq_list[mwe]
    return occurrences

In [10]:
# save list of measures to the TSV file
def save_mwe_list(mwe_list, measure_name, dataset_name, dataset_dir):
    """Write scored MWEs to ``<dataset_dir>/<dataset_name>_<measure_name>_incorrect_mwe.tsv``.

    Parameters
    ----------
    mwe_list : iterable of sequences of str
        One entry per MWE, holding the six fields named in the header row
        below, already converted to strings.
    measure_name : str
        Measure label embedded in the output file name.
    dataset_name : str
        Dataset label embedded in the output file name.
    dataset_dir : str
        Existing directory the file is written to.
    """
    out_filepath = os.path.join(dataset_dir, f'{dataset_name}_{measure_name}_incorrect_mwe.tsv')
    header = ['first_word_tag', 'second_word_tag', 'first_word', 'second_word', 'measure_value', 'frequency']

    # Explicit UTF-8: corpus words may contain non-ASCII characters, and the
    # file is later read back with pandas, which decodes as UTF-8 by default.
    with open(out_filepath, 'w', encoding='utf-8') as out_file:
        out_file.write('\t'.join(header) + '\n')

        for mwe_tuple in mwe_list:
            out_file.write('\t'.join(mwe_tuple) + '\n')

In [12]:
# get cleaned list of MWEs for list of measures

dataset_name = 'bnc'
dataset_dir = os.path.join('..', 'storage', dataset_name, 'preprocessed_data')

# number of MWE candidates exported per measure
mwe_sample_size = 100000

# bigram -> number of occurrences; it does not depend on the measure, so it is
# built once instead of on every loop iteration
freq_mwe_list = dict(finder.ngram_fd)

for measure_name in measure_dict.keys():
    # only PMI is exported by this cell; the other measures are skipped
    if measure_name != 'pmi':
        continue

    print(f'{get_curr_time()} : Generating incorrect MWEs list for {measure_name}')

    # score every bigram with the specified measure; the result holds
    # (bigram, score) entries
    desc_mwe_list = finder.score_ngrams(measure_dict[measure_name])

    # walk the scored list in reverse order and keep the bigrams whose words
    # all pass check_word_correctness
    asc_mwe_list_cleaned = [
        mwe_tuple
        for mwe_tuple in desc_mwe_list[::-1]
        if all(check_word_correctness(mwe_word) for mwe_word in mwe_tuple[0])
    ]

    # take the candidates that end at the 75% mark of the cleaned list.
    # The start is clamped at 0: a negative start would be interpreted by
    # Python as an offset from the END of the list and silently produce an
    # empty or wrong slice when the list is shorter than expected.
    window_end = int(0.75 * len(asc_mwe_list_cleaned))
    window_start = max(0, window_end - mwe_sample_size)
    asc_mwe_list_cleaned = asc_mwe_list_cleaned[window_start:window_end]

    # build the output rows: coarse tag of each word (tagged in isolation),
    # both words, the measure value and the bigram frequency, all as strings
    mwe_with_freq = []
    for mwe_tuple in asc_mwe_list_cleaned:
        mwe_words = mwe_tuple[0]
        first_word, second_word = mwe_words[0], mwe_words[1]
        mwe_with_freq.append([
            get_pos_tag(pos_tag([first_word])[0][1]),
            get_pos_tag(pos_tag([second_word])[0][1]),
            first_word,
            second_word,
            str(mwe_tuple[1]),
            str(get_mwe_freq(mwe_words, freq_mwe_list)),
        ])

    # save dataset
    save_mwe_list(mwe_with_freq, measure_name, dataset_name, dataset_dir)

22:20:20 : Generating incorrect MWEs list for pmi


In [28]:
# get cleaned list of MWEs for single measure

measure_name = 'pmi'
dataset_name = 'bnc'
dataset_dir = os.path.join('..', 'storage', dataset_name, 'preprocessed_data')

print(f'{get_curr_time()} : Generating incorrect MWEs list for {measure_name}')

# score every bigram with the specified measure; entries are (bigram, score)
desc_mwe_list = finder.score_ngrams(measure_dict[measure_name])

# bigram -> number of occurrences
freq_mwe_list = dict(finder.ngram_fd.items())

# walk the scored list in reverse order, keeping bigrams whose words all pass
# check_word_correctness and whose measure value (not frequency) exceeds 1
asc_mwe_list_cleaned = []
for scored_mwe in reversed(desc_mwe_list):
    words_are_valid = all([check_word_correctness(mwe_word) for mwe_word in scored_mwe[0]])
    if words_are_valid and scored_mwe[1] > 1:
        asc_mwe_list_cleaned.append(scored_mwe)

# keep only the first 100,000 entries of the reversed, cleaned list
asc_mwe_list_cleaned = asc_mwe_list_cleaned[:100000]

# build the output rows: coarse tag of each word (tagged in isolation), both
# words, the measure value and the bigram frequency, all as strings
mwe_with_freq = []
for scored_mwe in asc_mwe_list_cleaned:
    mwe_words, mwe_score = scored_mwe[0], scored_mwe[1]
    first_word_tag = get_pos_tag(pos_tag([mwe_words[0]])[0][1])
    second_word_tag = get_pos_tag(pos_tag([mwe_words[1]])[0][1])
    mwe_frequency = get_mwe_freq(mwe_words, freq_mwe_list)
    mwe_with_freq.append([first_word_tag, second_word_tag,
                          mwe_words[0], mwe_words[1],
                          str(mwe_score), str(mwe_frequency)])

# save dataset
save_mwe_list(mwe_with_freq, f'{measure_name}_greater_than_1', dataset_name, dataset_dir)

01:47:36 : Generating incorrect MWEs list for pmi


In [None]:
def save_sentence_df(sentence_df, filepath):
    """Write a DataFrame to ``filepath`` as UTF-8 text, one tab-separated line per row.

    No header row is written. Rows are visited in positional order, so the
    frame does not need a 0..n-1 index, and every cell is converted with
    ``str`` before joining so non-string values do not raise.

    Parameters
    ----------
    sentence_df : pandas.DataFrame
        Frame whose rows are written in column order.
    filepath : str
        Destination file; overwritten if it already exists.
    """
    with open(filepath, 'w', encoding='utf8') as f_out:
        for row_values in sentence_df.itertuples(index=False, name=None):
            # terminate every record with a newline so rows do not run together
            f_out.write('\t'.join(str(value) for value in row_values) + '\n')

In [37]:
# get sentences containing MWEs from the list

# load MWE lists
incorr_lists_dir = os.path.join('..', 'storage', 'bnc', 'preprocessed_data')

pmi_incorr_list_filename = 'bnc_pmi_greater_than_1_incorrect_mwe.tsv'
# NOTE(review): 'quatile' below differs from 'quartile' in the chi2 name — confirm against the file on disk
dice_incorr_list_filename = 'bnc_dice_end_of_3rd_quatile_incorrect_mwe.tsv'
chi2_incorr_list_filename = 'bnc_chi2_end_of_3rd_quartile_incorrect_mwe.tsv'

incorr_list_path = os.path.join(incorr_lists_dir, pmi_incorr_list_filename)

# keep_default_na=False: without it pandas turns words such as "null" or "nan"
# into float NaN, which can never match a corpus token
incorr_mwe_df = pd.read_csv(incorr_list_path, sep='\t',
                            dtype={'first_word': str, 'second_word': str},
                            keep_default_na=False)

# (first_word, second_word) -> position of the MWE in incorr_mwe_df plus its tags.
# The lookup carries the DataFrame position itself, so skipping entries here
# cannot shift the row that is later reported for a match.
mwe_lookup = {}
mwe_columns = zip(incorr_mwe_df['first_word_tag'].tolist(),
                  incorr_mwe_df['second_word_tag'].tolist(),
                  incorr_mwe_df['first_word'].tolist(),
                  incorr_mwe_df['second_word'].tolist())
for mwe_ind, (first_tag, second_tag, first_word, second_word) in enumerate(mwe_columns):
    if first_word == '’' or second_word == '’':
        continue
    # keep the first entry if a pair appears more than once in the list
    mwe_lookup.setdefault((first_word, second_word), (mwe_ind, first_tag, second_tag))

incorr_mwe_list = [list(mwe_words) for mwe_words in mwe_lookup]

sent_columns = ['first_word_tag', 'second_word_tag',
                'first_word', 'second_word',
                'first_word_id', 'second_word_id',
                'sentence']

# rows are collected in a plain list and turned into a DataFrame once at the
# end; DataFrame.append is gone from pandas 2.x and copies the frame per call
sent_records = []

for sent_idx, sentence in enumerate(bnc_reader.sents()):

    # look at every pair of adjacent tokens and remember, for each listed MWE,
    # the first position at which it occurs in this sentence
    first_match_by_mwe = {}
    for word_id in range(len(sentence) - 1):
        mwe_words = (sentence[word_id], sentence[word_id + 1])
        mwe_entry = mwe_lookup.get(mwe_words)
        if mwe_entry is not None and mwe_entry[0] not in first_match_by_mwe:
            first_match_by_mwe[mwe_entry[0]] = (word_id, mwe_words, mwe_entry)

    if first_match_by_mwe:
        sentence_text = ' '.join(sentence)

        # emit one row per (sentence, MWE), ordered like the MWE list
        for mwe_ind in sorted(first_match_by_mwe):
            word_id, mwe_words, mwe_entry = first_match_by_mwe[mwe_ind]
            sent_records.append({'first_word_tag': mwe_entry[1],
                                 'second_word_tag': mwe_entry[2],
                                 'first_word': mwe_words[0],
                                 'second_word': mwe_words[1],
                                 'first_word_id': str(word_id),
                                 # the second word is the token right after the first
                                 'second_word_id': str(word_id + 1),
                                 'sentence': sentence_text})

    if sent_idx % 10000 == 0 and sent_idx > 0:
        print(f'{get_curr_time()} : Processed {sent_idx + 1} sentences')

sent_df = pd.DataFrame(sent_records, columns=sent_columns)

sent_df

In [None]:
# save dataframe to tsv
# measure_name only selects the output file name here; it does not re-filter sent_df
measure_name = 'pmi'
sent_df_out_filepath = os.path.join('..', 'storage', 'bnc', 'preprocessed_data', f'{measure_name}_incorrect_sentence_list.tsv')

# writes the sent_df built by the sentence-collection cell
save_sentence_df(sent_df, sent_df_out_filepath)

12374827