# Setup Gemini

In [None]:
%%capture
%pip install -q -U google-generativeai

In [None]:
import google.generativeai as genai
from IPython.display import Markdown

In [None]:
import os

# Configure the Gemini client with the key read from the GOOGLE_API_KEY
# environment variable (os.getenv yields None when the variable is not set).
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))

In [None]:
# Choose a Gemini API model.
# `model` is later handed to process_images_and_get_descriptions, which calls
# its generate_content method to describe the images extracted from the PDFs.
model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest")

# Process all files with unstructured

In [None]:
import os

# Resolve the input and output folders as siblings of the current working
# directory (i.e. children of its parent directory).
path = os.getcwd()
parent_directory = os.path.abspath(os.path.join(path, os.pardir))

# Build the paths with os.path.join, matching the processing cell below,
# instead of concatenating a hard-coded "/" separator.
input_dir = os.path.join(parent_directory, "Doc_Panthera")
output_dir_base = os.path.join(parent_directory, "Doc_Panthera_Augmented")

print("Filepath:", input_dir)
print("Output path:", output_dir_base)

In [None]:
import os
import csv
import json
from unstructured.partition.pdf import partition_pdf

# -------------------------------
# Configuration and Setup
# -------------------------------
# This cell walks the input directory, partitions every PDF, and writes per PDF:
#   - <name>_Images/                  extracted image files
#   - <name>_image_metadata.json      element dict + image path per image element
#   - <name>_processed_output.txt     text with [IMAGE: <element_id>] placeholders
# plus one row per PDF in a pipe-delimited extracted_texts.csv.

# Element types that are dropped from the extracted text
excluded_values = ["Header", "Footer", "PageNumber"]

# Get the current working directory
current_path = os.getcwd()

# Determine the parent directory
parent_directory = os.path.abspath(os.path.join(current_path, os.pardir))

# Define input and output directories
input_dir = os.path.join(parent_directory, "Doc_Panthera")
output_dir_base = os.path.join(parent_directory, "Doc_Panthera_Augmented")

# Print input and output paths for verification
print("Input Directory:", input_dir)
print("Output Directory:", output_dir_base)

# Create the base output directory if it doesn't exist
os.makedirs(output_dir_base, exist_ok=True)

# Define the path for the CSV output
csv_output_path = os.path.join(output_dir_base, "extracted_texts.csv")

# -------------------------------
# CSV Setup
# -------------------------------

