In [1]:
!pip install transformers[sentencepiece]



In [2]:
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import nltk
import spacy
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

# Fetch the NLTK tokenizer data packages; nltk.sent_tokenize is called later
# in tokenize_and_normalize.
nltk.download('punkt')
nltk.download('punkt_tab')
# Load spaCy's "en_core_web_sm" pipeline once; `nlp` is read as a module-level
# global by extract_possible_answers.
nlp = spacy.load("en_core_web_sm")

# from sklearn.metrics.pairwise import cosine_similarity
# from sentence_transformers import SentenceTransformer

# Standard library imports
import os
import re
from collections import Counter, defaultdict
from itertools import combinations

# Third-party imports
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
import spacy

from google.colab import drive
# Mount Google Drive at /content/drive and make the shared project folder the
# working directory, so relative paths used afterwards resolve inside it.
# NOTE(review): this only works inside Google Colab with this exact Drive
# folder layout — confirm before running elsewhere.
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/Colab Notebooks/Cosc524 - Collaboration')


# nltk downloads and imports
from nltk.corpus import stopwords
from nltk.tokenize import (
    RegexpTokenizer,
    TextTilingTokenizer,
    sent_tokenize,
    word_tokenize,
)



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Mounted at /content/drive


In [3]:
# Lookup table mapping English contractions to their expanded forms.
# Keys are written with the typographic apostrophe (’), except "'cause",
# which uses a straight apostrophe. extract_text lower-cases the keys before
# building its case-insensitive matching regex, so the capitalised "I’…"
# entries are still matched there.
# NOTE(review): "ain’t" is expanded to "are not" regardless of subject, and
# input text that uses straight apostrophes (') presumably will not match the
# curly-apostrophe keys — confirm against the source text.
contractions_dict = {
      "ain’t": "are not", "aren’t": "are not", "can’t": "cannot", "can’t’ve": "cannot have",
      "'cause": "because", "could’ve": "could have", "couldn’t": "could not", "couldn’t’ve": "could not have",
      "didn’t": "did not", "doesn’t": "does not", "don’t": "do not", "hadn’t": "had not",
      "hadn’t’ve": "had not have", "hasn’t": "has not", "haven’t": "have not", "he’d": "he would",
      "he’d’ve": "he would have", "he’ll": "he will", "he’ll’ve": "he will have", "how’d": "how did",
      "how’d’y": "how do you", "how’ll": "how will", "I’d": "I would", "I’d’ve": "I would have",
      "I’ll": "I will", "I’ll’ve": "I will have", "I’m": "I am", "I’ve": "I have", "isn’t": "is not",
      "it’d": "it would", "it’d’ve": "it would have", "it’ll": "it will", "it’ll’ve": "it will have",
      "let’s": "let us", "ma’am": "madam", "mayn’t": "may not", "might’ve": "might have",
      "mightn’t": "might not", "mightn’t’ve": "might not have", "must’ve": "must have", "mustn’t": "must not",
      "mustn’t’ve": "must not have", "needn’t": "need not", "needn’t’ve": "need not have", "o’clock": "of the clock",
      "oughtn’t": "ought not", "oughtn’t’ve": "ought not have", "shan’t": "shall not", "sha’n’t": "shall not",
      "shan’t’ve": "shall not have", "she’d": "she would", "she’d’ve": "she would have", "she’ll": "she will",
      "she’ll’ve": "she will have", "should’ve": "should have", "shouldn’t": "should not",
      "shouldn’t’ve": "should not have", "so’ve": "so have", "that’d": "that would", "that’d’ve": "that would have",
      "there’d": "there would", "there’d’ve": "there would have", "they’d": "they would",
      "they’d’ve": "they would have", "they’ll": "they will", "they’ll’ve": "they will have", "they’re": "they are",
      "they’ve": "they have", "to’ve": "to have", "wasn’t": "was not", "we’d": "we would",
      "we’d’ve": "we would have", "we’ll": "we will", "we’ll’ve": "we will have", "we’re": "we are",
      "we’ve": "we have", "weren’t": "were not", "what’ll": "what will", "what’ll’ve": "what will have",
      "what’re": "what are", "what’ve": "what have", "when’ve": "when have", "where’d": "where did",
      "where’ve": "where have", "who’ll": "who will", "who’ll’ve": "who will have", "who’ve": "who have",
      "why’ve": "why have", "will’ve": "will have", "won’t": "will not", "won’t’ve": "will not have",
      "would’ve": "would have", "wouldn’t": "would not", "wouldn’t’ve": "would not have", "y’all": "you all",
      "y’all’d": "you all would", "y’all’d’ve": "you all would have", "y’all’re": "you all are",
      "y’all’ve": "you all have", "you’d": "you would", "you’d’ve": "you would have", "you’ll": "you will",
      "you’ll’ve": "you will have", "you’re": "you are", "you’ve": "you have"
  }

