In [None]:
!pip install wikipedia
!pip install transformers
!pip install keybert

In [3]:
from google.colab import drive

# The dataset CSVs are read from this mount point by the loading cell.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [1]:
import pandas as pd

# Folder on the mounted Google Drive that holds the competition CSVs.
DATA_DIR = "/content/drive/MyDrive/datasets/AIIJC/2-NLP"
# Only the first TRAIN_ROWS training rows are used by the rest of the notebook.
TRAIN_ROWS = 50000

# nrows stops parsing after TRAIN_ROWS rows instead of loading the whole file
# and slicing it afterwards; the resulting frame has the same rows and index.
train = pd.read_csv(DATA_DIR + "/train_data.csv", nrows=TRAIN_ROWS)
test = pd.read_csv(DATA_DIR + "/test.csv")
sample = pd.read_csv(DATA_DIR + "/sample_submission.csv")

In [2]:
from transformers import pipeline
from keybert import KeyBERT
import wikipedia as wiki
from tqdm.notebook import tqdm
import warnings
from scipy.spatial.distance import cosine
from sklearn.feature_extraction.text import TfidfVectorizer


class QuestionAnswering():
    """Question answering over Wikipedia with a training-set fallback.

    For each question the class extracts keyphrases (KeyBERT), fetches
    Wikipedia text for them, and runs a question-answering pipeline over that
    text. If no text is found, or both pipelines answer ``'.'``, it returns
    the answer of the most similar training question (TF-IDF similarity).
    """

    def __init__(self, train_data, transformer_name='bert-large-uncased-whole-word-masking-finetuned-squad', reserve_transformer='deepset/bert-large-uncased-whole-word-masking-squad2', min_kw_len=3, lang='en', ignore_warnings=True):
        """Load the models and index the training questions.

        Args:
            train_data: table with ``'question'`` and ``'answer'`` columns
                (indexed like a pandas DataFrame).
            transformer_name: model id of the primary question-answering pipeline.
            reserve_transformer: model id of the pipeline tried when the
                primary one answers ``'.'``.
            min_kw_len: controls which n-gram sizes are tried during keyphrase
                extraction (see ``find_wiki_data``).
            lang: language code passed to ``wiki.set_lang``.
            ignore_warnings: when true, silence all Python warnings.
        """
        if ignore_warnings:
            warnings.simplefilter("ignore")
        self.transformer = pipeline(task='question-answering', model=transformer_name, tokenizer=transformer_name)
        # Each model is paired with its own tokenizer (previously the reserve
        # model was built with the primary model's tokenizer).
        self.reserve_transformer = pipeline(task='question-answering', model=reserve_transformer, tokenizer=reserve_transformer)
        self.kw_model = KeyBERT()
        self.train = train_data
        self.min_kw_len = min_kw_len
        self.vectorizer = TfidfVectorizer()
        # Keep the TF-IDF matrix sparse: a dense copy of
        # (n_questions x vocabulary) floats is very large and is not needed
        # for the similarity lookup in cosine_answer.
        self.corpus_v = self.vectorizer.fit_transform(self.train['question'].tolist())
        wiki.set_lang(lang)


    def __repr__(self):
        return "Question Answering class"


    def find_key_word(self, sentence, ngrams=(1,3)):
        """Return the first keyphrase KeyBERT extracts from ``sentence``.

        Args:
            sentence: text to extract from.
            ngrams: ``(min_n, max_n)`` passed as ``keyphrase_ngram_range``.

        Raises:
            IndexError: if KeyBERT returns no keyphrases.
        """
        return self.kw_model.extract_keywords(sentence, keyphrase_ngram_range=ngrams, stop_words=None)[0][0]


    def search_on_wiki(self, topic):
        """Return the full content of the Wikipedia page looked up for ``topic``.

        NOTE(review): all whitespace is removed from ``topic`` before the
        lookup ("New York" -> "NewYork"). Kept as in the original; confirm
        that this is intended.
        """
        page = wiki.page(''.join(topic.split()))
        return page.content


    def cosine_answer(self, question):
        """Return the answer of the training question most similar to ``question``.

        ``TfidfVectorizer`` L2-normalises its rows by default, so the dot
        product of two rows equals their cosine similarity. If nothing
        overlaps (all scores are 0) the first training answer is returned,
        and ties go to the earliest row.
        """
        query = self.vectorizer.transform([question]).toarray()[0]
        scores = self.corpus_v.dot(query)
        best_row = int(scores.argmax())
        # Positional lookup: works even when the training frame's index is
        # not 0..n-1 (e.g. after filtering or shuffling).
        return self.train['answer'].iloc[best_row]


    def _extract_keywords(self, question, offsets):
        """Extract one keyphrase per offset, skipping offsets that fail.

        Offset ``i`` requests n-grams of size ``(n_words - i - 1, n_words - i)``.
        """
        n_words = len(question.split())
        keywords = []
        for offset in offsets:
            try:
                keywords.append(self.find_key_word(question, (n_words - offset - 1, n_words - offset)))
            except Exception:
                # Best effort: some n-gram ranges yield no keyphrase.
                continue
        return keywords


    def _collect(self, fetch, queries):
        """Call ``fetch`` on every query and return the non-empty results.

        Failures (missing or ambiguous pages, network errors) are skipped.
        Catching ``Exception`` rather than using a bare ``except`` keeps
        Ctrl-C / kernel interrupt working during long runs.
        """
        texts = []
        for query in queries:
            try:
                text = fetch(query)
                if len(text) > 0:
                    texts.append(text)
            except Exception:
                continue
        return texts


    def _search_titles(self, keywords, results):
        """Return the page titles ``wiki.search`` finds for each keyword, flattened."""
        titles = []
        for keyword in keywords:
            try:
                titles.extend(wiki.search(keyword, results=results))
            except Exception:
                continue
        return titles


    def _retrieve(self, keywords):
        """Return the texts from the first lookup strategy that finds any.

        Order: summary of the keyword, full page of the keyword, summary of
        the top search hit, full page of the top search hit.
        """
        texts = self._collect(wiki.summary, keywords)
        if texts:
            return texts

        texts = self._collect(self.search_on_wiki, keywords)
        if texts:
            return texts

        # wiki.search returns a *list* of titles. The original passed that
        # list straight to search_on_wiki, whose topic.split() fails on a
        # list, so the search-based stages were silently skipped.
        top_titles = self._search_titles(keywords, results=1)
        texts = self._collect(wiki.summary, top_titles)
        if texts:
            return texts

        return self._collect(self.search_on_wiki, top_titles)


    def find_wiki_data(self, question):
        """Collect Wikipedia text to use as context for ``question``.

        Two keyphrase passes are tried in turn (offsets
        ``min_kw_len .. n_words-1``, then ``n_words-min_kw_len .. n_words-1``),
        each followed by the lookup chain in ``_retrieve``. As a last resort
        the full pages of up to 20 search hits per keyphrase of the second
        pass are used.

        Returns:
            The collected texts joined by single spaces, or the integer ``0``
            when nothing was found (callers compare against ``0``).
        """
        n_words = len(question.split())
        keywords = []
        passes = (
            range(self.min_kw_len, n_words),
            range(n_words - self.min_kw_len, n_words),
        )
        for offsets in passes:
            keywords = self._extract_keywords(question, offsets)
            texts = self._retrieve(keywords)
            if texts:
                return ' '.join(texts)

        # Broad fallback. Every hit is fetched independently, so one bad page
        # no longer discards the remaining hits of that keyword.
        texts = self._collect(self.search_on_wiki, self._search_titles(keywords, results=20))
        if texts:
            return ' '.join(texts)
        return 0


    def give_answer(self, question):
        """Answer one question.

        Tries the primary pipeline, then the reserve pipeline, on the
        retrieved Wikipedia context. An answer of ``'.'`` counts as "no
        answer". Falls back to ``cosine_answer`` when there is no context or
        neither pipeline gives an answer.
        """
        context = self.find_wiki_data(question)
        if context == 0:
            return self.cosine_answer(question)

        for reader in (self.transformer, self.reserve_transformer):
            answer = reader({
                'context' : context,
                'question' : question})['answer']
            if answer != '.':
                return answer

        return self.cosine_answer(question)


    def predict(self, corpus):
        """Return the answers for every question in ``corpus``, in order, with a progress bar."""
        return [self.give_answer(question) for question in tqdm(corpus)]

