# Imports

In [None]:
!pip install transformers torch
!pip install sentence-transformers
!pip install nltk
!pip install unidecode

Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8


In [None]:
from google.colab import drive
import pandas as pd
import numpy as np
from numpy import savetxt
import json
from json import JSONEncoder
import gzip
import ast
from sentence_transformers import SentenceTransformer, util
import time
import torch
from transformers import RobertaTokenizerFast, RobertaForQuestionAnswering
from transformers import PegasusTokenizer, PegasusForConditionalGeneration
from unidecode import unidecode
import string
import re
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import heapq
from collections import defaultdict
import pickle
import spacy
spacy_model = spacy.load('en_core_web_sm')
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML

import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import words, stopwords
from nltk.stem import WordNetLemmatizer
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

# Data Gathering

In [None]:
# Connect to drive
# Mounts Google Drive under /content/drive (Colab only) so the project files are readable.
drive.mount('/content/drive')

# Drive folder names; every later Drive path in this notebook is built from these two.
folder1="Itamar&Yarden"
folder2="קבצי טקסט"
file_name="data_wikipedia.csv"

# Main table. Later cells read its "sentences_list", "chunks_list" and
# "initial_processed_text" columns and attach the embedding columns to it.
df = pd.read_csv('/content/drive/My Drive/'+folder1+'/'+folder2+'/'+file_name)

Mounted at /content/drive


In [None]:
# Convert the stringified list columns back to real Python lists.
# Only string cells are parsed, so re-running this cell after the columns were
# already converted is a no-op instead of raising inside ast.literal_eval.
for list_column in ("sentences_list", "chunks_list"):
    df[list_column] = df[list_column].apply(
        lambda cell: ast.literal_eval(cell) if isinstance(cell, str) else cell
    )

In [None]:
# Location of the gzip-compressed JSON file holding the precomputed embeddings
file_path = "/content/drive/My Drive/"+folder1+"/"+folder2+"/embedded_data.json.gz"

# Read and parse the whole file in one go
with gzip.open(file_path, "rt") as read_file:
    data = json.load(read_file)

# The three embedding columns stored per row in the file
embedding_columns = (
    "embedded_miniLM_text",
    "embedded_miniLM_chunks",
    "embedded_miniLM_sentences",
)

# Rows are keyed by their position as a string ("0", "1", ...); each field is itself
# a JSON-encoded string, hence the second json.loads before building the tensor.
data_dict = {
    column: [
        torch.tensor(json.loads(data[str(row)][column]))
        for row in range(len(data))
    ]
    for column in embedding_columns
}

# Attach the decoded tensors to the main DataFrame, one column per embedding kind
for column in embedding_columns:
    df[column] = data_dict[column]

In [None]:
# Load the saved n-gram counts
# Note: this rebinds `file_name`, which previously held the CSV file name.
file_name = "ngram_counts.pkl"
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so this file
# must only ever come from a trusted source (here: the project's own Drive folder).
with open('/content/drive/My Drive/'+folder1+'/'+folder2+'/'+file_name, 'rb') as file:
    loaded_ngram_counts = pickle.load(file)

# Access each `defaultdict`
# The three count tables are queried with .get(...) by evaluate_sentence; `vocab` is
# used with `in` and `&` by generate_candidates, so it is presumably a set — TODO confirm.
unigram_counts = loaded_ngram_counts["unigram"]
bigram_counts = loaded_ngram_counts["bigram"]
trigram_counts = loaded_ngram_counts["trigram"]
vocab = loaded_ngram_counts["vocab"]

# Spelling Error Corrector

