In [1]:
import os
import sys
from pathlib import Path

from dotenv import load_dotenv

In [2]:
# The repository root is taken to be the parent of the notebook's working
# directory; it is put on sys.path and used to locate the `.env` file.
project_root = Path.cwd().parent

# Guard the append so that re-running this cell does not keep adding the same
# entry to sys.path.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

# Kept as the cell's last expression so its return value is displayed.
load_dotenv(project_root / '.env')

True

In [3]:
# Relative string paths to the two source PDFs.
# NOTE(review): no cell visible in this notebook reads these two constants;
# the workflow states are built from GUIDELINE_PATHS / API_SPEC_PATHS instead.
# Confirm whether they are still needed.
GUIDELINE_PATH = "../resources/original/guideline.pdf"
API_SPEC_PATH = "../resources/original/api_specification.pdf"

In [4]:
# Directory layout, anchored at the parent of the current working directory.
BASE_DIR = Path.cwd().parent
RESOURCES_DIR = BASE_DIR / "resources"
# get_file_paths looks for "<doc_type>.pdf" directly under ORIGINAL_DIR ...
ORIGINAL_DIR = RESOURCES_DIR / "original"
# ... and derives a per-document "<doc_type>/" folder under PROCESSED_DIR.
PROCESSED_DIR = RESOURCES_DIR / "processed"

def get_file_paths(doc_type, original_dir=None, processed_dir=None):
    """Build the input/output path layout for one document.

    Only path arithmetic is performed; nothing is created or checked on disk.

    Args:
        doc_type: Document base name without extension (e.g. "guideline").
        original_dir: Directory containing ``<doc_type>.pdf``. When omitted,
            the module-level ``ORIGINAL_DIR`` is used.
        processed_dir: Directory under which the ``<doc_type>/`` folder is
            placed. When omitted, the module-level ``PROCESSED_DIR`` is used.

    Returns:
        dict with the keys ``original_pdf``, ``processed_dir``,
        ``splitted_pdfs``, ``analyze_request_info`` and ``analyzed_jsons``,
        each mapped to a ``Path``.
    """
    # Fall back to the notebook-level directories only when no override is
    # given, so existing single-argument calls behave exactly as before.
    original_root = ORIGINAL_DIR if original_dir is None else Path(original_dir)
    processed_root = PROCESSED_DIR if processed_dir is None else Path(processed_dir)

    processed_doc_dir = processed_root / doc_type
    return {
        "original_pdf": original_root / f"{doc_type}.pdf",
        "processed_dir": processed_doc_dir,
        "splitted_pdfs": processed_doc_dir / "splitted_pdfs",
        "analyze_request_info": processed_doc_dir / "analyze_request_info.json",
        "analyzed_jsons": processed_doc_dir / "analyzed_jsons",
    }

# Path layouts for the two documents handled by this notebook.
GUIDELINE_PATHS = get_file_paths("guideline")
API_SPEC_PATHS = get_file_paths("api_specification")

# Echo both layouts so the resolved locations show up in the cell output.
print("GUIDELINE_PATHS:", GUIDELINE_PATHS)
print("API_SPEC_PATHS:", API_SPEC_PATHS)

