<img src='https://raw.githubusercontent.com/Evogelpohl/linkArtifacts/main/pdf_openai.png'>

<p align="center">
  <img src='https://raw.githubusercontent.com/Evogelpohl/linkArtifacts/main/dod-qa-sum-logo2.png'>
</p>


## Set a flag to `True` to run the corresponding process.

In [1]:
# Feature toggles read by later cells.
# process_index_rebuild: when True, the Pinecone index is deleted (if present) and re-created.
process_index_rebuild = True
# process_pdf_ocr: when True, the PDF -> image -> OCR step and the text clean-up step are run.
process_pdf_ocr = True

## Install packages

In [2]:
pip install -q pdf2image pytesseract reportlab pinecone-client Pillow gradio langchain

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.0.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
# Define the key variables for the PDF OCR process.
# Configure paths and variables.

from pathlib import Path

TESSERACT_PATH = r"C:\Program Files\Tesseract-OCR\tesseract.exe" #tesseract must be installed separately
POPPLER_PATH = r"C:\Program Files\poppler-23.01.0\Library\bin" #poppler must be installed separately

# Root working directory; every other folder used by this notebook lives beneath it.
LLM_DIRECTORY = Path(r"C:\Temp\pdf_to_openai_chat")
LLM_DIRECTORY.mkdir(parents=True, exist_ok=True)

# Store the PDFs you want processed by this solution in this folder, as *.pdf files only.
# The folder is created here (like its siblings below) so it already exists when PDFs are dropped in.
SRC_PDFS_DIRECTORY = LLM_DIRECTORY / "src_pdfs"
SRC_PDFS_DIRECTORY.mkdir(parents=True, exist_ok=True)

# One sub-folder of page images is written here per source PDF.
PAGES_DIRECTORY = LLM_DIRECTORY / "pages"
PAGES_DIRECTORY.mkdir(parents=True, exist_ok=True)

# The OCR'd text files are written here.
TEXT_OUTPUT_DIRECTORY = LLM_DIRECTORY / "text_output"
TEXT_OUTPUT_DIRECTORY.mkdir(parents=True, exist_ok=True)

## PDF image OCR process, saving the results to a text file

In [4]:
import platform
import pytesseract
from PIL import Image, ImageOps
from pdf2image import convert_from_path
import os

# On Windows, point pytesseract at the tesseract executable configured in TESSERACT_PATH.
# On any other platform pytesseract's own default command is left untouched.
if platform.system() == "Windows":
    pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH

def create_page_images_pdf2image(pdf_path, output_directory, dpi=300):
    """
    Render every page of a PDF to its own JPEG file.

    Files are named ``page_001.jpg``, ``page_002.jpg``, ... in page order.

    :param pdf_path: Path to the input PDF file.
    :param output_directory: Directory that receives the JPEGs; it is joined with
        ``/``, so it must be a ``pathlib.Path``-like object.
    :param dpi: Resolution passed to pdf2image when rendering.
    :return: List of the written image paths, in page order.
    """
    # The poppler location is only passed explicitly on Windows.
    platform_kwargs = {"poppler_path": POPPLER_PATH} if platform.system() == "Windows" else {}
    rendered_pages = convert_from_path(pdf_path=pdf_path, dpi=dpi, **platform_kwargs)

    written_paths = []
    for zero_based_index, rendered_page in enumerate(rendered_pages):
        target_path = output_directory / f"page_{zero_based_index + 1:03}.jpg"
        rendered_page.save(target_path, "JPEG")
        written_paths.append(target_path)

    return written_paths


def convert_to_bw(image_list, threshold=128):
    """
    Convert each image to pure black and white, overwriting the file in place.

    Every pixel is first reduced to grayscale; values below ``threshold`` become
    black (0) and all others white (255).

    :param image_list: List of input image file paths.
    :param threshold: Grayscale cut-off in the range 0-255. Defaults to 128.
    """
    for image_file in image_list:
        # Read inside a context manager so the source file handle is released
        # before the same path is written again below.
        with Image.open(image_file) as image:
            gray_image = ImageOps.grayscale(image)
        bw_image = gray_image.point(lambda x: 0 if x < threshold else 255, '1')
        bw_image.save(image_file)


