In [None]:
import os

import numpy as np
import torch
from transformers import BertTokenizer, BertModel, AutoModel, AutoTokenizer


class DocumentEmbedder:
    """Turns text into fixed-size vectors with a pretrained BERT encoder.

    Documents are represented by the average of their last-layer token
    vectors, whereas questions are represented by the last-layer vector of
    the first token only, so the two methods pool differently.
    """

    def __init__(self, model_name='bert-large-uncased'):
        """Load the tokenizer and the encoder for ``model_name``."""
        self.model_name = model_name
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertModel.from_pretrained(model_name)

    def embed_doc(self, documents):
        """Embed each document separately and return the vectors as a list.

        Args:
            documents: Iterable of strings; each one is tokenized (with
                truncation) and encoded on its own.

        Returns:
            A list with one NumPy array per document: the mean over the
            token axis of the encoder's last hidden state.
        """

        def _embed_one(text):
            # Tokenize a single document into PyTorch tensors.
            encoded = self.tokenizer(text, return_tensors='pt', truncation=True, padding=True)

            # Inference only: gradients are not needed.
            with torch.no_grad():
                token_states = self.model(**encoded).last_hidden_state

            # Average over tokens, drop size-1 axes, hand back NumPy.
            return token_states.mean(dim=1).squeeze().numpy()

        return [_embed_one(text) for text in documents]

    def encode_question(self, question, max_length=128):
        """Embed a question using the hidden state of its first token.

        Args:
            question: Question string.
            max_length: Token limit; longer questions are truncated.

        Returns:
            A NumPy array holding a batch of one question vector.
        """
        token_ids = self.tokenizer.encode(
            question,
            add_special_tokens=True,
            max_length=max_length,
            truncation=True,
        )

        # Wrap the id list so the encoder receives a batch of size one.
        batch = torch.tensor([token_ids])

        with torch.no_grad():
            first_token_state = self.model(batch).last_hidden_state[:, 0, :]

        return first_token_state.numpy()


In [None]:
# Build the embedder with its default checkpoint ('bert-large-uncased').
bert = DocumentEmbedder()

In [None]:
from serpapi import GoogleSearch

def get_query(query):
    """Run a Google search through SerpApi and return its organic results.

    The API key is read from the ``SERPAPI_API_KEY`` environment variable so
    that no credential is stored in the notebook.

    Args:
        query: Search string, sent as the ``q`` parameter.

    Returns:
        The value stored under ``'organic_results'`` in the response dict.

    Raises:
        RuntimeError: If ``SERPAPI_API_KEY`` is not set.
        KeyError: If the response has no ``'organic_results'`` entry.
    """
    api_key = os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set the SERPAPI_API_KEY environment variable before calling get_query()."
        )

    params = {
        "q": query,
        "engine": "google",
        "api_key": api_key,
    }

    search = GoogleSearch(params)
    results = search.get_dict()
    organic_results = results['organic_results']
    return organic_results

# Search string used to seed the document collection.
Query = "Inception movie"
# Organic search hits returned by get_query for that string.
organic_results = get_query(Query)



In [None]:
# Get the named entities from a query string

import spacy

def get_named_entities(query):
    """Return the text of every named entity spaCy finds in ``query``.

    The ``en_core_web_sm`` pipeline is loaded on the first call and then kept
    on the function object, so repeated calls do not reload it from disk.

    Args:
        query: String to analyse.

    Returns:
        A list of entity strings,
        e.g. ``['Michael Jordan', 'Chicago Bulls', 'NBA']``.
    """
    nlp = getattr(get_named_entities, "_nlp", None)
    if nlp is None:
        nlp = spacy.load('en_core_web_sm')
        get_named_entities._nlp = nlp

    return [ent.text for ent in nlp(query).ents]

# Quick check of the extractor on a sample string.
print(get_named_entities("Professor Lawrence Angrave"))

In [None]:
import trafilatura as tr
import pandas as pd
import wikipedia as wiki

