In [None]:
import os
import re
import json
import numpy as np
import pandas as pd
import torch
from tqdm.auto import tqdm
from sentence_transformers import SentenceTransformer, util
from transformers import T5Tokenizer, T5ForConditionalGeneration
from time import perf_counter as timer

In [None]:
class NewsArticleProcessor:
    """Load news articles from JSON, embed them, and retrieve the best matches for a query.

    Typical flow: ``process()`` (load -> clean -> measure -> filter -> embed ->
    save to CSV), then ``prepare_embeddings()``, then
    ``retrieve_relevant_resources(query)``.
    """

    def __init__(self, json_path, min_token_length=30, n_resources_to_return=10):
        """Configure the pipeline and load the sentence-embedding model on CPU.

        Args:
            json_path: Path to a JSON file holding a list of article dicts; each
                dict is read for its ``title`` and ``articleBody`` keys.
            min_token_length: Articles whose estimated token count is not
                strictly greater than this are dropped before embedding.
            n_resources_to_return: Maximum number of articles returned per query.
        """
        self.json_path = json_path
        self.embedding_model_name = "all-mpnet-base-v2"
        self.min_token_length = min_token_length
        self.n_resources_to_return = n_resources_to_return
        self.embedding_model = SentenceTransformer(model_name_or_path=self.embedding_model_name, device="cpu")
        self.df = None
        self.articles_and_chunks = []
        # Populated by prepare_embeddings(); initialised here so that a query
        # issued too early fails with a clear message instead of AttributeError.
        self.articles = []
        self.embeddings = None
        self.save_path = os.path.join(os.getcwd(), 'articles_and_embeddings_df.csv')

    def load_json(self):
        """Read and return the parsed content of ``self.json_path``."""
        # Explicit UTF-8 so decoding does not depend on the platform's locale.
        with open(self.json_path, 'r', encoding='utf-8') as json_file:
            return json.load(json_file)

    @staticmethod
    def clean_text(text: str) -> str:
        """Replace newlines with spaces so adjacent lines do not fuse into one word."""
        return text.replace('\n', ' ')

    def preprocess_data(self, data):
        """Build ``self.df`` from the raw records and add a ``joined_article`` column.

        ``joined_article`` is the title, a space, and the cleaned article body.
        The caller's ``data`` is left unmodified.
        """
        self.df = pd.DataFrame(data)
        self.df['articleBody'] = self.df['articleBody'].map(self.clean_text)
        self.df['joined_article'] = self.df['title'] + ' ' + self.df['articleBody']

    def process_articles(self):
        """Normalise each joined article and record simple size statistics.

        Replaces ``self.df`` with one row per article holding the normalised
        text plus its character, word and estimated token counts.
        """
        # Start from a fresh list so calling this method again does not
        # duplicate records collected by an earlier call.
        records = []
        for _, row in tqdm(self.df.iterrows(), total=self.df.shape[0]):
            article = row['joined_article']

            # Collapse every run of spaces (not just pairs) into a single space.
            article = re.sub(r' {2,}', ' ', article).strip()
            # Re-insert the space missing after a full stop that is directly
            # followed by a capital letter ("end.Next" -> "end. Next").
            article = re.sub(r'\.([A-Z])', r'. \1', article)

            records.append({
                "joined_article": article,
                "article_char_count": len(article),
                "article_word_count": len(article.split()),
                # Rough estimate only: character count divided by 4.
                "article_token_count": len(article) / 4,
            })

        self.articles_and_chunks = records
        self.df = pd.DataFrame(records)

    def filter_and_embed_articles(self):
        """Drop short articles and attach an ``embedding`` to each remaining one."""
        articles_over_min_token_len = self.df[self.df["article_token_count"] > self.min_token_length].to_dict(orient="records")

        for item in tqdm(articles_over_min_token_len):
            item["embedding"] = self.embedding_model.encode(item["joined_article"])

        self.df = pd.DataFrame(articles_over_min_token_len)

    def save_to_csv(self):
        """Write ``self.df`` to ``self.save_path`` without the index column."""
        # NOTE(review): CSV stores each embedding as the text form of the
        # array; prepare_embeddings() parses that text back. Presumably numpy
        # abbreviates very long arrays when printing -- confirm before
        # switching to a model with much larger embeddings.
        self.df.to_csv(self.save_path, index=False)

    def load_from_csv(self):
        """Reload ``self.df`` from ``self.save_path`` and return it."""
        self.df = pd.read_csv(self.save_path)
        return self.df

    def process(self, load_csv=False):
        """Run the full pipeline: load, clean, measure, filter, embed and save.

        Args:
            load_csv: When true, reload the saved CSV into ``self.df`` and
                return it; embeddings are then stored as strings.

        Returns:
            The reloaded DataFrame when ``load_csv`` is true, otherwise ``None``.
        """
        data = self.load_json()
        self.preprocess_data(data)
        self.process_articles()
        self.filter_and_embed_articles()
        self.save_to_csv()

        if load_csv:
            return self.load_from_csv()

    @staticmethod
    def _parse_embedding(value):
        """Return one embedding as a numpy array, parsing the CSV text form if needed."""
        if isinstance(value, str):
            return np.fromstring(value.strip("[]"), sep=" ")
        return np.asarray(value)

    def prepare_embeddings(self):
        """Build the in-memory search index from ``self.df``.

        Accepts embeddings either as strings (after ``load_from_csv``) or as
        arrays (straight after ``filter_and_embed_articles``), so it works for
        both ``process(load_csv=True)`` and ``process(load_csv=False)`` and is
        safe to call more than once. Sets ``self.articles`` (list of row dicts)
        and ``self.embeddings`` (float32 tensor on CPU).
        """
        self.df["embedding"] = self.df["embedding"].apply(self._parse_embedding)
        self.articles = self.df.to_dict(orient="records")
        self.embeddings = torch.tensor(np.array(self.df["embedding"].tolist()), dtype=torch.float32).to("cpu")

    def retrieve_relevant_resources(self, query: str, print_time: bool=True):
        """Return the articles whose embeddings score highest against ``query``.

        Args:
            query: Free-text query to embed and compare by dot product.
            print_time: When true, print how long the scoring step took.

        Returns:
            Up to ``n_resources_to_return`` article texts, best match first.
            An empty list when no embeddings are available.

        Raises:
            RuntimeError: If ``prepare_embeddings()`` has not been called.
        """
        if self.embeddings is None:
            raise RuntimeError("Embeddings are not prepared; call prepare_embeddings() first.")

        n_embeddings = len(self.embeddings)
        if n_embeddings == 0:
            return []

        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)

        start_time = timer()
        dot_scores = util.dot_score(query_embedding, self.embeddings)[0]
        end_time = timer()

        if print_time:
            print(f"[INFO] Time taken to get scores on {len(self.embeddings)} embeddings: {end_time-start_time:.5f} seconds.")

        # torch.topk raises when k exceeds the number of candidates, so cap it.
        k = min(self.n_resources_to_return, n_embeddings)
        _, indices = torch.topk(input=dot_scores, k=k)
        relevant_articles = [self.articles[int(index)]["joined_article"] for index in indices]

        return relevant_articles