def ocr_images(image_list, text_output_path):
    """
    Perform OCR on a list of images and append the extracted text to a file.

    The output file is opened in append mode, so calling this again with the
    same ``text_output_path`` adds to the existing text rather than replacing it.

    :param image_list: List of input image file paths, in reading order.
    :param text_output_path: File path to append the extracted text to.
    """
    # NOTE(review): no encoding is given, so the platform default is used for writing.
    # The cells that read this file back must use the same default -- confirm before changing.
    with open(text_output_path, "a") as output_file:
        for image_file in image_list:
            # Open each page in a context manager so its file handle is closed
            # as soon as that page has been OCR'd.
            with Image.open(image_file) as page_image:
                text = pytesseract.image_to_string(page_image)
            # Re-join words that were hyphenated across a line break.
            text = text.replace("-\n", "")
            output_file.write(text)


# Run the OCR pipeline only when enabled by the process_pdf_ocr flag.
if process_pdf_ocr:
    # Only files matching *.pdf directly inside SRC_PDFS_DIRECTORY are picked up.
    pdf_files = SRC_PDFS_DIRECTORY.glob("*.pdf")

    for pdf_file in pdf_files:
        # The PDF's file name (without extension) names both its page folder and its text file.
        pdf_name = pdf_file.stem
        
        pages_subdir = PAGES_DIRECTORY / pdf_name
        pages_subdir.mkdir(parents=True, exist_ok=True)
        
        # Step 1: render each page to a JPEG.
        image_file_list = create_page_images_pdf2image(pdf_file, pages_subdir, dpi=300)
        print(f'Completed PDF2Image Page Creation for {pdf_file}')
        
        # Step 2: overwrite each page image with a black-and-white version.
        convert_to_bw(image_file_list)
        print(f'Completed BWConvert for {pdf_file}')
        
        text_output_file = TEXT_OUTPUT_DIRECTORY / f"{pdf_name}_text.txt"
          
        # Step 3: OCR the pages; ocr_images appends to text_output_file.
        ocr_images(image_file_list, text_output_file)
        print(f'Completed OCRing file {pdf_file}')


### Clean some line-returns out of the text for better readability

In [None]:
import os
import re

# Derive the folder from LLM_DIRECTORY instead of repeating the literal path, so the two
# cannot drift apart. Kept as a plain string because the code below builds paths with os.path.
TEXT_OUTPUT_DIRECTORY = str(LLM_DIRECTORY / "text_output")

def clean_text_file(input_file, output_file):
    """
    Copy ``input_file`` to ``output_file`` with single line breaks turned into spaces.

    A newline is replaced only when neither neighbouring character is a newline,
    so runs of two or more newlines are left as they are.

    :param input_file: Path of the text file to read.
    :param output_file: Path of the text file to write (overwritten if it exists).
    """
    lone_newline = re.compile(r"(?<!\n)\n(?!\n)")

    with open(input_file, "r") as source:
        raw_text = source.read()

    with open(output_file, "w") as sink:
        sink.write(lone_newline.sub(" ", raw_text))

# Clean every raw OCR text file, writing "<name>_cleaned.txt" and deleting the raw file.
if process_pdf_ocr:
    cleaned_suffix = "_cleaned.txt"
    for filename in os.listdir(TEXT_OUTPUT_DIRECTORY):
        # Skip non-text files and files produced by an earlier clean-up run. Without the
        # second check a re-run would turn "x_cleaned.txt" into "x_cleaned_cleaned.txt".
        if not filename.endswith(".txt") or filename.endswith(cleaned_suffix):
            continue
        input_file = os.path.join(TEXT_OUTPUT_DIRECTORY, filename)
        output_file = os.path.join(TEXT_OUTPUT_DIRECTORY, f"{os.path.splitext(filename)[0]}{cleaned_suffix}")
        clean_text_file(input_file, output_file)
        # The raw file is removed so that only the cleaned text is loaded later.
        os.remove(input_file)