In [None]:
# Data preprocess
def preprocess_text(text):
    """Normalise raw text into lowercase, punctuation-free, single-spaced words.

    Steps, in order: drop every "'s", strip characters that are neither word
    characters nor whitespace, lowercase, squeeze runs of three or more identical
    letters down to two, and collapse whitespace.

    Args:
        text: The raw string to clean.

    Returns:
        The cleaned string, with tokens separated by single spaces
        (an empty string when nothing is left).
    """
    # Remove 's from text
    text = re.sub(r'\'s', '', text)
    # Remove special characters
    text = re.sub(r'[^\w\s]', '', text)
    # Lower text
    text = text.lower()
    # Remove repeating letters. Only letters are squeezed: matching any character
    # here would silently corrupt numbers (e.g. "2000" -> "200").
    text = re.sub(r'([^\W\d_])\1{2,}', r'\1\1', text)
    # Remove excess whitespace
    text = ' '.join(text.split())

    return text

In [None]:
# Alphabet used when generating spelling-correction candidates
abc = "abcdefghijklmnopqrstuvwxyz"

# Substitute every position of the word with every letter of the alphabet
def replaceLetters(word):
  """Return every string obtained by overwriting one character of `word` with a letter of `abc`.

  Results are ordered by position first, then alphabetically by the substituted
  letter. `word` itself appears whenever a character is replaced by the same letter.
  """
  return [
    word[:position] + substitute + word[position+1:]
    for position in range(len(word))
    for substitute in abc
  ]

# Insert every letter of the alphabet at every possible position
def incertLetters(word):
  """Return every string obtained by inserting one letter of `abc` into `word`.

  Insertion points run from before the first character to after the last one,
  so a word of length n yields 26 * (n + 1) strings, ordered by position and
  then alphabetically.
  """
  return [
    word[:slot] + extra + word[slot:]
    for slot in range(len(word)+1)
    for extra in abc
  ]

# Drop one character at a time
def deleteLetters(word):
  """Return every string obtained by removing exactly one character from `word`, in position order."""
  return [word[:gap] + word[gap+1:] for gap in range(len(word))]

# Swap every pair of positions in the word
def shuffleLetters(word):
  """Return the strings produced by swapping the characters at every ordered pair of positions.

  All (i, j) combinations are visited, including i == j and both (i, j) and
  (j, i), so the unchanged word and duplicate swaps are part of the result.
  """
  def swapped(first, second):
    # Strings are immutable, so swap inside a list copy and join it back
    letters = list(word)
    letters[first], letters[second] = letters[second], letters[first]
    return ''.join(letters)

  positions = range(len(word))
  return [swapped(first, second) for first in positions for second in positions]

In [None]:
# Generate possible corrections for a misspelled word
def generate_candidates(word, vocab, distance):
    """Return the vocabulary words reachable from `word` within `distance` edit rounds.

    Args:
        word: The token to correct.
        vocab: Known words; must support `in` and set intersection (`&`).
        distance: 1 for a single round of edits; any other value runs two rounds.

    Returns:
        `{word}` when the word is already known, otherwise the set of edited
        forms that are in `vocab` (possibly empty).
    """
    def single_edits(term):
        # One round of edits: substitutions, insertions, deletions and swaps
        return replaceLetters(term) + incertLetters(term) + deleteLetters(term) + shuffleLetters(term)

    # A known word is its own, and only, candidate
    if word in vocab:
        return {word}

    first_round = set(single_edits(word))
    if distance == 1:
        return first_round & vocab  # keep only known words

    # Second round: apply the same edits to every first-round result
    second_round = set()
    for variant in first_round:
        second_round.update(single_edits(variant))
    return second_round & vocab


