# 1 Getting the document
# 2 Extract text
# 3 Split document into chunks

In [48]:
from typing import List, Dict, Any
from langchain.schema.runnable import RunnableLambda
import tempfile
from langchain.document_loaders import PyPDFLoader
import requests  

# Function to download PDFs from URLs
def download_pdfs(urls: List[str], timeout: float = 30.0) -> List[str]:
    """Download each URL into its own temporary ``.pdf`` file.

    Downloads are best-effort: a URL that fails (network error, HTTP error
    status, or a failure while writing the file) is reported on stdout and
    skipped, so the returned list may be shorter than ``urls``.

    Args:
        urls: URLs to fetch.
        timeout: Seconds to wait for the server before giving up on a URL.
            Without a timeout ``requests.get`` can block indefinitely.

    Returns:
        Paths of the temporary files that were written, in download order.
        The files are created with ``delete=False``; the caller is
        responsible for removing them.
    """
    pdf_paths = []
    for url in urls:
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                temp_file.write(response.content)
        except Exception as e:
            # Deliberately broad: one bad URL must not abort the whole batch.
            print(f"Error downloading {url}: {str(e)}")
        else:
            # Record the path only once the file has been closed and flushed.
            pdf_paths.append(temp_file.name)
    return pdf_paths

# Step 1: Define extract_text_from_pdfs for a single PDF path
def extract_text_from_pdfs(pdf_path: str) -> Dict[str, Any]:
    """Load a single PDF and return its text together with basic metadata.

    Args:
        pdf_path: Path of the PDF file handed to ``PyPDFLoader``.

    Returns:
        A dict with three keys: ``"text"`` (the ``page_content`` of every
        loaded page joined with newlines), ``"source"`` (``pdf_path``
        unchanged) and ``"pages"`` (number of pages the loader returned).
    """
    page_docs = PyPDFLoader(pdf_path).load()

    # One string for the whole document, pages separated by a newline.
    page_texts = [doc.page_content for doc in page_docs]
    document_text = "\n".join(page_texts)

    summary = {}
    summary["text"] = document_text
    summary["source"] = pdf_path
    summary["pages"] = len(page_docs)
    return summary

# Step 2: Define split_pdf_to_chunks to split extracted text
def split_pdf_to_chunks(extracted_text: str, chunk_size: int = 10000) -> List[str]:
    """Split text into consecutive, non-overlapping fixed-size chunks.

    The split is purely positional (by character count), so a chunk boundary
    may fall in the middle of a word or sentence.

    Args:
        extracted_text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.

    Returns:
        The chunks in document order. Every chunk except possibly the last
        has exactly ``chunk_size`` characters. Empty input gives ``[]``.

    Raises:
        ValueError: If ``chunk_size`` is zero or negative. A negative step
            would otherwise silently produce no chunks and drop all text.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [
        extracted_text[start:start + chunk_size]
        for start in range(0, len(extracted_text), chunk_size)
    ]

# Step 3: Wrap each processing step in a RunnableLambda so the steps can be
# composed with the `|` operator.

# Takes a PDF path and keeps only the "text" entry of the extraction result;
# the "source" and "pages" entries are discarded at this point.
extract_text = RunnableLambda(lambda pdf_path: extract_text_from_pdfs(pdf_path)["text"])
# Takes the extracted text and splits it using the default chunk size.
chuncks_split = RunnableLambda(lambda extracted_text: split_pdf_to_chunks(extracted_text))

# Step 4: Chain the runnables together: PDF path -> full text -> list of chunks.
split_pdf_runnable = extract_text | chuncks_split

# Download the PDF (download_pdfs returns a list of temp-file paths).
# Alternative paper:
# pdf_paths = download_pdfs(['https://arxiv.org/pdf/2410.15288'])
pdf_paths = download_pdfs(['https://arxiv.org/pdf/2410.17220'])

# Start from an empty list so `chunks` is always defined: code that reads it
# later would otherwise hit a NameError (or silently reuse a stale value from
# an earlier notebook run) whenever the download fails.
chunks = []

# Pass the first PDF path to the pipeline
if pdf_paths:
    pdf_path = pdf_paths[0]  # Only the first downloaded file is processed

    # Invoke the runnable chain on the single PDF path
    chunks = split_pdf_runnable.invoke(pdf_path)

else:
    print("No PDF files were downloaded.")


# 4 Extracting information from chunks

In [49]:
from langchain.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from typing import List
import os

# Initialize the LLM.
# The API key is read from the GOOGLE_API_KEY environment variable so the real
# secret never has to be written into (and accidentally shared with) this
# file. The literal placeholder is only a fallback for when the variable is
# not set; requests will not authenticate with it.
llm = ChatGoogleGenerativeAI(
    model="models/gemini-1.0-pro",
    temperature=0.3,
    google_api_key=os.environ.get("GOOGLE_API_KEY", "YOUR_API_KEY"),
)
# Step 1: Prompt text for information extraction.
# The eight numbered items below are the section names the model is asked to
# report on; `{text}` is the single placeholder, filled with one document
# chunk per call.
extract_info_prompt = """
Analyze the following section of a scientific document and extract key information:
1. Title (if present)
2. Authors (if present)
3. Main research questions/objectives
4. Methodology details
5. Key findings and results
6. Conclusions or implications
7. Important citations or references
8. Technical terms and their definitions