GUIDELINE_PATHS: {'original_pdf': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/original/guideline.pdf'), 'processed_dir': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/guideline'), 'splitted_pdfs': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/guideline/splitted_pdfs'), 'analyze_request_info': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/guideline/analyze_request_info.json'), 'analyzed_jsons': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/guideline/analyzed_jsons')}
API_SPEC_PATHS: {'original_pdf': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/original/api_specification.pdf'), 'processed_dir': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/api_specification'), 'splitted_pdfs': PosixPath('/Users/gabriela/src/workspace/rag-agent/resources/processed/api_specification/splitted_pdfs'), 'analyze_request_info': PosixPath('/Users/gabriela/s

In [5]:
# Endpoints of the Upstage layout-analysis demo service.
# NOTE(review): neither constant is referenced by a cell visible in this
# notebook; presumably the inference URL accepts a request and the result base
# URL is a prefix for fetching its outcome — confirm against the consumer.
UPSTAGE_INFERENCE_URL = "https://ocr-demo.upstage.ai/api/layout-analysis/inference"
UPSTAGE_RESULT_BASE_URL = "https://ocr-demo.upstage.ai/api/result/"

In [6]:
# Browser-like HTTP headers kept alongside the Upstage demo endpoints.
# NOTE(review): not referenced by any cell visible here — confirm where they
# are consumed.
upstage_api_headers = {
    "Accept": "*/*",
    "origin": "https://d3tgkvf102zvh7.cloudfront.net",
    "priority": "u=1, i",
    "referer": "https://d3tgkvf102zvh7.cloudfront.net/",
    # Adjacent literals are concatenated into one user-agent string.
    "user-agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/129.0.0.0 Safari/537.36"
    ),
}

In [7]:
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver

from ingestion.nodes import pdf
from ingestion.nodes import layout
from ingestion.nodes import summary
from ingestion.nodes import elements
from ingestion.states import FileState


# --- Node instances ---------------------------------------------------------
# One object per pipeline step, all created with verbose=True.
init_pdf_node = pdf.InitPDFNode(verbose=True)
pdf_split_node = pdf.SplitPDFNode(batch_size=1, verbose=True)
layout_node = layout.LayoutNode(verbose=True)
page_elements_extractor_node = elements.ElementsNode(verbose=True)
image_cropper_node = elements.ImageCropperNode(verbose=True)
table_cropper_node = elements.TableCropperNode(verbose=True)
text_extractor_node = elements.ExtractTextNode(verbose=True)
# The three summary nodes each read OPENAI_API_KEY from the environment; the
# value is passed straight through rather than bound to a notebook variable.
page_summary_node = summary.PageSummaryNode(api_key=os.getenv("OPENAI_API_KEY"), verbose=True)
image_summary_node = summary.ImageSummaryNode(api_key=os.getenv("OPENAI_API_KEY"), verbose=True)
table_summary_node = summary.TableSummaryNode(api_key=os.getenv("OPENAI_API_KEY"), verbose=True)
table_transformer_node = elements.TableMarkdownExtractorNode(verbose=True)

# --- Graph ------------------------------------------------------------------
workflow = StateGraph(FileState)

# Register every node under its graph name (same order as instantiation).
for _node_name, _node in (
    ("init_pdf_node", init_pdf_node),
    ("pdf_split_node", pdf_split_node),
    ("layout_node", layout_node),
    ("page_element_extractor_node", page_elements_extractor_node),
    ("image_cropper_node", image_cropper_node),
    ("table_cropper_node", table_cropper_node),
    ("text_extractor_node", text_extractor_node),
    ("page_summary_node", page_summary_node),
    ("image_summary_node", image_summary_node),
    ("table_summary_node", table_summary_node),
    ("table_transformer_node", table_transformer_node),
):
    workflow.add_node(_node_name, _node)

# Wire the edges: a linear prefix, a three-way fan-out after element
# extraction that converges on the page summary, then two branches to END.
for _source, _target in (
    ("init_pdf_node", "pdf_split_node"),
    ("pdf_split_node", "layout_node"),
    ("layout_node", "page_element_extractor_node"),
    ("page_element_extractor_node", "image_cropper_node"),
    ("page_element_extractor_node", "table_cropper_node"),
    ("page_element_extractor_node", "text_extractor_node"),
    ("image_cropper_node", "page_summary_node"),
    ("table_cropper_node", "page_summary_node"),
    ("text_extractor_node", "page_summary_node"),
    ("page_summary_node", "image_summary_node"),
    ("page_summary_node", "table_summary_node"),
    ("image_summary_node", END),
    ("table_summary_node", "table_transformer_node"),
    ("table_transformer_node", END),
):
    workflow.add_edge(_source, _target)

workflow.set_entry_point("init_pdf_node")

# Compile with a MemorySaver checkpointer attached.
memory_saver = MemorySaver()
app = workflow.compile(checkpointer=memory_saver)