In [None]:
# Return sentences score
def evaluate_sentence(sentence, trigram_counts, bigram_counts, unigram_counts, trigram_weight=0.7, bigram_weight=0.4, unigram_weight=0.3):
    """Score a tokenised sentence with weighted n-gram counts.

    Every trigram and bigram in the sentence contributes its raw count times its
    weight; every word contributes its unigram count divided by the total unigram
    count, times `unigram_weight`.

    Args:
        sentence: Sequence of tokens.
        trigram_counts: Mapping from 3-tuples of tokens to counts.
        bigram_counts: Mapping from 2-tuples of tokens to counts.
        unigram_counts: Mapping from tokens to counts.
        trigram_weight: Weight applied to each trigram count.
        bigram_weight: Weight applied to each bigram count.
        unigram_weight: Weight applied to each normalised unigram count.

    Returns:
        The accumulated score; 0 for an empty sentence.
    """
    length = len(sentence)
    # An empty sentence has no n-grams to score (and sentence[-1] would not exist)
    if length == 0:
        return 0

    total_unigram_count = sum(unigram_counts.values())

    def unigram_term(word):
        # Normalise the unigram count so it is a relative frequency
        count = unigram_counts.get(word, 0)
        normalized = count / total_unigram_count if total_unigram_count > 0 else 0
        return unigram_weight * normalized

    score = 0
    for i in range(length):
        uni = unigram_term(sentence[i])
        if i + 2 < length:
            # A full trigram, bigram and unigram start at this position
            tri = trigram_weight * trigram_counts.get((sentence[i], sentence[i+1], sentence[i+2]), 0)
            bi = bigram_weight * bigram_counts.get((sentence[i], sentence[i+1]), 0)
            score += tri + bi + uni
        elif i + 1 < length:
            # Second-to-last word: only a bigram and a unigram remain
            bi = bigram_weight * bigram_counts.get((sentence[i], sentence[i+1]), 0)
            score += bi + uni
        else:
            # Last word: unigram only
            score += uni

    return score


In [None]:
# Find best sequence using beam search
def correct_spelling_error(words, vocab, beam_width, trigram_counts, bigram_counts, unigram_counts):
    """Spell-correct a query with a beam search over per-word candidates.

    Args:
        words: The raw query string (despite the name, not a list).
        vocab: Known words, passed through to generate_candidates.
        beam_width: Number of partial sequences kept after each word.
        trigram_counts: Trigram counts used by evaluate_sentence.
        bigram_counts: Bigram counts used by evaluate_sentence.
        unigram_counts: Unigram counts used by evaluate_sentence.

    Returns:
        A `(score, tokens)` tuple for the best-scoring sequence, where `tokens`
        is the list of corrected words. A query with no tokens yields `(0, [])`.
    """
    # Detect named entities using spaCy on the original text (before preprocessing)
    doc = spacy_model(words)
    # Store entities token by token, cleaned the same way as the query itself.
    # A multi-word entity such as "New York" can never equal a single query token,
    # so comparing whole entity strings would leave such names unprotected.
    named_entity_tokens = set()
    for ent in doc.ents:
        named_entity_tokens.update(preprocess_text(ent.text).split())

    # Preprocess the sentence; split() (not split(" ")) so an empty query gives no
    # tokens instead of one empty "word" that would be "corrected" into a letter
    tokens = preprocess_text(words).split()
    if not tokens:
        return (0, [])

    # Start from a single empty sequence
    current_sequences = [(0, [])]  # sequence list, (score, sentence)

    for word in tokens:
        # If the word was part of a named entity in the original text, skip correction
        if word in named_entity_tokens:
            candidates = [word]  # Keep the named entity unchanged
        else:
            # Try one round of edits first, then two, then fall back to the word itself
            candidates = generate_candidates(word, vocab, 1)
            if not candidates:
                candidates = generate_candidates(word, vocab, 2)
            # Sets iterate in an order that can change between runs; sorting keeps
            # tie-breaking between equally scored candidates reproducible
            candidates = sorted(candidates) if candidates else [word]

        # Expand every kept sequence with every candidate
        new_candidates = []
        for _, seq in current_sequences:
            for candidate in candidates:
                new_seq = seq + [candidate]
                new_score = evaluate_sentence(new_seq, trigram_counts, bigram_counts, unigram_counts)
                new_candidates.append((new_score, new_seq))

        # Save only the sequences with the highest scores
        # beam_width - number of sequences that gets saved every time
        current_sequences = heapq.nlargest(beam_width, new_candidates, key=lambda x: x[0])

    # Return the best sequence
    best_sequence = max(current_sequences, key=lambda x: x[0])

    return best_sequence


# Answer Retrieval

## Docs Retrieval

In [None]:
# Load the miniLM model
# Sentence encoder used to embed queries; the UI callback passes it to
# retrieve_relevent_chunks and answer_question_miniLM.
miniLM_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [None]:
# Plain Python list of the document texts, in DataFrame row order
documents = df["initial_processed_text"].tolist()