def parse_results(link):
    """Download ``link`` and return its extracted text content.

    Args:
        link: URL of the page to fetch.

    Returns:
        The text produced by ``tr.extract`` (formatting included), or ``''``
        when either the download or the extraction yields nothing.
    """
    downloaded = tr.fetch_url(link)
    if downloaded is None:
        # Nothing was downloaded, so there is nothing to extract.
        return ''

    text = tr.extract(downloaded, include_formatting=True)
    # Identity check instead of `== None`, which is the idiomatic None test.
    return '' if text is None else text

def get_scholar_results(author_id):
    """Fetch a Google Scholar author profile through SerpApi.

    The API key is read from the ``SERPAPI_API_KEY`` environment variable so
    that no credential is stored in the notebook.

    Args:
        author_id: Value sent as the ``author_id`` parameter.

    Returns:
        The full response dict returned by the search.

    Raises:
        RuntimeError: If ``SERPAPI_API_KEY`` is not set.
    """
    api_key = os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set the SERPAPI_API_KEY environment variable before calling get_scholar_results()."
        )

    params = {
        'engine': 'google_scholar_author',
        'author_id': author_id,
        'api_key': api_key,
    }
    search = GoogleSearch(params)
    results = search.get_dict()
    return results

# Collected documents: parallel lists of titles and full texts.
dataset = {'titles': [], 'text': []}

# Non-Wikipedia pages are kept only if their text mentions this term.
named_entity = "Inception"

# Make sure the output folder exists before any file is written into it.
os.makedirs("Documents", exist_ok=True)

for result in organic_results:
    if 'wikipedia.org' in result['link']:
        # Fetch the article first so a failed lookup does not leave an empty file behind.
        wiki_results = wiki.search(result['title'])
        if not wiki_results:
            # No matching article: skip this hit instead of failing on index 0.
            continue
        page = wiki.page(wiki_results[0], auto_suggest=False)
        with open("Documents/wiki.txt", 'w', encoding='utf-8') as f:
            f.write(page.content)
        dataset['titles'].append("wikiresult")
        dataset['text'].append(page.content)

    # if 'scholar.google.com' in result['link']:
    #     # get user id from link
    #     user_id = result['link'].split('user=')[1].split('&')[0]
    #     # get scholar results
    #     scholar_results = get_scholar_results(user_id)
    #     print(scholar_results["author"])
    else:
        # Download and parse the page once, then reuse the text everywhere below.
        text = parse_results(result['link'])
        if named_entity.lower() in text.lower():
            # Path separators in a title would otherwise point into a sub-directory.
            safe_title = result['title'].replace('/', '_').replace(os.sep, '_')
            with open(os.path.join("Documents", safe_title + '.txt'), 'w', encoding='utf-8') as f:
                f.write(text)
            dataset['titles'].append(result['title'])
            dataset['text'].append(text)

df = pd.DataFrame(dataset)
# df.to_csv('Dataset.csv', index=False, sep='\t')


In [None]:
# split the text into passages

def split_text(text: str, n=100, character=" "):
    """Cut ``text`` into passages of at most ``n`` ``character``-separated pieces.

    Each passage is re-joined with ``character`` and stripped of surrounding
    whitespace before being returned.
    """
    pieces = text.split(character)
    passages = []
    for start in range(0, len(pieces), n):
        window = pieces[start:start + n]
        passages.append(character.join(window).strip())
    return passages


def split_documents(documents: dict) -> dict:
    """Expand whole documents into passage-level records.

    Reads the parallel lists ``documents["titles"]`` and ``documents["text"]``,
    skips entries whose text is ``None``, and repeats each title (``""`` when
    the title is ``None``) once per passage produced by ``split_text``.

    Returns:
        A dict with the parallel lists ``"title"`` and ``"text"``.
    """
    passage_titles = []
    passage_texts = []
    for doc_title, doc_text in zip(documents["titles"], documents["text"]):
        if doc_text is None:
            continue
        label = "" if doc_title is None else doc_title
        chunks = split_text(doc_text)
        passage_titles.extend([label] * len(chunks))
        passage_texts.extend(chunks)
    return {"title": passage_titles, "text": passage_texts}

# Convert the DataFrame to a dict of column lists, then replace it with the
# passage-level dict returned by split_documents.
documents = df.to_dict('list')
documents = split_documents(documents)


In [None]:
# Embed every passage and store the vectors next to the passage text.
documents["embedding"] = bert.embed_doc(documents["text"])
# Number of embeddings produced.
print(len(documents["embedding"]))