## Load our data (the output of the OCR process)

In [None]:
import os
from langchain.document_loaders import TextLoader

# loop through all the files in the directory
# NOTE: `data` is rebound on every iteration, so after the loop it holds only the
# documents of the last .txt file processed. If no .txt file is found, this loop
# never assigns `data`.
for filename in os.listdir(TEXT_OUTPUT_DIRECTORY):
    if filename.endswith(".txt"):
        filepath = os.path.join(TEXT_OUTPUT_DIRECTORY, filename)
        print(f"Processing file: {filepath}")
        
        # load the text file using the TextLoader
        loader = TextLoader(filepath)
        data = loader.load()
        
        # report how many documents were loaded and the size of the first one
        print (f'You have {len(data)} document(s) in your data')
        print (f'There are {len(data[0].page_content)} characters in your document')
        
        # # print each metadata key and their values
        # for key, value in data[0].metadata.items():
        #     print(f'metadata: {key} = {value}')



def data_doc_summerizer(docs):
    """
    Print a short summary of a list of loaded documents.

    Reports the number of documents, an approximate word count (each document's
    ``page_content`` split on single spaces) and, when at least one document is
    present, a preview consisting of the first document's text up to its first ". ".

    :param docs: List of objects exposing a ``page_content`` string attribute.
    :return: None; all output goes to stdout.
    """
    print (f'You have {len(docs)} document(s)')

    num_words = sum(len(doc.page_content.split(' ')) for doc in docs)

    print (f'You have roughly {num_words} words in your docs')

    # There is nothing to preview for an empty list; stop here instead of
    # failing on docs[0].
    if not docs:
        return

    print ()
    print (f'Preview: \n{docs[0].page_content.split(". ")[0]}')

# Summarize `data`, which at this point holds only the last file loaded by the loop above.
data_doc_summerizer(data)



In [None]:
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter


# Maximum size of each text chunk, as passed to the splitter's chunk_size.
chunking_size = 1536
# Pinecone namespace the chunks are stored under; it encodes the chunk size.
namespace_chunk_name = f'chunk_size_{chunking_size}'

data = []  # list to store the loaded data for all text files
text_chunks = []  # list to store the split text chunks for all text files

# Build the splitter once: it is configured identically for every file.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunking_size, chunk_overlap=20)

# Only .txt files are loaded, so only they are counted in the summary below.
text_filenames = [name for name in os.listdir(TEXT_OUTPUT_DIRECTORY) if name.endswith(".txt")]

for filename in text_filenames:
    filepath = os.path.join(TEXT_OUTPUT_DIRECTORY, filename)
    print(f"Processing file: {filepath}")

    # load the text file using the TextLoader
    loader = TextLoader(filepath)
    loaded_data = loader.load()

    # add the loaded data to the list
    data.extend(loaded_data)

    # split the page content of the loaded data into smaller chunks
    text_chunks.extend(text_splitter.split_documents(loaded_data))

# print some information about the loaded data and text chunks
print(f'Loaded {len(data)} documents from {len(text_filenames)} text files')
print(f'Got {len(text_chunks)} text chunks in total')

# # example loop to print metadata for each document in the data list
# for document in data:
#     print(f"Metadata for document in {document.metadata['source']}:\n{document.metadata}\n")

# # example loop to print the length of each text chunk
# for i, chunk in enumerate(text_chunks):
#     print(f"Length of text chunk {i}: {len(chunk.page_content)}")


## Create the embeddings of our documents

In [None]:
import os
from getpass import getpass