In [None]:
# Stack the per-document embedding tensors into one tensor, one row per document
per_document_embeddings = list(df["embedded_miniLM_text"])
documents_embedded = torch.stack(per_document_embeddings, dim=0)

In [None]:
# Return top documents based on miniLM and query
def retrieve_top_documents(embedded_query, documents_embedded, documents, top_k=5):
  """Return the `top_k` documents whose embeddings are most similar to the query.

  Args:
    embedded_query: Embedding of the query.
    documents_embedded: Embeddings of all documents, one row per document,
      aligned with `documents`.
    documents: The document texts.
    top_k: Maximum number of documents to return; `None` returns all of them.

  Returns:
    A list of at most `top_k` documents, ordered from most to least similar.
  """
  # The query comes straight from the encoder and may live on another device
  # (e.g. a GPU) than the precomputed document embeddings; align them first.
  if isinstance(embedded_query, torch.Tensor) and isinstance(documents_embedded, torch.Tensor):
    embedded_query = embedded_query.to(documents_embedded.device)

  # Compute cosine similarity scores (one score per document)
  cosine_scores_minilm = util.cos_sim(embedded_query, documents_embedded)[0]

  # Never ask for more results than there are documents
  available = min(cosine_scores_minilm.shape[0], len(documents))
  top_k = available if top_k is None else max(0, min(top_k, available))

  # topk only orders the rows that are returned and works on any device,
  # unlike a full numpy argsort (which requires a CPU tensor)
  top_indices = torch.topk(cosine_scores_minilm, k=top_k).indices.tolist()

  return [documents[i] for i in top_indices]

In [None]:
# Return top chunks based on miniLM and query
def retrieve_top_chunks(embedded_query, embedded_chunks, chunks, top_k=5):
    """Return the chunks most similar to the query, with their similarity scores.

    Args:
        embedded_query: Embedding of the query.
        embedded_chunks: Embeddings of the chunks, aligned with `chunks`.
        chunks: The chunk texts.
        top_k: Maximum number of chunks to return.

    Returns:
        A list of `(chunk, score)` tuples ordered from most to least similar;
        empty when there are no chunks.
    """
    # A document without chunks has nothing to rank; skip the similarity
    # computation instead of feeding it an empty embedding tensor
    if len(chunks) == 0:
        return []

    # Keep both operands on the same device (the query may come from a GPU encoder)
    if isinstance(embedded_query, torch.Tensor) and isinstance(embedded_chunks, torch.Tensor):
        embedded_query = embedded_query.to(embedded_chunks.device)

    # Compute cosine similarities
    scores = util.cos_sim(embedded_query, embedded_chunks)[0]

    # Adjust top_k if it exceeds the number of chunks (or of embeddings)
    top_k = min(top_k, len(chunks), scores.shape[0])
    top_k_indices = torch.topk(scores, k=top_k).indices.tolist()

    # Return the top-k most relevant chunks
    return [(chunks[i], scores[i].item()) for i in top_k_indices]


In [None]:
# Return the best chunks across the top documents for a query
# top_k = (docs, chunks, relevent_chunks)
def retrieve_relevent_chunks(query, documents_embedded, documents, retrievel_model, top_k=(5,5,5)):
  """Collect the highest-scoring chunks from the documents closest to the query.

  `top_k[0]` documents are retrieved, up to `top_k[1]` chunks are taken from each,
  and the `top_k[2]` best chunks overall are returned (texts only, best first).
  Chunk data is read from the global `df`, matching documents by their text.
  """
  # Embed the query once and reuse it for documents and chunks
  embedded_query = retrievel_model.encode(query, convert_to_tensor=True)

  scored_chunks = []
  for doc_text in retrieve_top_documents(embedded_query, documents_embedded, documents, top_k[0]):
    # Locate the first DataFrame row holding this document text
    row = df.loc[df["initial_processed_text"] == doc_text].index[0]
    scored_chunks.extend(
      retrieve_top_chunks(
        embedded_query,
        df["embedded_miniLM_chunks"][row],
        list(df["chunks_list"][row]),
        top_k[1],
      )
    )

  # Best score first, then keep only the requested number of chunk texts
  scored_chunks.sort(key=lambda pair: pair[1], reverse=True)
  return [chunk for chunk, _ in scored_chunks[:top_k[2]]]