In [4]:
def load_text(file_path):
    """
    Read a UTF-8 encoded text file and return its whole contents.

    A short progress message is printed before the file is opened.

    Parameters:
        file_path (str): Path of the file to read.

    Returns:
        str: The complete contents of the file.
    """
    print("Loading the text from the file...")
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents

In [5]:
def remove_punctuation(text):
    """
    Strip punctuation from a string.

    Every character that is neither a word character (letters, digits,
    underscore) nor whitespace is deleted; everything else is kept in place.

    Parameters:
        text (str): The string to clean.

    Returns:
        str: `text` with all punctuation characters removed.
    """
    not_word_or_space = re.compile(r'[^\w\s]')
    return not_word_or_space.sub('', text)

In [6]:
def expand_contractions(s, contractions_dict, cont_reg):
    """
    Expand contractions in a given text string using a dictionary of contractions.

    Every match of `cont_reg` in `s` is looked up in `contractions_dict` and
    replaced by its expanded form. The lookup tries the lower-cased match first
    and then the match exactly as written, so dictionaries with lower-case keys
    and dictionaries with mixed-case keys (e.g. "I’m") both work. Text that the
    pattern matches but the dictionary does not contain is left untouched,
    including its original casing.

    Parameters:
        s (str): The input string containing contractions to expand.
        contractions_dict (dict): A dictionary where keys are contractions (e.g., "can't")
                              and values are their expanded forms (e.g., "cannot").
        cont_reg (re.Pattern): A regular expression pattern to identify
                           contractions within the text.

    Returns:
        str: The input string with contractions expanded.

    """
    def replace(match):
        matched_text = match.group(0)
        lowered = matched_text.lower()
        if lowered in contractions_dict:
            return contractions_dict[lowered]
        if matched_text in contractions_dict:
            return contractions_dict[matched_text]
        # Unknown contraction: give back exactly what was matched rather than
        # a lower-cased copy, so the surrounding text is not altered.
        return matched_text

    return cont_reg.sub(replace, s)

In [7]:
def remove_gutenberg_header_footer(text, start_marker, end_marker):
    """
    Remove the header and footer from Project Gutenberg text.

    Finds the start marker, then the first end marker that follows it, and
    returns the content in between with surrounding whitespace stripped.

    Parameters:
        text (str): The text to clean.
        start_marker (str): Marker indicating the start of main content.
        end_marker (str): Marker indicating the end of main content.

    Returns:
        str: Text with header and footer removed, or the original text if the
        start marker is missing or no end marker follows it.
    """
    start_index = text.find(start_marker)
    if start_index == -1:
        return text

    content_start = start_index + len(start_marker)

    # Search only after the start marker: an end marker that happens to occur
    # earlier in the header must not be used as the end of the content.
    end_index = text.find(end_marker, content_start)
    if end_index == -1:
        return text

    return text[content_start:end_index].strip()


In [8]:
def identify_chapters(text):
    """
    Locate chapter headings in a text and drop the ones that are packed together.

    A heading is the word "chapter" (any casing) followed by whitespace and
    either digits or a run of the letters I, V, X, L, C, D, M. A heading is
    discarded when the next heading starts fewer than 1000 characters after it,
    which removes table-of-contents style listings. The final heading found is
    always kept.

    Parameters:
        text (str): The text to scan.

    Returns:
        list of tuples: `(heading_text, start_offset)` pairs in document order.
    """
    heading_re = re.compile(r"(?i)\bchapter\s+(\d+|[IVXLCDM]+)\b")
    found = [(hit.group(0), hit.start()) for hit in heading_re.finditer(text)]
    if not found:
        return []

    # Compare each heading with its successor; keep it only when the gap is
    # at least 1000 characters.
    kept = [
        current
        for current, following in zip(found, found[1:])
        if following[1] - current[1] >= 1000
    ]
    kept.append(found[-1])  # the last heading has no successor and is always kept
    return kept