# The OpenAI (or Azure OpenAI) API key is what LLM usage is authorized and billed against.
# Prefer a key already present in the environment; otherwise prompt for one without
# echoing it, and export it so later code can read it from the environment as well.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_API_KEY is None:
    OPENAI_API_KEY = getpass("Enter your OpenAI API Key: ")
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY


In [None]:
# Pinecone is a hosted vector database that stores the embedding vectors of the document chunks you split.
# (An embedding is a list of numbers representing a chunk's meaning, so chunks similar to a question can be found by comparing vectors.)
# Currently, Pinecone is free for use cases like this. Other vector stores exist: FAISS, ChromaDB, etc.

import os
from getpass import getpass

# Pinecone environment name that is passed to pinecone.init().
PINECONE_API_ENV = "us-central1-gcp"

# Use the Pinecone API key from the environment when available; otherwise prompt for it
# without echoing, and export it for the rest of the session.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
if PINECONE_API_KEY is None:
    PINECONE_API_KEY = getpass("Enter your Pinecone API Key: ")
    os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY


In [None]:
import pinecone

# Initialize pinecone
pinecone.init(
    api_key=PINECONE_API_KEY,
    environment=PINECONE_API_ENV
)

index_name = 'aaa-reports-002'

# Remove our Pinecone Index if it exists and create a new one.
if process_index_rebuild:
    # Check for the index explicitly rather than catching every exception from delete_index:
    # authentication or network failures must surface instead of being reported as
    # "index does not exist".
    if index_name in pinecone.list_indexes():
        pinecone.delete_index(name=index_name)
        print(f"The existing index {index_name} was deleted.")
    else:
        print(f"The index {index_name} does not exist.")
    # Create an index (a database) for our embeddings.
    # NOTE(review): dimension must equal the embedding model's vector size; 1536 is presumably
    # chosen for the default OpenAI embedding model used later -- confirm if the model changes.
    pinecone.create_index(name=index_name, dimension=1536, metric="cosine")
    print(f"The index {index_name} was created.")


In [None]:
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
import pinecone
import os

# Let's create the embeddings (vector math pointers of our docs) using OpenAI's Embeddings Creator model
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

# Initialize pinecone
pinecone.init(
    api_key=PINECONE_API_KEY,
    environment=PINECONE_API_ENV
)

# Build one metadata dict per chunk, in the same order as text_chunks. Its "source" is the
# source text file's name without directory or extension. os.path is used so that a file
# name containing extra dots is not truncated at the first dot.
metadata_list = [
    {"source": os.path.splitext(os.path.basename(chunk.metadata['source']))[0]}
    for chunk in text_chunks
]

if process_index_rebuild:
    # Embed every chunk and upload it: this both fills the (freshly created) index and
    # returns an object for querying it.
    docsearch = Pinecone.from_texts(
        [t.page_content for t in text_chunks],
        embeddings,
        metadatas=metadata_list,
        index_name=index_name,
        namespace=namespace_chunk_name,
    )
else:
    # The index was not rebuilt, so attach to what is already stored. Uploading again
    # would duplicate every vector and pay for the embeddings a second time.
    docsearch = Pinecone.from_existing_index(
        index_name,
        embeddings,
        namespace=namespace_chunk_name,
    )


## Initialize the User Interface

In [None]:
import gradio as gr
from langchain.llms import OpenAI
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate

# Create the OpenAI LLM client used by both the question-answering and summarization chains.
llm = OpenAI(temperature=0.1, openai_api_key=OPENAI_API_KEY, max_tokens=1024)

# Unique document titles taken from the chunk metadata, sorted so the dropdown
# shows them in the same order on every run.
doc_titles = sorted({metadata['source'] for metadata in metadata_list})

# Prompt for the question-answering "stuff" chain: the retrieved chunks fill {context}
# and the user's query fills {question}.
qa_stuff_prompt_template = """You are a military specialist. Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.

{context}

Question: {question}
Answer as a military expert in after-action reports.:
"""
QA_STUFF_PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template=qa_stuff_prompt_template,
)