In [None]:
# Tabular view of the passage dict built above; one column per key of `documents`.
new_df_embed = pd.DataFrame(documents)
print(new_df_embed.shape)

In [None]:
# Give every distinct passage title an integer id.
label_to_id = {label: i for i, label in enumerate(new_df_embed['title'].unique())}
print(label_to_id)
# Reverse mapping, for turning an id back into its title.
id_to_label = {i: label for label, i in label_to_id.items()}

In [None]:
# NOTE(review): a negative argument to head() shows every row except the last
# one rather than a short preview; confirm whether head() was intended.
new_df_embed.head(-1)

In [None]:
import numpy as np


# Stack the per-passage embeddings into one feature array.
train_features = np.array(new_df_embed['embedding'].tolist())
# Integer class label of each passage, looked up from its title.
train_labels = np.array([label_to_id[i] for i in new_df_embed['title'].tolist()])
print("train_features shape = ", train_features.shape)
print("train_labels shape = ", train_labels.shape)

In [None]:
import numpy as np
from sklearn.neighbors import KNeighborsClassifier

# Create a KNN classifier with cosine similarity as the metric
knn = KNeighborsClassifier(metric='cosine')

# Fit the KNN model with document embeddings.
# The targets are the title ids, so the classifier predicts which source
# document a vector belongs to.
knn.fit(train_features, train_labels) 

In [None]:
# Example question to answer against the collected documents.
question = "When was inception released"
# encode_question returns a batch holding one vector; take that single row.
question_embedding = bert.encode_question(question)[0]
print(question_embedding.shape)  


In [None]:
from transformers import T5Tokenizer, T5ForConditionalGeneration
def get_answer(question, context):
    """Generate an answer to ``question`` from ``context`` with FLAN-T5.

    Args:
        question: Question string inserted into the prompt.
        context: Supporting text inserted into the prompt.

    Returns:
        The decoded text of the first generated sequence, without special
        tokens such as padding or end-of-sequence markers.
    """
    tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-base")
    model_flan = T5ForConditionalGeneration.from_pretrained("google/flan-t5-base",max_length=2048)

    # Fill both slots in a single formatting step. Substituting the context
    # first and then replacing 'v2' would also rewrite every "v2" that happens
    # to occur inside the context itself.
    input_text = f"Answer based on context \n\n context: {context} \n\n question: {question}"

    input_ids = tokenizer(input_text, return_tensors="pt").input_ids
    outputs = model_flan.generate(input_ids)
    # Drop special tokens so callers receive only the answer text.
    return tokenizer.decode(outputs[0], skip_special_tokens=True)





# get the top k most relevant documents
k = 2
# predict_proba yields one probability per class, in the order of knn.classes_;
# map the best-scoring columns back to the class ids they stand for.
class_probabilities = knn.predict_proba([question_embedding])[0]
top_k = knn.classes_[class_probabilities.argsort()[-k:][::-1]]

top_titles = [id_to_label[i] for i in top_k]
for title in top_titles:
    print(title)



# concatenate the text from the top k documents
# Rows are selected by title rather than by position: a class id is an index
# into the unique titles, which is not a row position in df once a title
# occurs more than once.
context = ' '.join(
    text
    for title in top_titles
    for text in df.loc[df['titles'] == title, 'text']
)
# print(context)



# get the answer
answer = get_answer(question, context)
print(answer)


In [None]:
# clean the documents folder
import os

# Remove every entry listed inside 'Documents'; the folder itself is kept.
for file in os.listdir('Documents'):
    os.remove('Documents/' + file)


In [None]:
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from collections import OrderedDict