In [None]:
# Questions to answer, in test-set order.
corpus = test['question'].to_list()

# Constructing the model loads both QA pipelines and fits TF-IDF on the
# training questions; predict() then answers the questions one by one.
QA = QuestionAnswering(train_data=train)
answers = QA.predict(corpus)

  0%|          | 0/2463 [00:00<?, ?it/s]

In [None]:
import os

import requests

# Send a "prediction finished" message through the Telegram Bot API.
# SECURITY: the bot token and chat id used to be hard-coded in this cell.
# They are now read from the environment; the token that was committed
# earlier should be revoked and replaced.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")

if TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID:
    try:
        # params= URL-encodes the message text; timeout keeps a stalled
        # connection from hanging the notebook.
        requests.get(
            "https://api.telegram.org/bot" + TELEGRAM_BOT_TOKEN + "/sendMessage",
            params={"chat_id": TELEGRAM_CHAT_ID, "text": "Prediction is ready!"},
            timeout=10,
        )
    except requests.RequestException as exc:
        # The notification is a convenience; do not block writing the submission.
        print("Telegram notification failed:", exc)
else:
    print("Telegram notification skipped: set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID to enable it.")

In [None]:
# Start from the sample submission's layout and fill in the predicted answers.
submission = sample.assign(answer=answers)
# NOTE(review): with default arguments to_csv also writes the row index as the
# first column — confirm that the grader expects that column.
submission.to_csv("submission_12.csv")