In [None]:
class QuestionAnswering:
    """Generate an answer to a question from supplied context with a T5 model."""

    def __init__(self, model_name="t5-base"):
        """Load the tokenizer and the conditional-generation model for ``model_name``."""
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)

    @staticmethod
    def array_to_string_with_newlines(arr):
        """Join the strings in ``arr`` into one string, separated by four newlines."""
        return '\n\n\n\n'.join(arr)

    def generate_answer(self, question, context):
        """Return the model's decoded answer to ``question`` given ``context``.

        The prompt is truncated to 512 tokens, so context beyond that limit is
        not seen by the model.
        """
        input_text = f"Answer the question: {question} using the context: {context}"
        inputs = self.tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True)

        # Beam search without sampling: `temperature` only affects sampling
        # modes, so passing it here had no effect and merely triggered a
        # warning from the generation config. It is intentionally omitted.
        outputs = self.model.generate(
            inputs,
            max_length=1000,
            num_beams=10,
            early_stopping=True,
            repetition_penalty=2.0,
            length_penalty=1.5,
        )
        answer = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        return answer

In [None]:
def preprocess_articles(json_path):
    """Build a ready-to-query ``NewsArticleProcessor`` for the given JSON file.

    Runs the processor's full pipeline, reloads the saved CSV, and prepares
    the embeddings, then returns the processor.
    """
    article_processor = NewsArticleProcessor(json_path=json_path)

    # Process everything and reload the result from the CSV written to disk.
    article_processor.process(load_csv=True)

    # Turn the reloaded embedding column into the tensor used for retrieval.
    article_processor.prepare_embeddings()

    return article_processor

def get_answer(processor, question, qa=None):
    """Answer ``question`` using the articles that ``processor`` retrieves for it.

    Args:
        processor: Object exposing ``retrieve_relevant_resources(query=...)``
            that returns a list of article strings.
        question: The question to answer.
        qa: Optional already-constructed question-answering object to reuse.
            When omitted, a new ``QuestionAnswering`` is created, which loads
            the model again; pass one in when asking several questions.

    Returns:
        The generated answer string.
    """
    relevant_articles = processor.retrieve_relevant_resources(query=question)

    if qa is None:
        qa = QuestionAnswering()

    context_text = qa.array_to_string_with_newlines(relevant_articles)
    answer = qa.generate_answer(question=question, context=context_text)

    return answer

In [None]:
# Input file of news articles; a relative path, resolved against the working directory.
json_path = 'news.json'
# Build the processor, run the pipeline and prepare embeddings; `processor`
# is reused by the question-answering cell below.
processor = preprocess_articles(json_path)

In [None]:
# Example query: retrieve the best-matching articles and generate an answer from them.
question = "What happened at the Al-Shifa Hospital?"
answer = get_answer(processor, question)
print("Answer:", answer)