# The CSV stays open for the whole traversal so one row can be appended per PDF
with open(csv_output_path, mode="w", encoding="utf-8", newline="") as csv_file:
    csv_writer = csv.writer(csv_file, delimiter="|")
    # Write the header row
    csv_writer.writerow(["File Name", "Extracted Text"])

    # -------------------------------
    # Directory Traversal and PDF Processing
    # -------------------------------

    # Walk through the input directory and its subdirectories
    for root, _, files in os.walk(input_dir):
        for filename in files:
            # Process only PDF files (case-insensitive)
            if filename.lower().endswith(".pdf"):
                pdf_path = os.path.join(root, filename)
                file_name_without_extension = os.path.splitext(filename)[0]

                print(f"\nProcessing PDF: {filename}")

                # Define a specific output directory for images from this PDF
                pdf_output_dir = os.path.join(output_dir_base, f"{file_name_without_extension}_Images")
                os.makedirs(pdf_output_dir, exist_ok=True)

                # -------------------------------
                # PDF Parsing and Element Extraction
                # -------------------------------

                # Extract elements from the PDF; image blocks are written to pdf_output_dir
                elements = partition_pdf(
                    pdf_path,
                    extract_images_in_pdf=True,
                    strategy="hi_res",
                    languages=['ita'],  # Specify languages as needed
                    extract_image_block_output_dir=pdf_output_dir
                )

                # Text fragments / image placeholders, in element order
                processed_elements = []
                # element_id -> {"metadata": <element dict>, "image_path": <saved image file>}
                image_metadata = {}

                # Iterate over each extracted element
                for el in elements:
                    el_dict = el.to_dict()  # Convert element to a dictionary
                    el_type = el_dict.get("type", None)  # Get the type of the element

                    # -------------------------------
                    # Filtering Unwanted Elements
                    # -------------------------------

                    if el_type in excluded_values:
                        # Show what is being dropped, then skip it
                        print(el)
                        continue

                    # -------------------------------
                    # Handling Image Elements
                    # -------------------------------
                    if el_type == "Image":
                        # The saved image file is recorded under the element's own
                        # metadata key "image_path" (the same key the downstream
                        # helper extract_image_path_and_id_from_element reads).
                        # The metadata "filename" entry names the source document,
                        # not the extracted image, so it must not be used here.
                        image_path = el_dict.get("metadata", {}).get("image_path")

                        # Create a placeholder for the image in the text output
                        placeholder = f"[IMAGE: {el_dict['element_id']}]"
                        processed_elements.append(placeholder)

                        # Store image metadata for future reference
                        image_metadata[el_dict['element_id']] = {
                            "metadata": el_dict,
                            "image_path": image_path,
                        }
                    else:
                        # -------------------------------
                        # Handling Text Elements
                        # -------------------------------
                        text = el_dict.get("text", "").strip()
                        if text:  # Ensure that empty strings are not added
                            processed_elements.append(text)

                # -------------------------------
                # Combining and Saving Extracted Data
                # -------------------------------

                # Combine all processed elements into a single text block
                output_text = "\n".join(processed_elements)

                # Define the path for the image metadata JSON file
                image_metadata_path = os.path.join(output_dir_base, f"{file_name_without_extension}_image_metadata.json")
                with open(image_metadata_path, "w", encoding="utf-8") as f:
                    json.dump(image_metadata, f, indent=2)

                # -------------------------------
                # Writing to CSV and Text Files
                # -------------------------------

                # Write the filename and extracted text to the CSV file
                csv_writer.writerow([filename, output_text])

                # Define the path for the processed text output
                text_output_path = os.path.join(output_dir_base, f"{file_name_without_extension}_processed_output.txt")
                with open(text_output_path, "w", encoding="utf-8") as f:
                    f.write(output_text)

                print(f"Processed and saved: {filename}")

# -------------------------------
# Completion Message
# -------------------------------

print(f"\nProcessing complete. Extracted texts saved to {csv_output_path}.")

# Image description generation

In [None]:
import os
from PIL import Image

def process_images_and_get_descriptions(directory_path, model):
    """
    Generate a description for every .jpg image found directly in a folder.

    Each image is opened with PIL and the opened image object is sent to the
    model together with a fixed Italian prompt.

    Args:
    - directory_path: The path to the folder containing images.
    - model: An object exposing generate_content(list) whose result has a
      .text attribute (the Gemini model created earlier in this notebook).

    Returns:
    - descriptions: A dictionary where the keys are image filenames
      and the values are the generated descriptions. Images that fail
      (unreadable file, model error) map to "Error processing image".
    """
    descriptions = {}

    # Only files whose name ends in .jpg (any letter case) are considered
    for filename in os.listdir(directory_path):
        if not filename.lower().endswith('.jpg'):
            continue

        image_path = os.path.join(directory_path, filename)

        # Best-effort per image: one failure must not abort the whole folder
        try:
            with Image.open(image_path) as img:
                print(f"Processing image: {filename}")
                # Send the decoded image itself. Passing the path string would
                # hand the model a piece of text, not the picture.
                response = model.generate_content(
                    [img, "Descrivi la parte principale della finestra, nell'immagine fornita."]
                )

                # Extract description from the model's response
                descriptions[filename] = response.text
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            descriptions[filename] = "Error processing image"

    return descriptions