## RoBERTa

In [None]:
# Load the pre-trained fine-tuned RoBERTa model and tokenizer
# The checkpoint directory lives in the mounted Drive folder, so Drive must be mounted first.
model_name = f'/content/drive/My Drive/{folder1}/{folder2}/fine_tuned_roberta_qa'
# Tokenizer and model are loaded from the same directory; both are passed to
# answer_question_roberta by the UI callback.
roberta_tokenizer = RobertaTokenizerFast.from_pretrained(model_name)
roberta_model = RobertaForQuestionAnswering.from_pretrained(model_name)

In [None]:
# Pick the best extractive answer across all chunks with the QA model
def answer_question_roberta(relevant_chunks, query, model, tokenizer, retriever_model, batch_size=4, top_k=5):
    """Run extractive QA over the chunks and return the most confident answer span.

    Chunks are processed in batches of `batch_size`. For each chunk the most likely
    start and end positions are taken; spans starting at position 0 or shorter than
    two tokens are ignored. The span with the highest start-logit + end-logit sum wins.

    `retriever_model` and `top_k` are accepted but not used by this function.

    Returns:
        The decoded answer text, or "Answer not found in context." when no chunk
        produced an acceptable span.
    """
    top_score = float('-inf')
    top_tokens = None  # token ids of the best span seen so far

    for offset in range(0, len(relevant_chunks), batch_size):
        batch = relevant_chunks[offset:offset + batch_size]

        # Pair the (repeated) query with every chunk of the batch
        batch_inputs = tokenizer(
            [query] * len(batch),
            batch,
            return_tensors='pt', truncation=True, padding=True, max_length=256
        )

        # Inference only, so no gradients are needed
        with torch.no_grad():
            outputs = model(**batch_inputs)

        start_logits = outputs.start_logits
        end_logits = outputs.end_logits

        for row in range(len(batch)):
            span_start = torch.argmax(start_logits[row]).item()
            span_end = torch.argmax(end_logits[row]).item()

            # Position 0 as the start is treated as "no answer in this chunk"
            if span_start == 0:
                continue

            # Reject empty (end before start) and single-token spans
            span_tokens = batch_inputs['input_ids'][row][span_start:span_end + 1]
            if span_tokens.size()[0] < 2:
                continue

            span_score = start_logits[row, span_start].item() + end_logits[row, span_end].item()

            # Strict comparison: on ties the earlier span is kept
            if span_score > top_score:
                top_score = span_score
                top_tokens = span_tokens

    if top_tokens is None:
        return "Answer not found in context."

    return tokenizer.decode(top_tokens, skip_special_tokens=True)


## miniLM

In [None]:
# Load the miniLM model
# The same encoder is already loaded in the "Docs Retrieval" section. Reuse that
# instance instead of instantiating a second copy, and only load it here when
# this section is run on its own.
if "miniLM_model" not in globals():
    miniLM_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [None]:
# Score every sentence against the query and return the best one
def retrieve_relevent_sentences(embedded_query, embedded_sentences, sentences, top_k=1):
  """Return the `(sentence, score)` pair with the highest cosine similarity to the query.

  `top_k` is forwarded to `torch.topk`, but only the single best match is returned.
  """
  # One similarity score per sentence
  similarity = util.cos_sim(embedded_query, embedded_sentences)[0]

  # topk returns indices best-first, so the first one is the overall best
  best_index = torch.topk(similarity, k=top_k).indices[0]

  return (sentences[best_index], similarity[best_index].item())