In [9]:
def split_novel_into_chapters(text, chapter_markers):
    """
    Cut a text into chapters at the given marker offsets.

    Each chapter runs from its own marker offset up to the next marker's
    offset (or to the end of the text for the last marker). Chapters are
    whitespace-stripped and empty ones are omitted. Text before the first
    marker is not part of any chapter.

    Parameters:
        text (str): The novel text to split.
        chapter_markers (list of tuples): `(label, start_offset)` pairs.

    Returns:
        list of str: The chapter texts in marker order.
    """
    starts = [offset for _label, offset in chapter_markers]
    # Pair every start with the following start; None means "to end of text".
    stops = starts[1:] + [None]
    pieces = (text[begin:end].strip() for begin, end in zip(starts, stops))
    return [piece for piece in pieces if piece]


In [10]:
def combine_chapters_remove_chapter_numbers(chapters):
    """
    Join chapter texts into one string with the chapter heading lines removed.

    In every chapter, each "Chapter <number>" heading (Roman or Arabic, any
    casing) is deleted together with the rest of its line. The cleaned
    chapters are concatenated, each followed by a newline.

    Parameters:
        chapters (list of str): List of chapter texts.

    Returns:
        str: Combined text with chapter numbers removed.
    """
    heading_line = re.compile(r"Chapter\s+([IVXLC]+|\d+)\b\.?\s?.*?\n", flags=re.IGNORECASE)
    return "".join(heading_line.sub("", body) + "\n" for body in chapters)

In [11]:
def extract_text(file_name, start_marker, end_marker, contractions_dict):
    """
    Load a Project Gutenberg novel and return it as cleaned chapters and as one text.

    The file is read, the Gutenberg header/footer is removed, the text is split
    at its chapter headings, contractions are expanded in every chapter, and
    finally the chapters are combined into a single string without the chapter
    heading lines.

    Parameters:
        file_name (str): Path of the text file to load.
        start_marker (str): Marker indicating the start of main content.
        end_marker (str): Marker indicating the end of main content.
        contractions_dict (dict): Contraction -> expansion mapping. Keys are
            matched case-insensitively; the caller's dict is not modified.

    Returns:
        tuple: `(chapters, single_text)` where `chapters` is a list of chapter
        strings (headings still present, contractions expanded) and
        `single_text` is the combined text with chapter headings removed.
    """
    # Load the text from the file
    novel_text = load_text(file_name)

    # Remove header/footer and split into chapters
    novel_text_no_hf = remove_gutenberg_header_footer(novel_text, start_marker, end_marker)
    chapter_markers = identify_chapters(novel_text_no_hf)
    chapters = split_novel_into_chapters(novel_text_no_hf, chapter_markers)

    # Expand contractions in chapters
    contractions_dict = {k.lower(): v for k, v in contractions_dict.items()}
    if contractions_dict:
        # Regex alternation takes the first alternative that matches, so longer
        # keys must come first; otherwise "can’t’ve" is consumed as "can’t" and
        # leaves a dangling "’ve" behind.
        keys_longest_first = sorted(contractions_dict, key=len, reverse=True)
        alternatives = '|'.join(map(re.escape, keys_longest_first))
        # Lookarounds instead of \b: they behave the same for keys that start and
        # end with a word character, and they also let keys that begin with an
        # apostrophe (e.g. "'cause") match after whitespace.
        contraction_pattern = r"(?<!\w)(%s)(?!\w)" % alternatives
        cont_reg = re.compile(contraction_pattern, re.IGNORECASE)

        chapters = [expand_contractions(chapter, contractions_dict, cont_reg) for chapter in chapters]

    # Combine chapters into single text for analysis
    single_text = combine_chapters_remove_chapter_numbers(chapters)

    return chapters, single_text

In [12]:

def normalize_sentence(sentence: str) -> str:
    """
    Clean a sentence, keeping only words, whitespace and end punctuation.

    Characters other than word characters, whitespace, '.', '!' and '?' are
    deleted, then leading and trailing whitespace is trimmed.

    Parameters:
        sentence (str): The sentence to clean.

    Returns:
        str: The cleaned sentence.
    """
    unwanted = re.compile(r'[^\w\s.!?]')
    cleaned = unwanted.sub('', sentence)
    return cleaned.strip()

