## **Generating Q&As from Documents**

In [1]:
import re
import PyPDF2
from openai import OpenAI
from typing import List, Tuple, Dict
import logging
from pathlib import Path
import concurrent.futures
import json
from datetime import datetime
from tqdm import tqdm

In [2]:
# Root logging configuration for the notebook: INFO and above, each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger used by the functions defined in the cells below.
logger = logging.getLogger(__name__)

Function to process a single PDF file and split its text into chunks

In [3]:
def _split_into_chunks(text: str, chunk_size: int) -> List[str]:
    """
    Greedily pack whole sentences into chunks of roughly ``chunk_size`` characters.

    Sentences are split on whitespace that follows ``.``, ``!`` or ``?``.
    A single sentence longer than ``chunk_size`` is kept intact as its own
    (oversized) chunk. Empty or whitespace-only chunks are never emitted.
    """
    chunks: List[str] = []
    current_chunk = ""

    for sentence in re.split(r'(?<=[.!?])\s+', text):
        if len(current_chunk) + len(sentence) <= chunk_size:
            current_chunk += sentence + " "
            continue
        # Flush only real content: when the very first sentence is already
        # longer than chunk_size, current_chunk is still empty here.
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        current_chunk = sentence + " "

    # current_chunk may be just " " (e.g. for empty text), so test the stripped value.
    if current_chunk.strip():
        chunks.append(current_chunk.strip())

    return chunks


def process_pdf(pdf_path: Path, chunk_size: int = 400) -> Dict:
    """
    Process a single PDF file and return its chunks and metadata.

    Text is extracted page by page (pages that fail to extract are logged and
    skipped), joined with newlines so words at page boundaries do not fuse,
    and then split into sentence-aligned chunks.

    Args:
        pdf_path: Path to the PDF file
        chunk_size: Target size of each text chunk, in characters

    Returns:
        Dictionary with two keys:
            'chunks': list of non-empty text chunks (empty if the PDF has no
                extractable text or processing failed)
            'metadata': 'filename' and 'processed_date', plus 'pages' and
                'chunk_count' on success or 'error' on failure

    Note:
        This function does not raise; any exception is logged and reported
        through the 'error' field of the returned metadata.
    """
    try:
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        with pdf_path.open('rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            metadata = {
                'filename': pdf_path.name,
                'pages': len(pdf_reader.pages),
                'processed_date': datetime.now().isoformat()
            }

            # Extract text page by page; one bad page must not lose the whole document.
            page_texts = []
            for i, page in enumerate(pdf_reader.pages):
                try:
                    # Guard against a falsy result so the join below cannot fail.
                    page_texts.append(page.extract_text() or '')
                except Exception as e:
                    logger.warning(f"Failed to extract text from {pdf_path.name}, page {i}: {e}")

        chunks = _split_into_chunks('\n'.join(page_texts), chunk_size)
        metadata['chunk_count'] = len(chunks)

        return {
            'chunks': chunks,
            'metadata': metadata
        }

    except Exception as e:
        logger.error(f"Error processing PDF {pdf_path.name}: {e}")
        return {
            'chunks': [],
            'metadata': {
                'filename': pdf_path.name,
                'error': str(e),
                'processed_date': datetime.now().isoformat()
            }
        }


Function to generate Q&A pairs for multiple chunks

