<a href="https://colab.research.google.com/github/erindakapllani/AppliedMath_Task2/blob/main/qa_gan2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install PyMuPDF

Collecting PyMuPDF
  Downloading PyMuPDF-1.24.7-cp310-none-manylinux2014_x86_64.whl (3.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting PyMuPDFb==1.24.6 (from PyMuPDF)
  Downloading PyMuPDFb-1.24.6-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (15.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m31.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyMuPDFb, PyMuPDF
Successfully installed PyMuPDF-1.24.7 PyMuPDFb-1.24.6


In [4]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import spacy
import fitz  # PyMuPDF for PDF extraction

# Initialize SpaCy for tokenization
nlp = spacy.load("en_core_web_sm")

In [5]:
from google.colab import files
import os

# Upload files
uploaded = files.upload()

# Define a folder name for storing uploaded files
folder_name = 'Nuclear_Safety'

# Create the folder if it doesn't exist
if not os.path.exists(folder_name):
    os.makedirs(folder_name)

# Move each uploaded file into the folder
for filename in uploaded.keys():
    # Get the content of the file
    file_content = uploaded[filename]

    # Define the path to store the file
    file_path = os.path.join(folder_name, filename)

    # Write the file content to the specified path
    with open(file_path, 'wb') as f:
        f.write(file_content)

    print(f"File '{filename}' has been saved to '{folder_name}' folder.")

Saving BT31-4-17-2008E.pdf to BT31-4-17-2008E.pdf
File 'BT31-4-17-2008E.pdf' has been saved to 'Nuclear_Safety' folder.


In [7]:

# Function to read all text from PDF files in the folder
def read_documents(folder_path):
    documents = []
    for filename in os.listdir(folder_path):
        if filename.endswith('.pdf'):
            pdf_path = os.path.join(folder_path, filename)
            doc = fitz.open(pdf_path)
            text = ""
            for page_num in range(len(doc)):
                page_text = doc[page_num].get_text()
                text += page_text
            documents.append(text)
    return documents

# Function to generate rule-based questions from the document
def generate_rule_based_questions(doc):
    matcher = spacy.matcher.Matcher(nlp.vocab)

    # Define patterns specific to the nuclear industry including adjectives and nouns
    patterns = [
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["nuclear", "atomic", "radiation"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["reactor", "fission", "fusion"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["safety", "security", "emergency", "protocol", "regulation"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["waste", "spent", "fuel", "radioactive"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["agency", "international"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": {"IN": ["cooling", "monitoring", "response"]}}],
        [{"POS": "ADJ", "OP": "?"}, {"LOWER": "critical"}, {"LOWER": "mass"}],
    ]

    # Adding patterns to the matcher
    for i, pattern in enumerate(patterns):
        matcher.add(f"NUCLEAR_PATTERN_{i}", [pattern])

    # Processing the document and extracting QA pairs
    qa_pairs = []
    seen_questions = set()
    matches = matcher(doc)
    for match_id, start, end in matches:
        span = doc[start:end]
        question = f"What is {span.text}?"
        answer = span.sent.text
        if question not in seen_questions:
            qa_pairs.append((question, answer))
            seen_questions.add(question)

    return qa_pairs



In [8]:
# QADataset class for question-answer dataset
class QADataset(Dataset):
    def __init__(self, documents, tokenizer):
        self.documents = documents
        self.tokenizer = tokenizer
        self.qa_pairs = self.generate_qa_pairs()

    def __len__(self):
        return len(self.qa_pairs)

    def __getitem__(self, idx):
        question, answer = self.qa_pairs[idx]
        question_tokens = self.tokenizer(question)  # Replace with your actual tokenization logic
        answer_tokens = self.tokenizer(answer)      # Replace with your actual tokenization logic
        return question_tokens, answer_tokens

    def generate_qa_pairs(self):
        qa_pairs = []
        for document in self.documents:
            doc = nlp(document)
            pairs = generate_rule_based_questions(doc)
            qa_pairs.extend(pairs)
        return qa_pairs


In [9]:

# Generator and Discriminator models (example)
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        # Define your generator architecture
        self.fc1 = nn.Linear(100, 256)  # Example linear layer

    def forward(self, x):
        # Generator forward pass logic
        x = self.fc1(x)
        return x

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        # Define your discriminator architecture
        self.fc1 = nn.Linear(256, 1)  # Example linear layer

    def forward(self, x):
        # Discriminator forward pass logic
        x = self.fc1(x)
        return x


In [10]:


# Training function for QA-GANs
def train_qagan(generator, discriminator, dataloader, num_epochs, optimizer_gen, optimizer_disc):
    criterion = nn.BCEWithLogitsLoss()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    generator.to(device)
    discriminator.to(device)
    generator.train()
    discriminator.train()

    for epoch in range(num_epochs):
        for i, (questions, answers) in enumerate(dataloader):
            # Move data to device
            questions = questions.to(device)
            answers = answers.to(device)

            # Train Discriminator
            optimizer_disc.zero_grad()
            real_outputs = discriminator(questions)
            fake_outputs = discriminator(generator(questions))
            real_labels = torch.ones_like(real_outputs)
            fake_labels = torch.zeros_like(fake_outputs)
            disc_loss_real = criterion(real_outputs, real_labels)
            disc_loss_fake = criterion(fake_outputs, fake_labels)
            disc_loss = disc_loss_real + disc_loss_fake
            disc_loss.backward()
            optimizer_disc.step()

            # Train Generator
            optimizer_gen.zero_grad()
            gen_outputs = generator(questions)
            disc_outputs = discriminator(gen_outputs)
            gen_loss = criterion(disc_outputs, torch.ones_like(disc_outputs))
            gen_loss.backward()
            optimizer_gen.step()

            if i % 10 == 0:
                print(f'Epoch [{epoch}/{num_epochs}], Step [{i}/{len(dataloader)}], Gen Loss: {gen_loss.item()}, Disc Loss: {disc_loss.item()}')


In [11]:

# Main function
def main():
    # Read documents and create dataset
    documents = read_documents(folder_path)
    dataset = QADataset(documents, nlp)

    # Initialize DataLoader
    batch_size = 16
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Initialize generator and discriminator models
    generator = Generator()
    discriminator = Discriminator()

    # Initialize optimizers with model parameters
    optimizer_gen = torch.optim.Adam(generator.parameters(), lr=0.001)
    optimizer_disc = torch.optim.Adam(discriminator.parameters(), lr=0.001)

    # Train QA-GANs
    num_epochs = 10
    train_qagan(generator, discriminator, dataloader, num_epochs, optimizer_gen, optimizer_disc)

# Execute main function
if __name__ == "__main__":
    main()


TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'spacy.tokens.doc.Doc'>