Basic QA system

In [None]:
# load the files
import json


def _load_json(path):
    """Parse the JSON file at *path* and close the file handle afterwards."""
    # JSON text is UTF-8; being explicit avoids depending on the platform default.
    with open(path, encoding='utf-8') as handle:
        return json.load(handle)


documents_set = _load_json('documents.json')
training_set = _load_json('training.json')
devel_set = _load_json('devel.json')
testing_set = _load_json('testing.json')

<b> 1. Information Retrieval to find the best matching sentence </b>

In [None]:
import nltk
import re
from math import log
from nltk.stem import WordNetLemmatizer
from nltk import sent_tokenize, word_tokenize
from collections import defaultdict, Counter
from nltk.corpus import stopwords
nltk.download("wordnet")
nltk.download('punkt')
# The stopword list read below lives in its own corpus package.
nltk.download('stopwords')

# NOTE: this rebinds the name `stopwords` (imported above as the corpus reader)
# to a plain set of English stopwords; preprocess() relies on the set.
stopwords = set(nltk.corpus.stopwords.words('english'))
stemmer = nltk.stem.PorterStemmer()
lmtzr = nltk.stem.wordnet.WordNetLemmatizer()


# Preprocessing: strip punctuation, tokenize, lowercase, drop stopwords, stem
def preprocess(doc):
    """Return the list of stemmed, lowercased, non-stopword tokens of *doc*.

    Every character that is neither a word character nor whitespace is
    deleted before tokenization.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', doc)
    lowered_tokens = (raw.lower() for raw in word_tokenize(without_punctuation))
    return [stemmer.stem(word) for word in lowered_tokens if word not in stopwords]

# collect term frequencies for each sentence (a bag of words)
def extract_term_freqs(sentence):
    """Return a Counter mapping each preprocessed term of *sentence* to its count."""
    return Counter(preprocess(sentence))

# compute document frequencies (in how many sentences of a document each term occurs)
def compute_doc_freqs(doc_term_freqs):
    """Count, for every term, the number of entries of *doc_term_freqs* containing it.

    *doc_term_freqs* maps a sentence id to that sentence's term-frequency
    mapping; only the values are inspected.
    """
    sentence_counts = Counter()
    for term_freqs in doc_term_freqs.values():
        # Each distinct term of a sentence contributes exactly one.
        sentence_counts.update(term_freqs.keys())
    return sentence_counts

# process documents_set into sentences
# NOTE: this definition rebinds the module-level name `sent_tokenize`; the NLTK
# splitter is therefore called through its qualified name below.
def sent_tokenize(documents_set):
    """Map each document position to the flat list of sentences of its 'text' paragraphs."""
    return {
        docid: [
            sentence
            for paragraph in documents_set[docid]['text']
            for sentence in nltk.sent_tokenize(paragraph)
        ]
        for docid in range(len(documents_set))
    }
documents_set_sents = sent_tokenize(documents_set)


# process the document_set into term frequencies
def get_term_frequencies(documents_set_sents):
    """Build docid -> sent_id -> term-frequency bag for every sentence of every document."""
    return {
        docid: {
            sent_id: extract_term_freqs(sentence)
            for sent_id, sentence in enumerate(documents_set_sents[docid])
        }
        for docid in range(len(documents_set_sents))
    }
doc_term_freqs = get_term_frequencies(documents_set_sents)


# process the document_set into document frequencies
def get_doc_freqs(doc_term_freqs):
    """Build docid -> per-document sentence-level document frequencies."""
    return {
        docid: compute_doc_freqs(sent_term_freqs)
        for docid, sent_term_freqs in doc_term_freqs.items()
    }
doc_freqs = get_doc_freqs(doc_term_freqs)


# build an inverted index to allow for efficient lookup by term
def inverted_index(doc_term_freqs):
    """Build, per document, a mapping term -> list of [sent_id, tf] postings.

    Each per-document index is a defaultdict(list), so looking up an unseen
    term yields an empty postings list.
    """
    index_by_doc = {}
    for docid, sentences in doc_term_freqs.items():
        postings = defaultdict(list)
        # inversion of the indexing: sent_id -> term becomes term -> (sent_id, tf)
        for sent_id, term_freqs in sentences.items():
            for term, tf in term_freqs.items():
                postings[term].append([sent_id, tf])
        index_by_doc[docid] = postings
    return index_by_doc
# per-document inverted indexes: docid -> term -> [[sent_id, tf], ...]
inverted_index_dict = inverted_index(doc_term_freqs)


# Store the number of tokens in each sentence of a document
def get_token_num(documents_set_sents):
    """Build docid -> sent_id -> number of tokens left after preprocessing."""
    return {
        docid: {
            sent_id: len(preprocess(sentence))
            for sent_id, sentence in enumerate(documents_set_sents[docid])
        }
        for docid in range(len(documents_set_sents))
    }
token_num_dict = get_token_num(documents_set_sents)


# store the number of sentences in a documents
def get_sent_num(documents_set_sents):
    """Build docid -> number of sentences, for docids 0 .. len(documents_set_sents) - 1."""
    return {
        docid: len(documents_set_sents[docid])
        for docid in range(len(documents_set_sents))
    }
# docid -> number of sentences in that document
sent_num = get_sent_num(documents_set_sents)


# compute BM25
def Okapi_BM25(query, docid, sent_num, inverted_index_dict, token_num_dict, doc_term_freqs, doc_freqs):
    """Score every sentence of document *docid* against *query* with Okapi BM25.

    Sentences play the role of "documents" in the BM25 formula.

    Parameters:
        query: raw question text; it is preprocessed via extract_term_freqs.
        docid: key of the document in all the per-document mappings.
        sent_num: docid -> number of sentences.
        inverted_index_dict: docid -> term -> postings list (one entry per
            sentence containing the term).
        token_num_dict: docid -> sent_id -> token count of the sentence.
        doc_term_freqs: docid -> sent_id -> term -> frequency.
        doc_freqs: not used by the computation; kept so existing callers
            keep working.

    Returns:
        dict mapping each sent_id in range(sent_num[docid]) to its score.
    """
    k1 = 1.2
    k2 = 1.5
    b = 0.75

    query_terms_freqs = extract_term_freqs(query)
    N = sent_num[docid]                      # the number of sentences
    postings = inverted_index_dict[docid]
    sent_lengths = token_num_dict[docid]
    sent_term_freqs = doc_term_freqs[docid]

    # Average sentence length in tokens. Summing document frequencies instead
    # would count each term once per sentence and ignore repeated terms.
    Lavg = sum(sent_lengths.values()) / N if N else 0

    # Everything that depends only on the query term is computed once.
    term_weights = {}
    for term, fqt in query_terms_freqs.items():
        # .get() avoids inserting unseen terms into a defaultdict index.
        f = len(postings.get(term, ()))      # number of sentences containing term
        # NOTE: this idf is negative for terms found in more than half of the sentences.
        idf = log((N - f + 0.5) / (f + 0.5))
        query_tf = (k2 + 1) * fqt / (k2 + fqt)
        term_weights[term] = (idf, query_tf)

    score = {}
    for sent_id in range(N):
        term_freqs = sent_term_freqs[sent_id]
        Ld = sent_lengths[sent_id]           # length of the sentence
        # Lavg is 0 only when every sentence is empty; all term frequencies
        # are then 0 as well, so any finite ratio gives a score of 0.
        length_ratio = Ld / Lavg if Lavg else 0
        length_norm = k1 * ((1 - b) + b * length_ratio)

        sent_score = 0
        for term, (idf, query_tf) in term_weights.items():
            fdt = term_freqs[term]           # occurrences of term in the sentence
            tf_doc = ((k1 + 1) * fdt) / (length_norm + fdt)
            sent_score += idf * tf_doc * query_tf
        score[sent_id] = sent_score
    return score

# return the best matching sentence
def best_matching_sent_id(scores):
    """Return the key of *scores* with the largest value (first such key on ties)."""
    return max(scores, key=scores.get)

def best_match_sent(documents_set_sents, data_set):
    """Map each question position in *data_set* to its highest-scoring sentence.

    Scoring uses Okapi_BM25 together with the module-level index structures
    (sent_num, inverted_index_dict, token_num_dict, doc_term_freqs, doc_freqs).
    """
    chosen = {}
    for query_id, item in enumerate(data_set):
        question = item['question']
        docid = item['docid']
        scores = Okapi_BM25(
            question, docid, sent_num, inverted_index_dict,
            token_num_dict, doc_term_freqs, doc_freqs,
        )
        top_sent_id = max(scores, key=scores.get)
        chosen[query_id] = documents_set_sents[docid][top_sent_id]
    return chosen

# generate best match sentence for questions in training set, development set and testing set.
best_match_sent_train = best_match_sent(documents_set_sents, data_set=training_set)
best_match_sent_devel = best_match_sent(documents_set_sents, data_set=devel_set)
best_match_sent_test = best_match_sent(documents_set_sents, data_set=testing_set)


<b> 2. Named Entity Recognition</b>

In [None]:
'''
Reference: Jenny Rose Finkel, Trond Grenager, and Christopher Manning. 2005. Incorporating Non-local Information into Information Extraction Systems 
by Gibbs Sampling. Proceedings of the 43nd Annual Meeting of the Association for Computational Linguistics (ACL 2005), 
pp. 363-370. http://nlp.stanford.edu/~manning/papers/gibbscrf3.pdf
'''

import nltk
from nltk.tag import StanfordNERTagger
from nltk import pos_tag, sent_tokenize, word_tokenize
from nltk.chunk import conlltags2tree
from nltk.tree import Tree
nltk.download('averaged_perceptron_tagger')

import os

# StanfordNERTagger 7 class model for recognizing locations, persons, organizations, times, money, percents, and dates
# The install directory can be overridden with the STANFORD_NER_HOME
# environment variable; without it the original location is used.
stanford_ner_home = os.environ.get('STANFORD_NER_HOME', '/Users/yue/stanford-ner')
st = StanfordNERTagger(os.path.join(stanford_ner_home, 'classifiers', 'english.muc.7class.distsim.crf.ser.gz'),
                       os.path.join(stanford_ner_home, 'stanford-ner.jar'),
                       encoding='utf-8')

# obtain stanford entity information for best match sentences
def get_tagged_sents(best_match_sents):
    """Tag the sentences stored under keys 0 .. len(best_match_sents) - 1.

    Returns one list of (token, tag) pairs per sentence, in key order, with
    pairs whose token is the empty string left out.
    """
    sents_list = [
        word_tokenize(best_match_sents[i])
        for i in range(len(best_match_sents))
    ]
    tagged_sents = st.tag_sents(sents_list)
    # Build filtered copies: removing items from a list while iterating over
    # it skips the element that follows each removal.
    return [
        [(token, tag) for token, tag in tagged if token != '']
        for tagged in tagged_sents
    ]

st_best_match_sents_devel = get_tagged_sents(best_match_sent_devel)
st_best_match_sents_train = get_tagged_sents(best_match_sent_train)
st_best_match_sents_test = get_tagged_sents(best_match_sent_test)


In [None]:
from nltk.chunk import conlltags2tree
from nltk.tree import Tree

# convert Stanford NER Tagger result into NLTK trees
## adapted from https://stackoverflow.com/questions/30664677/extract-list-of-persons-and-organizations-using-stanford-ner-tagger-in-nltk
# word boundaries
def stanfordNE2bio(st_tag):
    """Rewrite (token, tag) pairs with BIO prefixes.

    'O' tags are kept unchanged. Any other tag becomes 'I-<tag>' when the
    previous token carried the same tag, and 'B-<tag>' otherwise.
    """
    bio_pairs = []
    previous = 'O'
    for word, label in st_tag:
        if label == "O":
            bio_pairs.append((word, label))
        else:
            # Same label as the previous token continues the entity; a
            # preceding 'O' or a different label starts a new one.
            prefix = "I-" if previous == label else "B-"
            bio_pairs.append((word, prefix + label))
        previous = label
    return bio_pairs

# convert Stanford NER Tagger result into NLTK trees
def stanfordNE2tree(st_tag):
    """Convert one tagged sentence into a chunk tree built by conlltags2tree.

    The (token, tag) pairs are BIO-prefixed with stanfordNE2bio, POS-tagged
    with pos_tag and combined into (token, pos, ne) triples.
    """
    bio_tagged_sent = stanfordNE2bio(st_tag)
    if not bio_tagged_sent:
        # zip(*[]) produces nothing to unpack, so a sentence without tokens
        # is mapped straight to a tree built from no triples.
        return conlltags2tree([])
    sent_tokens, sent_ne_tags = zip(*bio_tagged_sent)
    sent_pos_tags = [pos for token, pos in pos_tag(sent_tokens)]
    sent_conlltags = list(zip(sent_tokens, sent_pos_tags, sent_ne_tags))
    return conlltags2tree(sent_conlltags)

# get continuous named entity words
def get_continuous_NE(st_tag):
    """Return a list of (lowercased entity text, entity label) for one tagged sentence.

    Each direct child of the sentence tree that is itself a Tree is treated
    as one entity; its leaf tokens are joined with single spaces.
    """
    ne_tree = stanfordNE2tree(st_tag)
    ne_in_sent = []
    for subtree in ne_tree:
        if isinstance(subtree, Tree):
            ne_string = " ".join(token for token, pos in subtree.leaves())
            ne_in_sent.append((ne_string.lower(), subtree.label()))
    return ne_in_sent

## End of adaptation

# obtain named entity for best matching sent by joining contiguous words with the same type for each data set
def get_final_en(st_best_match_sents):
    """Return, for each tagged sentence in order, its list of (entity, label) pairs."""
    return [get_continuous_NE(tagged_sent) for tagged_sent in st_best_match_sents]

devel_set_ne = get_final_en(st_best_match_sents_devel)
train_set_ne = get_final_en(st_best_match_sents_train)
test_set_ne = get_final_en(st_best_match_sents_test)


<b> 3. Question type classification and answer extraction </b>

In [None]:
# answer type detection
#
# Question wh-words: what, which, when, where, who, why,
# how (how far, how many, how much, how long).
#
# Question class   Answer type
# Who/Whom         Person / Organization
# Where            Location
# When             Date / Time
# Which            Person / Location / Date / Time
# Why              Unknown
# What             Locations, Persons, Organizations, Times, Money, Percents, Dates
# How              Money, Percents, etc.
#
# Only the classes listed in _ANSWER_TYPE_RULES have a dedicated rule; every
# other question falls back to the first entity of the best matching sentence.

# Ordered (question keywords, accepted entity labels) pairs. The first rule
# with a keyword occurring as a substring of the lowercased question applies.
# Labels use the tagger's spelling ('ORGANIZATION', with a z).
_ANSWER_TYPE_RULES = (
    (('when', 'date', 'year'), ('DATE', 'TIME')),
    (('who', 'whom'), ('ORGANIZATION', 'PERSON')),
    (('where', 'place'), ('LOCATION',)),
    # num: how far, how many, how much, how long
    (('how', 'far', 'many', 'much', 'long'), ('MONEY', 'PERCENT', 'TIME', 'DATE')),
    (('percent',), ('PERCENT',)),
)


def _extract_answer(question, entities):
    """Pick an answer for *question* among (text, label) *entities*.

    Returns 'unk' when there are no entities. Otherwise returns the last
    entity whose label is accepted by the applicable rule, or the first
    entity when no rule applies or no entity has an accepted label.
    """
    if not entities:
        return 'unk'
    lowercased_query = question.lower()
    for keywords, labels in _ANSWER_TYPE_RULES:
        if any(keyword in lowercased_query for keyword in keywords):
            matching = [text for text, label in entities if label in labels]
            if matching:
                return matching[-1]
            break  # only the first applicable rule is consulted
    return entities[0][0]


answers = []
same_count = 0  # not used in this cell; kept in case other cells reference it
for question_id, item in enumerate(testing_set):
    # named entities of the best match sentence for this question
    ans = _extract_answer(item['question'], test_set_ne[question_id])
    answers.append([question_id, ans])


In [None]:
import csv

# Write one "id,answer" row per question. newline='' leaves line-ending
# handling to the csv module; the with-block closes the file on exit, so no
# explicit close() is needed.
with open('test_ans1.csv', 'w', newline='', encoding='UTF-8') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    header = ['id', 'answer']
    writer.writerow(header)
    writer.writerows(answers)