In [None]:
# Return answer based on miniLM and query
def answer_question_miniLM(query, documents_embedded, documents, model, min_score=0.3668292164802551):
    """Answer a query with the single most similar sentence from the top documents.

    The five documents closest to the query are retrieved; the best-matching
    sentence of each is kept if its similarity reaches `min_score`, and the
    highest-scoring kept sentence is returned. Sentence data is read from the
    global `df`, matching documents by their text.

    Args:
        query: The question text.
        documents_embedded: Embeddings of all documents, aligned with `documents`.
        documents: The document texts.
        model: Encoder exposing `encode(text, convert_to_tensor=True)`.
        min_score: Minimum cosine similarity for a sentence to count as an answer.
            The default is the previously hard-coded threshold (presumably tuned
            empirically — TODO confirm its origin).

    Returns:
        The best sentence, or "Answer not found in context." when no sentence
        reaches `min_score`.
    """
    # Encode the query
    embedded_query = model.encode(query, convert_to_tensor=True)

    # Retrieve the top documents
    top_docs = retrieve_top_documents(embedded_query, documents_embedded, documents, top_k=5)

    # (sentence, score) pairs that passed the threshold
    relevent_sentences = []

    # Loop through the top documents to extract the most relevant sentences
    for doc in top_docs:
        # Find the index of the document in the dataframe
        i = df[df["initial_processed_text"] == doc].index[0]
        # Retrieve the embedded sentences and the list of sentences for the document
        embedded_sentences = df["embedded_miniLM_sentences"][i]
        sentences = list(df["sentences_list"][i])

        # A document without sentences cannot contribute an answer
        if not sentences:
            continue

        # Find the most relevant sentence
        best_sentence = retrieve_relevent_sentences(embedded_query, embedded_sentences, sentences, 1)

        # Keep the sentence only if its similarity score reaches the threshold
        if best_sentence[1] >= min_score:
            relevent_sentences.append(best_sentence)

    # If no relevant sentences are found, return no answer
    if not relevent_sentences:
        return "Answer not found in context."

    # Return the sentence with the highest score (the first one on ties)
    return max(relevent_sentences, key=lambda x: x[1])[0]


# Answer Transformation

In [None]:
# Load the pre-trained Pegasus paraphrase model and tokenizer
# Both objects are passed to transform_answer by the UI callback.
tokenizer_pegasus = PegasusTokenizer.from_pretrained("tuner007/pegasus_paraphrase")
model_pegasus = PegasusForConditionalGeneration.from_pretrained("tuner007/pegasus_paraphrase")

tokenizer_config.json:   0%|          | 0.00/86.0 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/1.91M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/65.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.14k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.28G [00:00<?, ?B/s]

Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at tuner007/pegasus_paraphrase and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Return transformed answer using answer and query
def transform_answer(answer, query, model, tokenizer):
  """Paraphrase an answer in the context of its question.

  Args:
    answer: The answer text to rephrase.
    query: The question the answer belongs to.
    model: Generation model exposing `generate(input_ids, ...)`.
    tokenizer: Tokenizer exposing `encode(...)` and `decode(...)`.

  Returns:
    The paraphrased text, with a leading "Answer: " removed if the model produced one.
  """
  # Create input sentence using the current structure
  sentence = f"Question: {query}. Answer: {answer}"

  # Tokenize input with truncation.
  # The tokenizer and model passed by the caller are used, not the notebook globals.
  input_ids = tokenizer.encode(
    sentence,
    return_tensors="pt",
    truncation=True,
  )

  # Generate paraphrased text
  paraphrase_ids = model.generate(
    input_ids,
    num_beams=10,
    max_length=60,
    early_stopping=True
  )

  # Decode the output
  paraphrase = tokenizer.decode(paraphrase_ids[0], skip_special_tokens=True)

  # Check if model return unwanted "Answer: ", if yes, removes it
  if paraphrase.startswith("Answer: "):
    paraphrase = paraphrase[len("Answer: "):]

  return paraphrase

# Final UI

In [None]:
# Build the three option toggles shown in the UI
def _make_toggle(label, choices):
    # The first choice is the one selected by default
    return widgets.ToggleButtons(options=choices, description=label, value=choices[0])