In [4]:
def generate_qa_for_chunks(chunks: List[str], pdf_name: str, client: OpenAI, model: str = "llama2:7b") -> List[Dict]:
    """
    Ask the model for question-answer pairs, one completion request per chunk.

    Chunks are processed sequentially. A failed request is logged and recorded
    as an entry with empty content and an 'error' field, so the returned list
    always has exactly one entry per input chunk, in input order.

    Args:
        chunks: List of text chunks to process
        pdf_name: Name of the source PDF (stored in each entry's metadata)
        client: OpenAI client instance
        model: Name of the model to use

    Returns:
        List of dictionaries, each with 'content' (the stripped completion
        text, or '' on failure) and 'metadata' ('pdf_name', 'chunk_index',
        optionally 'error', and 'generated_date')
    """
    instruction_head = (
        "Read the following text and generate 10 question-answer pairs. "
        "Make sure questions are clear and answers are accurate and concise.\n\n"
    )
    instruction_tail = "Format each pair as:\nQ: Question\nA: Answer\n. Strictly follow the format"

    qa_records = []
    for position, passage in enumerate(chunks):
        # Built incrementally so both outcomes share one code path while the
        # key order (pdf_name, chunk_index, [error], generated_date) is kept.
        record_meta = {'pdf_name': pdf_name, 'chunk_index': position}
        try:
            completion = client.completions.create(
                model=model,
                prompt=f"{instruction_head}Text: {passage}\n\n{instruction_tail}",
                max_tokens=3000,
                temperature=0.3
            )
            generated_text = completion.choices[0].text.strip()
        except Exception as e:
            logger.error(f"Error generating Q&A pairs for {pdf_name}, chunk {position}: {e}")
            record_meta['error'] = str(e)
            generated_text = ''

        record_meta['generated_date'] = datetime.now().isoformat()
        qa_records.append({'content': generated_text, 'metadata': record_meta})

    return qa_records

Function to process a folder of PDFs efficiently