class DocumentReader:
    """Extractive question answering over a text with a Hugging Face QA model.

    Usage: call ``tokenize(question, text)`` and then ``get_answer()``.
    Inputs longer than the model's position limit are split into chunks that
    each repeat the question, and the per-chunk answers are concatenated.

    NOTE(review): the default checkpoint name is a plain BERT model; confirm
    that it carries a fine-tuned question-answering head before relying on
    its answers.
    """

    def __init__(self, pretrained_model_name_or_path='bert-large-uncased'):
        """Load tokenizer and QA model from ``pretrained_model_name_or_path``."""
        self.READER_PATH = pretrained_model_name_or_path
        self.tokenizer = AutoTokenizer.from_pretrained(self.READER_PATH)
        self.model = AutoModelForQuestionAnswering.from_pretrained(self.READER_PATH)
        # Longest sequence the model accepts, taken from its configuration.
        self.max_len = self.model.config.max_position_embeddings
        self.chunked = False

    def tokenize(self, question, text):
        """Encode a question/text pair and chunk it when it is too long.

        Stores the model inputs in ``self.inputs`` (a dict of chunks when
        chunking was necessary), the flat id list in ``self.input_ids`` and
        the chunking decision in ``self.chunked``.
        """
        self.inputs = self.tokenizer.encode_plus(question, text, add_special_tokens=True, return_tensors="pt")
        self.input_ids = self.inputs["input_ids"].tolist()[0]

        # Decide afresh on every call: a flag left at True by an earlier long
        # input would send a short input down the chunked path.
        self.chunked = len(self.input_ids) > self.max_len
        if self.chunked:
            self.inputs = self.chunkify()

    def chunkify(self):
        """ 
        Break up a long article into chunks that fit within the max token
        requirement for that Transformer model. 

        Calls to BERT / RoBERTa / ALBERT require the following format:
        [CLS] question tokens [SEP] context tokens [SEP].

        Returns:
            An OrderedDict mapping chunk index to a dict with the same keys
            as ``self.inputs``; every value is a tensor with a leading batch
            axis of size one.
        """

        # create question mask based on token_type_ids
        # value is 0 for question tokens, 1 for context tokens
        qmask = self.inputs['token_type_ids'].lt(1)
        qt = torch.masked_select(self.inputs['input_ids'], qmask)
        chunk_size = self.max_len - qt.size()[0] - 1 # the "-1" accounts for
        # having to add an ending [SEP] token to the end

        # Ask the tokenizer for its separator id instead of assuming BERT's 102.
        sep_id = self.tokenizer.sep_token_id

        # create a dict of dicts; each sub-dict mimics the structure of pre-chunked model input
        chunked_input = OrderedDict()
        for k, v in self.inputs.items():
            q = torch.masked_select(v, qmask)
            c = torch.masked_select(v, ~qmask)
            chunks = torch.split(c, chunk_size)

            for i, chunk in enumerate(chunks):
                if i not in chunked_input:
                    chunked_input[i] = {}

                thing = torch.cat((q, chunk))
                # The last chunk already ends with the original closing
                # separator; every other chunk needs one appended (and a
                # matching 1 in the remaining tensors).
                if i != len(chunks)-1:
                    if k == 'input_ids':
                        thing = torch.cat((thing, torch.tensor([sep_id])))
                    else:
                        thing = torch.cat((thing, torch.tensor([1])))

                chunked_input[i][k] = torch.unsqueeze(thing, dim=0)
        return chunked_input

    def _predict_span(self, model_inputs):
        """Run the model on one input dict and decode its best answer span."""
        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            outputs = self.model(**model_inputs)

        # Positional access works both for plain tuples and for structured
        # model outputs, whereas tuple-unpacking the latter yields field names.
        answer_start_scores, answer_end_scores = outputs[0], outputs[1]

        answer_start = torch.argmax(answer_start_scores)  # most likely first token of the answer
        answer_end = torch.argmax(answer_end_scores) + 1  # one past the most likely last token

        return self.convert_ids_to_string(model_inputs['input_ids'][0][answer_start:answer_end])

    def get_answer(self):
        """Return the answer for the inputs prepared by ``tokenize``.

        For chunked inputs, every chunk answer other than ``'[CLS]'`` is
        appended followed by ``" / "``; otherwise the single decoded span is
        returned.
        """
        if self.chunked:
            answer = ''
            for chunk in self.inputs.values():
                ans = self._predict_span(chunk)
                if ans != '[CLS]':
                    answer += ans + " / "
            return answer

        return self._predict_span(self.inputs)

    def convert_ids_to_string(self, input_ids):
        """Decode token ids into a string via the tokenizer."""
        return self.tokenizer.convert_tokens_to_string(self.tokenizer.convert_ids_to_tokens(input_ids))