# Spell-correct the query or use it as typed
corrected_toggle = _make_toggle("Query:", ["Corrected", "Uncorrected"])
# Which answering model to use
model_selector = _make_toggle("Model:", ["MiniLM", "RoBERTa"])
# Show the paraphrased answer or the original one
paraphrase_toggle = _make_toggle("Answer:", ["Paraphrased", "Original"])

In [None]:
# Define the callback function
def on_run_button_click(b):
    """Run the question-answering pipeline for the current widget state and show the answer.

    Reads the query and the three toggles, optionally spell-corrects the query,
    answers it with MiniLM or RoBERTa, optionally paraphrases the answer, and
    renders the result inside `output_area`.

    Args:
        b: The button instance supplied by ipywidgets (unused).
    """
    # Local import: only needed here, to render the answer as text rather than markup
    from html import escape

    # Sentinel returned by both answering functions when nothing was found
    not_found_message = "Answer not found in context."

    with output_area:
        clear_output(wait=True)  # Clear output area before printing
        query = query_input.value
        use_corrected = corrected_toggle.value == "Corrected"
        use_minilm = model_selector.value == "MiniLM"
        use_paraphrased = paraphrase_toggle.value == "Paraphrased"

        # Display a loading animation like a spinning circle
        loading_html = """
        <div style='display: flex; align-items: center; justify-content: center; height: 100px;'>
            <div class="spinner"></div>
        </div>
        <style>
            .spinner {
                border: 5px solid rgba(0,0,0,0.1);
                border-radius: 50%;
                border-top: 5px solid #3498db;
                width: 30px;
                height: 30px;
                animation: spin 1s linear infinite;
            }
            @keyframes spin {
                0% { transform: rotate(0deg); }
                100% { transform: rotate(360deg); }
            }
        </style>
        """
        display(HTML(loading_html))

        if use_corrected:
            beam_width = 3
            query = " ".join(correct_spelling_error(query, vocab, beam_width, trigram_counts, bigram_counts, unigram_counts)[1])

        if use_minilm:
            answer = answer_question_miniLM(query, documents_embedded, documents, miniLM_model)
        else:
            # Chunk retrieval is only needed by the RoBERTa reader, so it is done here
            # rather than unconditionally
            relevant_chunks = retrieve_relevent_chunks(query, documents_embedded, documents, miniLM_model, (10,10,4))
            answer = answer_question_roberta(relevant_chunks, query, roberta_model, roberta_tokenizer, miniLM_model, batch_size=8)

        # Paraphrasing the "not found" message would turn it into a made-up answer
        if use_paraphrased and answer != not_found_message:
            answer = transform_answer(answer, query, model_pegasus, tokenizer_pegasus)

        # Always remove the loading spinner, even when the answer is empty
        clear_output(wait=True)

        # Display results in a formatted way; the answer is escaped because it is
        # text extracted from documents or generated by a model, not trusted HTML
        display(HTML(f"<div style='border: 1px solid #ccc; padding: 5px; margin-top: 10px; border-radius: 5px; background-color: #f9f9f9; width: 50%;'>"
                     f"<p style='color: #333;'>{escape(str(answer))}</p>"
                     f"</div>"))

In [None]:
# Define widgets
# `output_area` and `query_input` are read as globals by on_run_button_click,
# so this cell must have run before the button is clicked.
run_button = widgets.Button(description="Run", button_style='success')
output_area = widgets.Output()
query_input = widgets.Text(description="Query:")

# Display the widgets
display(query_input, corrected_toggle, model_selector, paraphrase_toggle, run_button, output_area)

# Attach the callback to the button
run_button.on_click(on_run_button_click)

Text(value='', description='Query:')

ToggleButtons(description='Query:', options=('Corrected', 'Uncorrected'), value='Corrected')

ToggleButtons(description='Model:', index=1, options=('MiniLM', 'RoBERTa'), value='RoBERTa')

ToggleButtons(description='Answer:', index=1, options=('Paraphrased', 'Original'), value='Original')

Button(button_style='success', description='Run', style=ButtonStyle())

Output()