In [5]:
def process_folder(folder_path: str, output_dir: str, client: OpenAI, max_workers: int = 4,
                   model: str = "llama2:7b") -> None:
    """
    Process all PDFs in a folder and generate Q&A pairs for each of them.

    PDF parsing runs in a thread pool; Q&A generation then runs sequentially,
    one PDF at a time. For every PDF that yields at least one chunk, two files
    are written to ``output_dir``:
        ``<pdf stem>_qa.txt``        - generated Q&A text, chunk by chunk
        ``<pdf stem>_metadata.json`` - PDF metadata plus per-chunk metadata

    Args:
        folder_path: Path to folder containing PDFs (``*.pdf``, non-recursive)
        output_dir: Path to output directory (created if missing)
        client: OpenAI client instance
        max_workers: Maximum number of parallel workers for PDF parsing
        model: Name of the model passed to ``generate_qa_for_chunks``

    Raises:
        Exception: any unexpected error is logged and re-raised.
    """
    try:
        # Create output directory if it doesn't exist
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Get all PDF files in the folder
        folder = Path(folder_path)
        pdf_files = list(folder.glob('*.pdf'))

        if not pdf_files:
            logger.warning(f"No PDF files found in {folder_path}")
            return

        logger.info(f"Found {len(pdf_files)} PDF files to process")

        # Phase 1: parse PDFs in parallel. The pool is closed before the long
        # model phase starts, so no idle worker threads are kept around.
        pdf_results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            pdf_futures = {executor.submit(process_pdf, pdf_file): pdf_file
                           for pdf_file in pdf_files}

            for future in tqdm(concurrent.futures.as_completed(pdf_futures),
                               total=len(pdf_files),
                               desc="Processing PDFs"):
                pdf_file = pdf_futures[future]
                try:
                    pdf_results[pdf_file.name] = future.result()
                except Exception as e:
                    logger.error(f"Failed to process {pdf_file.name}: {e}")

        # Phase 2: generate and save Q&A pairs, one PDF at a time.
        saved_count = 0
        for pdf_name, result in pdf_results.items():
            chunks = result['chunks']
            if not chunks:
                logger.warning(f"Skipping {pdf_name}: no text chunks were extracted")
                continue

            qa_results = generate_qa_for_chunks(chunks, pdf_name, client, model=model)

            # Save Q&A pairs in chunk order
            qa_output_path = output_path / f"{Path(pdf_name).stem}_qa.txt"
            with qa_output_path.open('w', encoding='utf-8') as f:
                for qa in sorted(qa_results, key=lambda x: x['metadata']['chunk_index']):
                    f.write(qa['content'] + '\n\n')

            # Save metadata
            metadata_output_path = output_path / f"{Path(pdf_name).stem}_metadata.json"
            metadata = {
                'pdf_metadata': result['metadata'],
                'qa_metadata': [qa['metadata'] for qa in qa_results]
            }
            with metadata_output_path.open('w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)

            saved_count += 1

        # Report how many PDFs actually produced output, not how many were found.
        logger.info(
            f"Successfully processed {saved_count} of {len(pdf_files)} PDFs. "
            f"Results saved in {output_dir}"
        )

    except Exception as e:
        logger.error(f"Error processing folder: {e}")
        raise


Function to extract Q&A pairs from a file

In [6]:
def extract_qna_from_file(file_path: str) -> Tuple[List[str], List[str]]:
    """
    Parse a generated Q&A file into parallel lists of questions and answers.

    A pair is recognised as ``Q: <question>`` followed on the next line by
    ``A: <answer>``; the answer runs until the next ``Q:`` or the end of the
    file.

    Args:
        file_path: Path to the Q&A file (read as UTF-8)

    Returns:
        Tuple of (questions list, answers list); both lists have the same
        length and matching indices belong to the same pair

    Raises:
        FileNotFoundError: if ``file_path`` does not exist
        Exception: any other error is logged and re-raised
    """
    try:
        source = Path(file_path)
        if not source.exists():
            raise FileNotFoundError(f"Input file not found: {file_path}")

        raw_text = source.read_text(encoding='utf-8')

        # DOTALL lets an answer span several lines up to the next "Q:".
        qa_regex = re.compile(r'Q:\s*(.*?)\s*\nA:\s*(.*?)(?=\s*(?:Q:|$))', re.DOTALL)

        questions: List[str] = []
        answers: List[str] = []
        for raw_question, raw_answer in qa_regex.findall(raw_text):
            questions.append(raw_question.strip())
            answers.append(raw_answer.strip())

        logger.info(f"Successfully extracted {len(questions)} Q&A pairs from {file_path}")
        return questions, answers

    except Exception as e:
        logger.error(f"Error extracting Q&A pairs from {file_path}: {e}")
        raise

Main script: run the pipeline on a folder of PDFs and preview one result

In [7]:
if __name__ == "__main__":
    # Settings
    pdf_folder, output_folder = "pdfs", "qa_output"  # input PDFs / generated files
    max_workers = 4  # worker count handed to process_folder

    # Client pointed at an OpenAI-compatible endpoint on localhost
    # (NOTE(review): presumably a local Ollama server - confirm).
    client = OpenAI(api_key='ollama', base_url='http://localhost:11434/v1')

    # Generate Q&A files for every PDF in the input folder
    process_folder(pdf_folder, output_folder, client, max_workers=max_workers)

    # Preview: parse and print the pairs from the first generated Q&A file, if any
    qa_files = list(Path(output_folder).glob('*_qa.txt'))
    if qa_files:
        sample_file = qa_files[0]
        print(f"\nSample Q&As from {sample_file.name}:")
        questions, answers = extract_qna_from_file(sample_file)
        for number, pair in enumerate(zip(questions, answers), start=1):
            question_text, answer_text = pair
            print(f"\nQ{number}: {question_text}")
            print(f"A{number}: {answer_text}")

2025-01-09 18:22:20,348 - INFO - Found 18 PDF files to process
Processing PDFs: 100%|██████████| 18/18 [00:00<00:00, 50.92it/s] 
2025-01-09 18:22:35,593 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:22:48,272 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:00,579 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:11,367 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:19,987 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:31,656 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:42,437 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01-09 18:23:51,696 - INFO - HTTP Request: POST http://localhost:11434/v1/completions "HTTP/1.1 200 OK"
2025-01

KeyboardInterrupt: 

In [1]:
import os

def reformat_qa_file(input_file, output_file):
    """
    Reformat a Q&A text file into plain ``Q: ...`` / ``A: ...`` line pairs.

    Recognised line prefixes are the bold variants ``**Q:`` / ``**Q**:`` and
    ``**A:`` / ``**A**:`` as well as the plain ``Q:`` / ``A:`` form, so a file
    that is already clean passes through unchanged instead of being emptied.
    Remaining ``**`` bold markers are removed from the text. Lines without a
    recognised prefix (including continuation lines of multi-line answers)
    are dropped. If a question has several answer lines, the last one wins;
    a question with no answer line is written with an empty answer.

    Both files are handled as UTF-8, matching how the generation step writes
    its ``*_qa.txt`` files, rather than relying on the platform default.

    Args:
        input_file (str): Path to the input text file.
        output_file (str): Path to save the reformatted text file.
    """
    question_markers = ("**Q**:", "**Q:", "Q:")
    answer_markers = ("**A**:", "**A:", "A:")

    def strip_marker(text, markers):
        """Return text without its leading marker and bold markup, or None if no marker matches."""
        for marker in markers:
            if text.startswith(marker):
                # Remove only the leading marker, so text inside the line is left intact.
                return text[len(marker):].replace("**", "").strip()
        return None

    with open(input_file, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    reformatted_lines = []
    current_question = ""
    current_answer = ""

    for line in lines:
        line = line.strip()

        question_text = strip_marker(line, question_markers)
        if question_text is not None:
            # A new question closes the pair that was in progress.
            if current_question:
                reformatted_lines.append(f"Q: {current_question}\nA: {current_answer}\n")
            current_question = question_text
            current_answer = ""
            continue

        answer_text = strip_marker(line, answer_markers)
        if answer_text is not None:
            current_answer = answer_text
        # Any other line is treated as stray and ignored.

    # Emit the final pair, which no following question has flushed yet.
    if current_question:
        reformatted_lines.append(f"Q: {current_question}\nA: {current_answer}\n")

    with open(output_file, 'w', encoding='utf-8') as file:
        file.writelines(reformatted_lines)


def process_all_txt_files_in_folder(input_folder, output_folder):
    """
    Reformat every ``.txt`` file found directly in ``input_folder``.

    Each file is passed through ``reformat_qa_file`` and written to
    ``output_folder`` under the same file name; one progress line is printed
    per file.

    Args:
        input_folder (str): Path to the folder containing input .txt files.
        output_folder (str): Path to the folder to save reformatted .txt files
            (created if it does not exist).
    """
    os.makedirs(output_folder, exist_ok=True)

    # Only names ending in ".txt" are considered; everything else is skipped.
    txt_names = (entry for entry in os.listdir(input_folder) if entry.endswith(".txt"))

    for txt_name in txt_names:
        source_path = os.path.join(input_folder, txt_name)
        target_path = os.path.join(output_folder, txt_name)

        reformat_qa_file(source_path, target_path)
        print(f"Processed: {txt_name} -> {target_path}")


In [2]:
# Example usage: reformat every .txt file in the input folder and write the
# cleaned copies, under the same file names, to the output folder.
input_folder_path = 'qa_output'       # Folder with the raw .txt Q&A files (same name the main script above uses for its output)
output_folder_path = 'refined_q&as'   # Destination folder for the cleaned files; created if missing

process_all_txt_files_in_folder(input_folder_path, output_folder_path)
print("Processing completed.")

Processed: 2008_Summer_Olympics_torch_relay_qa.txt -> refined_q&as\2008_Summer_Olympics_torch_relay_qa.txt
Processed: Adult_contemporary_music_qa.txt -> refined_q&as\Adult_contemporary_music_qa.txt
Processed: Affirmative_action_in_the_United_States_qa.txt -> refined_q&as\Affirmative_action_in_the_United_States_qa.txt
Processed: Aircraft_carrier_qa.txt -> refined_q&as\Aircraft_carrier_qa.txt
Processed: Airport_qa.txt -> refined_q&as\Airport_qa.txt
Processing completed.