In [None]:
def extract_image_path_and_id_from_element(element):
    """
    Extracts the nested 'image_path' for .jpg files and the 'element_id' from a given element.

    The element is expected to look like
    {"metadata": {"element_id": ..., "metadata": {"image_path": ...}}}.

    Args:
    - element: A dictionary representing an element containing metadata.

    Returns:
    - (image_path, element_id) when a .jpg path (any letter case) is present,
      otherwise None. Callers rely on the falsy None to detect a miss.
    """
    try:
        # `or {}` also covers keys that are present but hold None
        outer = element.get("metadata") or {}
        element_id = outer.get("element_id")
        image_path = (outer.get("metadata") or {}).get("image_path")
    except AttributeError as e:
        # element (or one of its nested values) is not dict-like
        print(f"Error processing element: {element}. Error: {e}")
        return None

    # Case-insensitive match, consistent with the '.jpg' filter used when the
    # image folder is scanned for files to describe
    if isinstance(image_path, str) and image_path.lower().endswith(".jpg"):
        return image_path, element_id

    print(f"Warning: No .jpg image_path found for element_id {element_id}")
    return None


def extract_image_paths_and_ids(metadata_file):
    """
    Collects (image_path, element_id) pairs from an image metadata JSON file.

    Args:
    - metadata_file: The path to the image metadata JSON file.

    Returns:
    - image_data: A list of tuples containing the .jpg image path and element id.
      Any error while reading or walking the file is printed and whatever was
      collected up to that point is returned.
    """
    collected = []

    try:
        # The file holds a JSON object; only its values are inspected
        with open(metadata_file, "r", encoding="utf-8") as handle:
            entries = json.load(handle)

        for entry in entries.values():
            pair = extract_image_path_and_id_from_element(entry)
            if not pair:
                print("Warning: No valid .jpg 'image_path' or 'element_id' found in element.")
                continue
            collected.append(pair)
    except Exception as e:
        print(f"Error reading metadata file: {e}")

    return collected

In [None]:
import os
import json

def integrate_descriptions(metadata_file, descriptions, processed_output_path, final_output_path):
    """
    Swaps every [IMAGE: element_id] placeholder in the processed text for the
    description of the matching image and saves the result.

    Args:
    - metadata_file: Path to the metadata JSON file containing image_path and element_id.
    - descriptions: Dictionary where keys are image filenames, and values are descriptions.
    - processed_output_path: Path to the text file containing placeholders.
    - final_output_path: Path to save the final text with descriptions integrated.

    Returns:
    - The final text with descriptions integrated.
    """
    # Step 1: (image_path, element_id) pairs recorded in the metadata file
    print("Extracting image paths and element IDs...")
    pairs = extract_image_paths_and_ids(metadata_file)

    # Step 2: element_id -> description, matched through the image file name
    id_to_text = {}
    for img_path, el_id in pairs:
        base_name = os.path.basename(img_path)
        text = descriptions.get(base_name)
        if not text:
            print(f"Warning: No description found for {base_name}")
            continue
        id_to_text[el_id] = text

    # Step 3: load the text and substitute the placeholders one id at a time
    print("\nReading processed output text...")
    with open(processed_output_path, "r", encoding="utf-8") as src:
        body = src.read()

    print("Replacing placeholders...")
    missing = []
    for el_id, text in id_to_text.items():
        token = f"[IMAGE: {el_id}]"
        if token not in body:
            # A description exists but its placeholder is absent from the text
            missing.append(token)
            continue
        body = body.replace(token, text)

    if missing:
        print("\nUnresolved Placeholders:")
        for token in missing:
            print(f"  {token}")

    # Step 4: persist the integrated text
    print(f"\nWriting final output to {final_output_path}...")
    with open(final_output_path, "w", encoding="utf-8") as dst:
        dst.write(body)

    print("Descriptions successfully integrated.")
    return body

In [None]:
import os
import json
from PIL import Image

# Drive the augmentation: for every "<pdf>_Images" folder under the output
# directory, describe its images and merge the descriptions into the text.
output_dir_base = os.path.join(parent_directory, "Doc_Panthera_Augmented")