def tokenize_and_normalize(text: str):
    """
    Split text into sentences with NLTK and normalize each one.

    Parameters:
        text (str): The text to split.

    Returns:
        list of str: One normalized string per sentence found by
        `nltk.sent_tokenize`, in document order.
    """
    return list(map(normalize_sentence, nltk.sent_tokenize(text)))

In [13]:
# Input novel (absolute Colab/Drive path) and the Gutenberg markers that
# delimit its main content.
file_name = '/content/drive/MyDrive/Colab Notebooks/Cosc524 - Collaboration/data/a study in scarlet.txt'

start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK A STUDY IN SCARLET ***"
end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK A STUDY IN SCARLET ***"

# `chapters` keeps one string per chapter; `single_text` is all chapters joined
# with the chapter heading lines removed.
chapters, single_text = extract_text(file_name, start_marker, end_marker, contractions_dict)
# print(chapters)
# print(single_text)

# Sentence Tokenization: `normalized_sentences` is the input of the QA
# generation loop in a later cell.
normalized_sentences = tokenize_and_normalize(single_text)

# # Print normalized sentences
# for sentence in normalized_sentences:
#     print(sentence)

# # Word Tokenization
# words = []
# for sentence in normalized_sentences:
#     words.extend(word_tokenize(sentence))

# for word in words:
#     print(word)

Loading the text from the file...


In [14]:
def rank_sentences_by_importance(sentences):
    """
    Order sentences by the sum of their TF-IDF term weights, highest first.

    A TF-IDF model with English stop words removed is fitted on the given
    sentences; each sentence's score is the sum of its row in the resulting
    matrix.

    Parameters:
        sentences (list of str or str): Sentences to rank. A bare string is
            treated as a list containing that one string.

    Returns:
        list of str: The same sentences, sorted by descending score.
    """
    corpus = [sentences] if isinstance(sentences, str) else sentences

    vectorizer = TfidfVectorizer(stop_words='english')
    weights = vectorizer.fit_transform(corpus)

    # Row sums give one score per sentence; argsort is ascending, so reverse it.
    totals = weights.sum(axis=1).A1
    order = np.argsort(totals)[::-1]

    return [corpus[position] for position in order]

def extract_possible_answers(context):
    """
    Collect candidate answer phrases from a piece of text.

    The text is run through the module-level spaCy pipeline `nlp`; the texts of
    all named entities and all noun chunks are gathered and de-duplicated.

    Parameters:
        context (str): The text to analyse.

    Returns:
        list of str: Unique entity and noun-chunk texts. The list is built from
        a set, so its order is not defined.
    """
    parsed = nlp(context)
    candidates = [entity.text for entity in parsed.ents]
    candidates += [noun_chunk.text for noun_chunk in parsed.noun_chunks]
    return list(set(candidates))

# Loaded (tokenizer, model) pairs keyed by checkpoint name, so a checkpoint is
# loaded at most once per kernel session instead of once per call.
_QG_MODEL_CACHE = {}


def _load_qg_model(model_name):
    """Return the cached (tokenizer, model) pair for `model_name`, loading it on first use."""
    if model_name not in _QG_MODEL_CACHE:
        _QG_MODEL_CACHE[model_name] = (
            AutoTokenizer.from_pretrained(model_name),
            AutoModelForSeq2SeqLM.from_pretrained(model_name),
        )
    return _QG_MODEL_CACHE[model_name]


def _build_qg_input(sentence, answer):
    """Build the question-generation prompt with the first occurrence of `answer` wrapped in <hl> tokens."""
    start = sentence.find(answer)
    end = start + len(answer)
    return f"generate question: {sentence[:start]}<hl> {answer} <hl>{sentence[end:]}"