If certain information is not present in this section, focus on extracting what is available.

Text: {text}

Provide the information in a structured format.
"""
# Wrap the raw prompt text in a PromptTemplate that declares "text" as its
# only input variable.
prompt_template = PromptTemplate(input_variables=["text"], template=extract_info_prompt)
# Create a function to invoke the LLM for each chunk
def extract_information_from_chunk(chunk: str) -> str:
    """Ask the LLM to extract the key information from one document chunk.

    The chunk is substituted into ``prompt_template`` and sent to ``llm`` as
    a single human message.

    Args:
        chunk: One piece of the document text.

    Returns:
        The ``content`` of the model's response.
    """
    request = HumanMessage(content=prompt_template.format(text=chunk))
    reply = llm.invoke([request])
    return reply.content

# Step 2: Wrap the per-chunk extraction function in a RunnableLambda so it is
# called through the runnable interface (`.invoke`).
from langchain.schema.runnable import RunnableLambda
extract_info_runnable = RunnableLambda(extract_information_from_chunk)
# Step 3: Run the extraction runnable over every chunk
def process_chunks(chunks: List[str]) -> List[str]:
    """Invoke ``extract_info_runnable`` on each chunk, one after another.

    Args:
        chunks: Document chunks in reading order.

    Returns:
        One extraction result per chunk, in the same order as ``chunks``.
    """
    results = []
    for piece in chunks:
        results.append(extract_info_runnable.invoke(piece))
    return results
# Run the extraction over the chunks produced by the splitting step above;
# `extracted_info` holds one result per chunk.
extracted_info = process_chunks(chunks)

# 5 Merging the processed chunks <br> and saving them to a JSON file

In [50]:
import json
import re
from typing import List, Dict, Any

# List-style number prefix ("1. " or "1: ") at the very start of a line.
# Anchored on purpose: numbers inside the content ("95.3%", "2023.") must
# survive untouched.
_LEADING_NUMBER_RE = re.compile(r'^\d+[.:]\s*')

# Lower-cased section headers (as worded in the extraction prompt) mapped to
# the standardized keys used in the output dictionary.
_HEADER_MAPPING = {
    "title": "Title",
    "authors": "Authors",
    "main research questions/objectives": "Research Questions",
    "methodology details": "Methodology",
    "key findings and results": "Findings",
    "conclusions or implications": "Conclusions",
    "important citations or references": "Citations",
    "technical terms and their definitions": "Technical Terms",
}


def _match_header(clean_line):
    """Return ``(standard_key, trailing_text)`` if the line opens a section, else ``None``."""
    lower_clean = clean_line.lower()
    for header_pattern, standard_key in _HEADER_MAPPING.items():
        if not lower_clean.startswith(header_pattern):
            continue
        remainder = clean_line[len(header_pattern):]
        # The header must end at a word boundary, otherwise prose such as
        # "Titled ..." would be mistaken for the "Title" header.
        if remainder[:1].isalnum():
            continue
        # Drop the ":" / "-" separator between the header and its content.
        return standard_key, remainder.strip().lstrip(':-').strip()
    return None


def _store_section(info_dict, section, parts):
    """Write the collected lines of ``section`` into ``info_dict`` if there is any text."""
    if section and parts:
        content_text = ' '.join(parts).strip()
        if content_text:
            info_dict[section] = content_text


def parse_chunk(text: str) -> Dict[str, Any]:
    """Parse one LLM response into a dictionary of paper information.

    The text is scanned line by line. A line that starts with one of the
    known section headers (case-insensitive, after removing ``**`` bold
    markers and a leading ``N.`` / ``N:`` number) opens that section; text
    following the header on the same line and all subsequent non-header
    lines are collected and joined with single spaces. Lines before the
    first header are ignored. If a header occurs twice, the later
    occurrence replaces the earlier content.

    Args:
        text: Raw response text for a single chunk.

    Returns:
        A dict with the keys ``Title``, ``Authors``, ``Research Questions``,
        ``Methodology``, ``Findings``, ``Conclusions``, ``Citations`` and
        ``Technical Terms``. Each value is the section text, or ``None`` when
        the section was absent or empty.
    """
    info_dict = dict.fromkeys(_HEADER_MAPPING.values())

    current_section = None
    current_content = []

    for raw_line in text.split('\n'):
        # Bold markers are removed everywhere; the list number only at the
        # start of the line.
        without_bold = raw_line.replace('**', '').strip()
        clean_line = _LEADING_NUMBER_RE.sub('', without_bold).strip()
        if not clean_line:
            continue

        header = _match_header(clean_line)
        if header is None:
            # Ordinary content line: belongs to the open section, if any.
            if current_section:
                current_content.append(clean_line)
            continue

        # A new header closes the section collected so far.
        _store_section(info_dict, current_section, current_content)
        current_section, header_content = header
        current_content = [header_content] if header_content else []

    # Save the last section
    _store_section(info_dict, current_section, current_content)

    return info_dict

def consolidate_chunks(chunks: List[str]) -> Dict[str, Any]:
    """Merge the per-chunk extraction results into a single dictionary.

    Each chunk is parsed with ``parse_chunk`` and the values found for every
    field are collected in chunk order.

    Args:
        chunks: Extraction results, one per document chunk. Empty entries and
            entries that are neither ``str`` nor ``bytes`` are skipped.
            ``bytes`` entries are decoded as UTF-8 (undecodable bytes are
            replaced) rather than passed through ``str()``, which would
            yield the ``b'...'`` representation instead of the text.

    Returns:
        A dict containing only the fields found in at least one chunk.
        ``"Authors"`` is a list of unique names in first-seen order, split on
        commas, semicolons, ``|`` and the word "and". Every other field is a
        single string with the per-chunk values joined by ``" | "``.
    """
    field_names = (
        "Title",
        "Authors",
        "Research Questions",
        "Methodology",
        "Findings",
        "Conclusions",
        "Citations",
        "Technical Terms",
    )
    all_content = {name: [] for name in field_names}

    # Process each chunk
    for chunk in chunks:
        if isinstance(chunk, bytes):
            chunk = chunk.decode("utf-8", errors="replace")
        if not chunk or not isinstance(chunk, str):
            continue

        # Keep only the fields this chunk actually provided.
        for key, value in parse_chunk(chunk).items():
            if value is not None:
                all_content[key].append(value)

    # Combine content and create final output
    final_output = {}

    for key, values in all_content.items():
        if not values:
            continue  # Field never appeared; leave it out of the result.

        if key == "Authors":
            # Split authors by common separators and create unique list
            author_list = []
            for author_text in values:
                authors = re.split(r'[,;]\s*|\s+and\s+|\s*\|\s*', author_text)
                author_list.extend(a.strip() for a in authors if a.strip())
            # dict.fromkeys removes duplicates while keeping first-seen order.
            final_output[key] = list(dict.fromkeys(author_list))
        else:
            combined = " | ".join(values)
            if combined.strip():
                final_output[key] = combined

    return final_output

def process_and_save_json(extracted_info: List[str], output_file: str = "document_analysis2.json") -> Dict[str, Any]:
    """Consolidate the per-chunk results and write them to a JSON file.

    Args:
        extracted_info: Extraction results, one per document chunk.
        output_file: Path of the JSON file to write (overwritten if present).

    Returns:
        The consolidated dictionary that was written to ``output_file``.
    """
    merged = consolidate_chunks(extracted_info)

    # UTF-8 with ensure_ascii=False keeps non-ASCII characters readable in
    # the output file.
    with open(output_file, 'w', encoding='utf-8') as handle:
        handle.write(json.dumps(merged, indent=2, ensure_ascii=False))

    return merged

# Consolidate the per-chunk extractions and write them to the default output
# file; `result` keeps the consolidated dictionary for inspection.
result = process_and_save_json(extracted_info)