images_suffix = "_Images"

# Iterate over processed metadata files
for root, dirs, files in os.walk(output_dir_base):
    for dir_name in dirs:
        if not dir_name.endswith(images_suffix):  # Look for directories storing images
            continue

        # Strip only the trailing suffix; str.replace would also remove an
        # "_Images" occurring inside the PDF name itself
        pdf_name = dir_name[: -len(images_suffix)]
        image_dir = os.path.join(root, dir_name)
        metadata_file = os.path.join(output_dir_base, f"{pdf_name}_image_metadata.json")
        processed_output_path = os.path.join(output_dir_base, f"{pdf_name}_processed_output.txt")
        final_output_path = os.path.join(output_dir_base, f"{pdf_name}_final_output.txt")

        # An existing final output makes the cell safe to re-run without
        # calling the model again for PDFs that are already done
        if os.path.exists(final_output_path):
            print(f"Skipping {pdf_name}: Final output already exists at {final_output_path}.")
            continue

        # Skip if required files are missing
        if not (os.path.exists(metadata_file) and os.path.exists(processed_output_path)):
            print(f"Skipping {pdf_name}: Missing metadata or processed text file.")
            continue

        # Step 1: Generate descriptions for images in the directory
        print(f"Processing images in {image_dir}...")
        descriptions = process_images_and_get_descriptions(image_dir, model)

        # Step 2: Integrate descriptions into processed text
        print(f"Integrating descriptions into text for {pdf_name}...")
        try:
            integrate_descriptions(metadata_file, descriptions, processed_output_path, final_output_path)
            print(f"Descriptions integrated successfully for {pdf_name}.")
        except Exception as e:
            # Keep going with the remaining PDFs if one of them fails
            print(f"Error integrating descriptions for {pdf_name}: {e}")

print("All files processed.")

# Create a CSV for the final augmented dataset

In [None]:
import os
import pandas as pd

# Collect every "*_final_output.txt" under the output directory into one
# DataFrame (one row per document) and save it as a CSV.
data = []

final_suffix = "_final_output.txt"

# Iterate over all files in the directory
for root, dirs, files in os.walk(output_dir_base):
    for file in files:
        if not file.endswith(final_suffix):
            continue

        # Recover the original file name by cutting off the trailing suffix
        # only; str.replace would also drop the same text elsewhere in the name
        original_file = file[: -len(final_suffix)]

        # Read the contents of the final output file
        file_path = os.path.join(root, file)
        with open(file_path, 'r', encoding='utf-8') as f:
            text_content = f.read()

        # Append to the data list
        data.append({"FileName": original_file, "Text": text_content})

# Fix the column set explicitly so the CSV still has its header row when no
# final output files were found
df = pd.DataFrame(data, columns=["FileName", "Text"])

# Save the dataframe to a CSV file
csv_output_path = os.path.join(output_dir_base, "augmented_dataset_final_outputs.csv")
df.to_csv(csv_output_path, index=False, encoding='utf-8')

print(f"Dataframe saved to {csv_output_path}")

In [None]:
# Render the FileName/Text dataframe built in the previous cell.
display(df)

# Evaluate the improvement obtained with the image descriptions

In [None]:
from langchain.prompts import ChatPromptTemplate

# Prompt
# Italian instruction template shared by both RAG chains below. It has two
# placeholders: {context} (the retrieved documents) and {question} (the user
# question about the Panthera software).
template = """
Comportati come un assistente che risponde alle domande del cliente.
Rispondi alla domanda basandoti solo sui seguenti documenti: {context}

Rispondi in modo conciso e chiaro, spiegando passo passo al cliente le azioni necessarie da effettuare.
Se possibile, dai indicazioni dettagliate al cliente, su come risolvere il problema o effettuare l'azione desiderata.
Evita troppe ripetizioni nella risposta fornita.
Quando spieghi che cosa è o cosa significa un certo elemento richiesto, non parlarne come se fosse un problema.

In caso di più domande rispondi solo a quelle inerenti alla documentazione e rimani a disposizione per altre domande sull'argomento, specificando,
invece, che le altre domande non sono state trovate pertinenti in questo contesto.

Domanda relativa al software Panthera: {question}
"""