def generate_questions(sentences, answers, model_name="valhalla/t5-base-qa-qg-hl"):
    """
    Generate one question for every (sentence, answer) pair where the answer occurs in the sentence.

    The question-generation checkpoint is loaded lazily and cached, so calling
    this function repeatedly (e.g. once per chunk) does not reload the model,
    and nothing is loaded at all when no answer occurs in any sentence.
    The answer span is marked with `<hl>` highlight tokens and the prompt is
    prefixed with "generate question: ", which is the input format the
    "-hl" checkpoints are documented to expect.

    Parameters:
        sentences (list of str): Context sentences.
        answers (list of str): Candidate answer phrases.
        model_name (str): Hugging Face checkpoint used for generation.

    Returns:
        list of dict: One dict per generated question with the keys
        "question", "context" (the sentence) and "answer".
    """
    # Empty answers are skipped: "" is a substring of every sentence and would
    # produce a meaningless prompt.
    candidates = [
        (sentence, answer)
        for sentence in sentences
        for answer in answers
        if answer and answer in sentence
    ]
    if not candidates:
        return []

    tokenizer, model = _load_qg_model(model_name)

    qa_pairs = []
    for sentence, answer in candidates:
        input_ids = tokenizer.encode(
            _build_qg_input(sentence, answer),
            return_tensors="pt",
            truncation=True,   # keep over-long contexts within the encoder length
            max_length=512,
        )
        output = model.generate(
            input_ids,
            max_length=128,
            num_beams=6,
            early_stopping=True,
        )
        question = tokenizer.decode(output[0], skip_special_tokens=True)
        qa_pairs.append({"question": question, "context": sentence, "answer": answer})

    return qa_pairs

# def retain_quality_questions(qa_pairs):
#   model = SentenceTransformer('all-MiniLM-L6-v2')
#   question_embeddings = model.encode([pair['question'] for pair in qa_pairs])
#   unique_qa_pairs = []
#   for i, pair in enumerate(qa_pairs):
#       is_duplicate = False
#       for j, existing_pair in enumerate(unique_qa_pairs):
#           if cosine_similarity([question_embeddings[i]], [question_embeddings[j]])[0][0] > 0.9:
#               is_duplicate = True
#               break
#       if not is_duplicate:
#           unique_qa_pairs.append(pair)

#   qa_pairs = unique_qa_pairs
#   return qa_pairs

def validate_qa_pairs(qa_pairs):
    """
    Keep only the QA pairs whose answer text occurs in their context.

    Parameters:
        qa_pairs (list of dict): Dicts with at least the keys 'answer' and 'context'.

    Returns:
        list of dict: The pairs, in their original order, for which
        `pair['answer']` is a substring of `pair['context']`.
    """
    return [entry for entry in qa_pairs if entry['answer'] in entry['context']]


In [None]:
# Accumulates the validated QA pairs of every chunk; read by the dataset
# preparation cell further down.
all_qa_pairs = []

# Number of consecutive sentences processed together.
chunk_size = 10

for i in range(0, len(normalized_sentences), chunk_size):
    chunk = normalized_sentences[i:i + chunk_size]  # Extract a chunk of sentences

    # print(chunk)
    # Extract key sentences and possible answers for each chunk
    # NOTE(review): the chunk is joined into ONE string before ranking, and
    # rank_sentences_by_importance wraps a bare string in a one-element list,
    # so `key_sentences` is always a single item holding the whole chunk and
    # the ranking has nothing to reorder. Confirm whether `chunk` (the list)
    # was meant to be passed instead.
    key_sentences = rank_sentences_by_importance(" ".join(chunk))  # Use the chunk for ranking
    possible_answers = extract_possible_answers(" ".join(chunk))  # Extract answers from the chunk

    # Generate question-answer pairs for the chunk
    qa_pairs = generate_questions(key_sentences, possible_answers)

    # Validate the generated question-answer pairs
    validated_pairs = validate_qa_pairs(qa_pairs)

    # Append the validated pairs to the list of all QA pairs
    all_qa_pairs.extend(validated_pairs)

# Print the final set of question-answer pairs
for pair in all_qa_pairs:
    print(f"Question: {pair['question']}")
    print(f"Answer: {pair['answer']}")
    print(f"Context: {pair['context']}")
    print()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/90.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/31.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/65.0 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


pytorch_model.bin:   0%|          | 0.00/892M [00:00<?, ?B/s]

In [None]:
from transformers import AutoModelForQuestionAnswering, Trainer, TrainingArguments
from datasets import Dataset, DatasetDict