In [8]:
from langgraph.errors import GraphRecursionError
from langchain_core.runnables import RunnableConfig

In [9]:
# Initial graph inputs: each FileState is given the path dict of one document.
guideline_state = FileState(file_paths=GUIDELINE_PATHS)
api_spec_state = FileState(file_paths=API_SPEC_PATHS)

# One thread id per document; run_workflow passes the config to app.invoke.
# NOTE(review): thread_id sits at the top level of the config here, which is
# where run_workflow reads it. LangGraph's documented location is
# config["configurable"]["thread_id"] — confirm the checkpointer resolves
# this top-level form as intended.
guideline_config = RunnableConfig({"thread_id": "ingestion-guideline"})
api_spec_config = RunnableConfig({"thread_id": "ingestion-api-specification"})

def run_workflow(app, input_state, config):
    """Invoke a compiled graph once and report completion.

    Args:
        app: Object exposing ``invoke(input_state, config=...)``.
        input_state: Initial state handed to ``app.invoke``.
        config: Mapping carrying the run's ``thread_id``, either at the top
            level or nested under ``"configurable"``.

    Returns:
        Whatever ``app.invoke`` returns, or ``None`` when the graph raised
        ``GraphRecursionError`` (the error is printed, not re-raised).
    """
    try:
        result = app.invoke(input_state, config=config)
    except GraphRecursionError as e:
        print(e)
        return None

    # Resolve the id for the log line defensively: a plain config['thread_id']
    # would raise KeyError *after* the run finished and discard its result
    # whenever the id is nested under "configurable" or absent.
    cfg = config or {}
    thread_id = cfg.get("thread_id")
    if thread_id is None:
        thread_id = (cfg.get("configurable") or {}).get("thread_id", "<unknown>")
    print(f"Completed: {thread_id}")
    return result

# Run the ingestion graph once per document, one after the other.
# Either result is None if that run raised GraphRecursionError, because
# run_workflow prints the error and returns None in that case.
guideline_result = run_workflow(app, guideline_state, guideline_config)
api_spec_result = run_workflow(app, api_spec_state, api_spec_config)

2024-10-20 20:44:51,132 - ingestion - INFO - {
  "class": "InitPDFNode",
  "message": "InitPDFNode execution completed",
  "extra": {
    "file_basename": "guideline",
    "file_type": "pdf"
  }
}
2024-10-20 20:44:51,382 - ingestion - INFO - {
  "class": "SplitPDFNode",
  "message": "analyze_request_info file already exists: /Users/gabriela/src/workspace/rag-agent/resources/processed/guideline/analyze_request_info.json"
}
2024-10-20 20:44:51,383 - ingestion - INFO - {
  "class": "SplitPDFNode",
  "message": "SplitPDFNode execution completed",
  "extra": {
    "num_total_page": 136,
    "num_split_files": 136
  }
}
2024-10-20 20:44:51,385 - ingestion - INFO - {
  "class": "LayoutNode",
  "message": "LayoutNode execution completed",
  "extra": {
    "total_requests": 136,
    "sucessed_requests": 136,
    "succed_get_result_requests": 136
  }
}
2024-10-20 20:44:51,569 - ingestion - INFO - {
  "class": "ExtractTextNode",
  "message": "ExtractTextNode execution completed",
  "extra": {
   

In [10]:
from ingestion.utils.state_serializer import(
    save_state,
    load_state
)

# Persist each final state, then read it back from that document's processed
# directory.
# NOTE(review): the .pkl file names below presumably match where save_state
# writes; its target path is not visible in this notebook — confirm.
# NOTE(review): guideline_result / api_spec_result are None when run_workflow
# caught a GraphRecursionError, in which case save_state receives None.
save_state(guideline_result)
loaded_guideline_state = load_state(GUIDELINE_PATHS["processed_dir"] / "guideline.pkl")

save_state(api_spec_result)
loaded_api_spec_state = load_state(API_SPEC_PATHS["processed_dir"] / "api_specification.pkl")