prompt = ChatPromptTemplate.from_template(template)
# Bare expression: shows the prompt object as the cell output
prompt

In [None]:
# Embedding model common to both solutions
from FlagEmbedding import BGEM3FlagModel
from langchain_community.vectorstores import FAISS

# Load the BAAI/bge-m3 model with use_fp16=True; it is wrapped by
# M3EmbeddingFP16 below and used to embed text for both FAISS indexes.
model_fp16 = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

class M3EmbeddingFP16:
    """Thin adapter exposing the module-level `model_fp16` as an embedding object."""

    def embed_documents(self, texts):
        """Encode `texts` with `model_fp16` and return the 'dense_vecs' entry of the result."""
        encoded = model_fp16.encode(texts)
        return encoded['dense_vecs']

    def __call__(self, texts):
        """Allow the instance to be used as a plain callable; same as embed_documents."""
        return self.embed_documents(texts)


# Single shared instance passed to FAISS.load_local in the cells below
embd = M3EmbeddingFP16()

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI

# Chat model used as the generation step of both RAG chains below
# (same model name as the captioning model above, temperature set to 0).
model_gemini = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro-latest",
    temperature=0
)

In [None]:
# without augmented data
# Contains the documents without any data preprocessing steps
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Load the baseline FAISS index from the local folder "local_model_index",
# using `embd` to embed queries.
# NOTE(review): allow_dangerous_deserialization=True opts in to loading
# pickled data — presumably acceptable because the index was built locally;
# confirm the index files come from a trusted source.
vectorstore = FAISS.load_local("local_model_index", embd, allow_dangerous_deserialization=True)
# Show the store object and the number of vectors held in the index
print(vectorstore, vectorstore.index.ntotal)

# Retrieve the 4 nearest documents per query
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# Post-processing
def baseline_format_docs(docs):
    """Concatenate the page_content of each document, separated by a blank line."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)

# Chain
# The input question is sent to the retriever (whose documents are joined by
# baseline_format_docs into {context}) and passed through unchanged as
# {question}; the filled prompt goes to model_gemini and the reply is parsed
# to a plain string.
basic_rag_chain = (
    {"context": retriever | baseline_format_docs, "question": RunnablePassthrough()}
    | prompt
    | model_gemini
    | StrOutputParser()
)

In [None]:
from pprint import pprint
# Test question (in Italian); the same `question` is reused for the augmented
# chain further down so the two answers can be compared.
question = "Come posso decidere se nel calcolo della percentuale di saturazione del contratto vadano considerate anche le quantità in previsione e come si aggiungono?"
# Answer produced by the baseline (non-augmented) chain
print(basic_rag_chain.invoke(question))

In [None]:
# with augmented data
# Loads the index stored in "augmented_faiss_index" — presumably built from
# the description-augmented texts produced above; the index build step is not
# shown in this notebook, so verify before relying on it.
vectorstore_augmented = FAISS.load_local("augmented_faiss_index", embd, allow_dangerous_deserialization=True)
# Show the store object and the number of vectors held in the index
print(vectorstore_augmented, vectorstore_augmented.index.ntotal)

# Same retrieval depth (k=4) as the baseline retriever
retriever_augmented = vectorstore_augmented.as_retriever(search_kwargs={"k": 4})

# Chain
# Identical to basic_rag_chain except for the retriever, so any difference in
# the answers comes from the retrieved context.
basic_rag_chain_augmented = (
    {"context": retriever_augmented | baseline_format_docs, "question": RunnablePassthrough()}
    | prompt
    | model_gemini
    | StrOutputParser()
)

In [None]:
# Answer to the same `question`, this time from the augmented chain
print(basic_rag_chain_augmented.invoke(question))