# Prompt for the summarization "map_reduce" chain: {text} is the text to summarize.
summarize_mr_prompt_template = """Write a summary of the following:


{text}

"""
SUMMARIZE_MR_PROMPT = PromptTemplate(
    input_variables=["text"],
    template=summarize_mr_prompt_template,
)

# Question answering: a "stuff" chain driven by the custom QA prompt.
chain_qa = load_qa_chain(
    llm=llm,
    chain_type="stuff",
    prompt=QA_STUFF_PROMPT,
)

# Summarization: a "map_reduce" chain; the same prompt is supplied for both the map
# and the combine step, and intermediate step output is not returned.
chain_summarize = load_summarize_chain(
    llm=llm,
    chain_type="map_reduce",
    map_prompt=SUMMARIZE_MR_PROMPT,
    combine_prompt=SUMMARIZE_MR_PROMPT,
    return_intermediate_steps=False,
)

def _find_similar_chunks(query, doc_title, k):
    """Return ``(document, score)`` pairs for the stored chunks most similar to ``query``."""
    # With no title selected, search every document instead of filtering on source == None.
    metadata_filter = {"source": doc_title} if doc_title else None

    # Search the namespace the chunks were actually stored under (namespace_chunk_name);
    # a different, hard-coded namespace would contain no vectors at all.
    return docsearch.similarity_search_with_score(
        query=query, k=int(k), filter=metadata_filter, namespace=namespace_chunk_name
    )


def _format_chunk_snippets(docs_with_scores):
    """Render one line per chunk: its rank, similarity score and first 200 characters."""
    lines = []
    for position, (doc, score) in enumerate(docs_with_scores, start=1):
        truncated_text = doc.page_content[:200].replace("\n", "")
        lines.append(f"Doc {position} snippet (score: {score:.2f}): {truncated_text}\n------\n")
    return "".join(lines)


# Create our Function to process QA (mode) prompts
def qa_function(query, doc_title, k):
    """
    Answer ``query`` from the chunks of one document.

    :param query: The user's question.
    :param doc_title: Value of the chunks' "source" metadata to restrict the search to;
        a falsy value searches all documents.
    :param k: Number of chunks to retrieve.
    :return: Tuple ``(answer, snippets)``: the chain's output text and a printable list of
        the retrieved chunks with their similarity scores.
    """
    # Look in the embeddings store for chunks of the original text similar to the question
    qa_docs_to_search_with_scores = _find_similar_chunks(query, doc_title, k)

    # Separate the documents from their scores
    qa_docs_to_search = [doc for doc, _score in qa_docs_to_search_with_scores]

    # Send the matching docs & our question to the LLM. It will return the answer below.
    output = chain_qa({"input_documents": qa_docs_to_search, "question": query}, return_only_outputs=True)

    # Extract the actual answer text from the output dictionary
    answer = output['output_text']

    return answer, _format_chunk_snippets(qa_docs_to_search_with_scores)


# Create our Function to process Summarization (mode) prompts
def summarization_function(query, doc_title, k):
    """
    Summarize the chunks of one document that are most similar to ``query``.

    :param query: Text used only to select which chunks are summarized.
    :param doc_title: Value of the chunks' "source" metadata to restrict the search to;
        a falsy value searches all documents.
    :param k: Number of chunks to retrieve.
    :return: Tuple ``(summary, snippets)``: the chain's output text and a printable list of
        the retrieved chunks with their similarity scores.
    """
    # Look in the embeddings store for chunks of the original text similar to the query
    sum_docs_to_search_with_scores = _find_similar_chunks(query, doc_title, k)

    # Separate the documents from their scores
    sum_docs_to_search = [doc for doc, _score in sum_docs_to_search_with_scores]

    # Send the matching docs to the LLM. It will return the summary below.
    output = chain_summarize({"input_documents": sum_docs_to_search}, return_only_outputs=False)

    # Extract the actual summary text from the output dictionary
    answer = output['output_text']

    return answer, _format_chunk_snippets(sum_docs_to_search_with_scores)