In [None]:
def prepare_dataset(qas):
    """
    Convert QA pair dicts into a `datasets.Dataset` with SQuAD-like columns.

    Parameters:
        qas (list of dict): Dicts with the keys 'question', 'context' and 'answer'.

    Returns:
        Dataset: Columns 'question', 'context' and 'answers', where each
        'answers' entry is `{'text': [answer], 'answer_start': [offset]}` and
        `offset` is the result of `context.find(answer)`.
    """
    questions, contexts, answer_records = [], [], []
    for qa in qas:
        questions.append(qa['question'])
        passage = qa['context']
        contexts.append(passage)
        reply = qa['answer']
        answer_records.append({'text': [reply], 'answer_start': [passage.find(reply)]})

    columns = {'question': questions, 'context': contexts, 'answers': answer_records}
    return Dataset.from_dict(columns)

def fine_tune_bert(dataset):
    """
    Fine-tune "bert-base-uncased" for extractive question answering.

    Parameters:
        dataset (DatasetDict): Must contain the splits "train" and "validation",
            each with the columns 'question', 'context' and 'answers', where an
            'answers' entry is `{'text': [str], 'answer_start': [int]}`.

    Returns:
        The fine-tuned `AutoModelForQuestionAnswering` model.
    """
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    model = AutoModelForQuestionAnswering.from_pretrained("bert-base-uncased")

    def preprocess_function(examples):
        """Tokenize question/context pairs and attach token-level answer labels."""
        encodings = tokenizer(
            examples['question'],
            examples['context'],
            truncation=True,
            padding=True,
            max_length=512,
            return_offsets_mapping=True,  # needed to map character offsets to tokens
        )

        # The QA head only returns a loss when `start_positions` and
        # `end_positions` are supplied; without them Trainer.train() fails.
        start_positions = []
        end_positions = []
        for row, offsets in enumerate(encodings["offset_mapping"]):
            answer = examples["answers"][row]
            start_char = answer["answer_start"][0]
            end_char = start_char + len(answer["text"][0])
            sequence_ids = encodings.sequence_ids(row)

            # Token range that belongs to the context (sequence id 1).
            context_tokens = [idx for idx, seq_id in enumerate(sequence_ids) if seq_id == 1]

            answer_in_window = (
                start_char >= 0
                and bool(context_tokens)
                and offsets[context_tokens[0]][0] <= start_char
                and offsets[context_tokens[-1]][1] >= end_char
            )
            if not answer_in_window:
                # Answer missing or truncated away: label the first token,
                # the usual convention for "no answer in this window".
                start_positions.append(0)
                end_positions.append(0)
                continue

            first, last = context_tokens[0], context_tokens[-1]

            # Last context token that starts at or before the answer start.
            idx = first
            while idx <= last and offsets[idx][0] <= start_char:
                idx += 1
            start_positions.append(idx - 1)

            # First context token (from the right) that ends at or after the answer end.
            idx = last
            while idx >= first and offsets[idx][1] >= end_char:
                idx -= 1
            end_positions.append(idx + 1)

        encodings["start_positions"] = start_positions
        encodings["end_positions"] = end_positions
        encodings.pop("offset_mapping")  # not a model input
        return encodings

    tokenized_datasets = dataset.map(preprocess_function, batched=True)

    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy` and Trainer's `tokenizer` to `processing_class` —
    # confirm against the installed version (the install cell is unpinned).
    training_args = TrainingArguments(
        output_dir="./results",
        evaluation_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        save_strategy="epoch",
        logging_dir="./logs",
        logging_steps=10,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
    )

    trainer.train()
    return model

In [None]:
    # Build a Dataset from every QA pair collected by the generation loop.
    dataset = prepare_dataset(all_qa_pairs)

    # Hold out 20% of the pairs, then expose the two parts under the split
    # names that fine_tune_bert reads ("train" and "validation").
    # NOTE(review): no seed is passed to train_test_split, so the split
    # presumably differs between runs — confirm whether that is intended.
    dataset = dataset.train_test_split(test_size=0.2)
    dataset = DatasetDict({
        'train': dataset['train'],
        'validation': dataset['test']
    })

    # Train, then save the resulting model weights to ./fine_tuned_bert.
    fine_tuned_model = fine_tune_bert(dataset)
    fine_tuned_model.save_pretrained("./fine_tuned_bert")