# Route a UI request to the QA or the summarization handler.
# The parameter order must match the order of the inputs list given to gradio.
def process_input(mode, query, doc_title, k):
    """
    Dispatch on the selected interaction mode.

    :param mode: "Question/Answer" or "Summarization".
    :param query: Text entered by the user.
    :param doc_title: Document title to filter on.
    :param k: Number of chunks to retrieve.
    :return: Whatever the selected handler returns, or
        ``("Invalid mode selected", "")`` for any other mode.
    """
    if mode == "Question/Answer":
        return qa_function(query, doc_title, k)

    if mode == "Summarization":
        return summarization_function(query, doc_title, k)

    return "Invalid mode selected", ""


# The main function to launch the gradio interface
def main():
    """
    Build the Gradio interface around ``process_input`` and launch it.

    The four inputs (mode, question, document title, number of chunks) are passed to
    ``process_input`` positionally, in the order listed. The two outputs receive the
    answer text and the list of retrieved chunk snippets.
    """
    with gr.Interface(
        fn=process_input,
        inputs=[
            # `value` sets the initially selected choice, consistent with the Slider below;
            # the legacy `default` keyword is not a parameter of gr.Radio.
            gr.Radio(choices=["Question/Answer", "Summarization"], label="Model Interaction Mode", value="Summarization"),
            gr.Textbox(lines=1, label="Question"),
            gr.Dropdown(choices=doc_titles, label="Filter by AAR title", info="Filter by after-action report name"),
            gr.Slider(3, 30, step=1, value=5, label="Docs to Search", info="Number of AAR document 'chunks' to search and summarize"),
        ],
        outputs=[
            gr.Textbox(label="Answer", lines=10),
            gr.Textbox(label="Docs presented to the LLM", info="A list of document 'chunks' from the AAR with its question-similarity score."),
        ],
        title="After-Action Report ChatGPT",
        # Each example supplies mode, question and document title.
        examples=[
            ["Question/Answer", "Echeloning and it's use in Desert Shield and Storm.", "aar-desertStorm_text_cleaned"],
            ["Question/Answer", "What were the DDS problems?", "aar-desertStorm_text_cleaned"],
            ["Question/Answer", "Tell me about Brown and Root Corporation.", "aar-Somalia_text_cleaned"],
            ["Question/Answer", "Can you classify the US military equipment, like tanks and helicopters used in Somalia by their type? Respond in a table.", "aar-Somalia_text_cleaned"],
            ["Question/Answer", "List the battalions involved with Somalia. Confirm your list refers to US military battalions only.", "aar-Somalia_text_cleaned"],
            ["Summarization", "Brown and Root", "aar-Somalia_text_cleaned"],
            ["Summarization", "Issues related to SEABEE uniforms.", "aar-desertStorm_text_cleaned"],
            ["Question/Answer", "Tell me about Osman Atto", "aar-Somalia_text_cleaned"],
            ["Question/Answer", "Based on what you know about Osman Atto, write a briefing to US Command officials.", "aar-Somalia_text_cleaned"],
            ["Summarization", "Response to Hurrican Sandy", "aar-hurricanSandy_text_cleaned"],
            ["Summarization", "UNOSOM II", "aar-Somalia_text_cleaned"],
            ["Question/Answer", "List the key US military equipment deployed in Somalia, as a bulleted list.","aar-Somalia_text_cleaned"],
            ["Question/Answer", "How long did the air war last?", "aar-desertStorm_text_cleaned"]
        ],
    ) as iface:
        iface.launch()

# Launch the UI when this code is executed directly (as opposed to being imported).
if __name__ == '__main__':
    main()

