In [None]:
!apt-get install -y libmagic-dev poppler-utils tesseract-ocr
%pip install "unstructured[pdf]" unstructured langchain PyMuPDF openai langchain_openai

In [None]:
import openai
import os
from getpass import getpass
from langchain_openai import ChatOpenAI
# Ask for the OpenAI API key without echoing it, store it in the environment,
# and mirror the same value onto the openai module.
os.environ["OPENAI_API_KEY"] = getpass()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Chat model shared by the cells that follow.
chat = ChatOpenAI(
    model="gpt-3.5-turbo-0125",
)

## 本文を抽出

In [None]:
import time
from langchain_core.prompts import ChatPromptTemplate,MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_text_splitters import CharacterTextSplitter
import fitz  # PyMuPDF

def extract_text_from_pdf_range_and_save(pdf_path, output_file_path, start_page=None, end_page=None):
    """
    Extract the text of a page range from a PDF and save it to a text file.

    :param pdf_path: Path of the PDF file, opened with PyMuPDF (``fitz.open``).
    :param output_file_path: Path of the UTF-8 text file to write.
    :param start_page: Zero-based index of the first page to read.
        Defaults to the first page when ``None``.
    :param end_page: Zero-based index of the last page to read (inclusive).
        Defaults to the last page when ``None``.
    :return: The concatenated text of every page in the range.
    """
    doc = fitz.open(pdf_path)
    try:
        # Resolve each bound on its own so that passing only one of them does
        # not silently reset the other one to the whole document.
        first_page = 0 if start_page is None else start_page
        last_page = doc.page_count - 1 if end_page is None else end_page

        page_texts = [
            doc.load_page(page_num).get_text()
            for page_num in range(first_page, last_page + 1)
        ]
    finally:
        # Release the document even when a page fails to load.
        doc.close()

    text = "".join(page_texts)

    # Persist the extracted text.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write(text)

    print(f"テキストがファイルに保存されました: {output_file_path}")
    return text

def process_segments_with_langchain(segments):
    """
    Clean each text segment with the chat model and return the model responses.

    Every segment is sent with a system prompt asking the model to keep only
    the body text. After each call the stored chat history is replaced so that
    it holds just the latest segment and the model's answer to it.

    :param segments: Iterable of text chunks to clean.
    :return: List of model responses in the order of ``segments``; each
        response's ``content`` attribute is read to rebuild the history.
    """
    session_id = "unique_session_id"
    histories = {}

    def _history_for(session_id: str) -> BaseChatMessageHistory:
        # Create the history lazily the first time a session is requested.
        history = histories.get(session_id)
        if history is None:
            history = histories[session_id] = ChatMessageHistory()
        return history

    system_instruction = (
        "You are an excellent text extractor. You remove headers, footers and figure captions that are not relevant to the body text. Don't skip the headline.The output is always the body text only, with no explanations."
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_instruction),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{text}"),
        ]
    )

    # Wrap the prompt/model chain so the stored history is injected on each call.
    chain_with_history = RunnableWithMessageHistory(
        prompt | chat,
        _history_for,
        input_messages_key="text",
        history_messages_key="chat_history",
    )

    responses = []
    for segment in segments:
        print("segment processed ....")

        reply = chain_with_history.invoke(
            {"text": segment},
            config={"configurable": {"session_id": session_id}},
        )
        responses.append(reply)

        # Drop everything accumulated so far and keep only this exchange as
        # the history for the next segment.
        histories[session_id].clear()
        last_exchange = ChatMessageHistory()
        last_exchange.add_user_message(segment)
        last_exchange.add_ai_message(reply.content)
        histories[session_id] = last_exchange

    return responses



In [None]:

# PDF to process. It has to be assigned before the extraction call below so
# that this cell runs on a fresh kernel; the image-extraction cell further
# down assigns the same value.
pdf_path = "./2402.12352.pdf"

output_file_path = './extracted_text.txt'  # text file receiving the raw extracted text

preprocess_text = extract_text_from_pdf_range_and_save(pdf_path, output_file_path)

# Cut the raw text into fixed-size chunks of 4096 characters with no overlap.
text_splitter = CharacterTextSplitter(
    separator="",
    chunk_size=4096,
    chunk_overlap=0,
    length_function=len,
    is_separator_regex=False,
)
segments = text_splitter.split_text(preprocess_text)
processed_segments = process_segments_with_langchain(segments)

# Join the cleaned chunks in one pass rather than by repeated concatenation.
final_text = "".join(processed_segment.content for processed_segment in processed_segments)

# Save the cleaned text.
output_file_path = 'final_processed_text.txt'  # destination of the cleaned text
with open(output_file_path, 'w', encoding='utf-8') as file:
    file.write(final_text)

print(f"処理されたテキストが保存されました: {output_file_path}")


## 画像・表・キャプションの抽出

In [None]:
from unstructured.partition.pdf import partition_pdf

def extract_images_and_captions(filename):
    """
    Partition a PDF and collect its image elements and caption candidates.

    An element counts as a caption candidate when its category is
    ``"FigureCaption"`` or when its text starts with "fig" (case-insensitive).

    :param filename: The path to the PDF file.
    :return: A tuple ``(images, captions)`` of two lists, each in document order.
    """
    partition_options = dict(
        strategy="hi_res",
        extract_images_in_pdf=True,
        extract_image_block_types=["Image", "Table"],
        extract_image_block_to_payload=False,
        extract_image_block_output_dir="./images",
    )
    elements = partition_pdf(filename=filename, **partition_options)

    def _is_caption_candidate(element):
        if element.category == "FigureCaption":
            return True
        return element.text.lower().startswith(("figure", "fig"))

    image_elements = []
    caption_elements = []
    for element in elements:
        if element.category == "Image":
            image_elements.append(element)
        if _is_caption_candidate(element):
            caption_elements.append(element)

    return image_elements, caption_elements


In [None]:
# PDF analysed by this notebook; partition it into image elements and
# caption candidates.
pdf_path = "./2402.12352.pdf"
images, captions = extract_images_and_captions(filename=pdf_path)

In [None]:


def find_midpoint_from_corners(points):
    """
    Compute the centre of a shape from its corner coordinates.

    :param points: Sequence of corners; index 0 of each corner is x, index 1 is y.
    :return: Tuple ``(center_x, center_y)`` holding the mean x and the mean y.
    """
    corner_count = len(points)
    mean_x = sum(corner[0] for corner in points) / corner_count
    mean_y = sum(corner[1] for corner in points) / corner_count
    return mean_x, mean_y

def calculate_distance(center1, center2):
    """
    Compute the straight-line distance between two 2-D points.

    :param center1: First point; index 0 is x, index 1 is y.
    :param center2: Second point, same layout.
    :return: The Euclidean distance between the two points.
    """
    delta_x = center1[0] - center2[0]
    delta_y = center1[1] - center2[1]
    return (delta_x ** 2 + delta_y ** 2) ** 0.5

def match_figures_to_captions(figures, captions):
    """
    Pair each figure with the nearest caption located below it on the same page.

    :param figures: Elements exposing ``id``, ``metadata.page_number`` and
        ``metadata.coordinates.points``.
    :param captions: Candidate caption elements exposing the same attributes.
    :return: List of ``(figure_id, caption_id)`` tuples, one per figure in
        input order; ``caption_id`` is ``None`` when no caption qualifies.
    """
    matches = []
    # Iterate the ``figures`` argument (not a notebook-level global) so the
    # result depends only on what the caller passes in.
    for figure in figures:
        figure_midpoint = find_midpoint_from_corners(figure.metadata.coordinates.points)
        closest_caption = None
        closest_distance = float('inf')
        for caption in captions:
            # Only captions on the figure's own page are candidates.
            if caption.metadata.page_number != figure.metadata.page_number:
                continue
            caption_midpoint = find_midpoint_from_corners(caption.metadata.coordinates.points)
            # A larger y is treated as "below the figure"
            # (assumes y grows downward on the page - TODO confirm).
            if caption_midpoint[1] > figure_midpoint[1]:
                distance = calculate_distance(figure_midpoint, caption_midpoint)
                if distance < closest_distance:
                    closest_distance = distance
                    closest_caption = caption
        caption_id = closest_caption.id if closest_caption is not None else None
        matches.append((figure.id, caption_id))
    return matches




In [None]:
import re
import json

def match_figures_and_captions_to_json(matches, images, captions, output_file_path):
    """
    Write the matched figures and their captions to a JSON file.

    Only matches whose image exists in ``images`` and has category ``"Image"``
    are exported. The figure name is derived from a caption starting with
    "Fig", "Fig." or "Figure" followed by a number (case-insensitive);
    otherwise it is "Unknown Figure".

    :param matches: Iterable of ``(image_id, caption_id)`` pairs; ``caption_id``
        may be ``None``.
    :param images: Image elements exposing ``id``, ``category``, ``text`` and
        ``metadata.image_path``.
    :param captions: Caption elements exposing ``id`` and ``text``.
    :param output_file_path: Path of the UTF-8 JSON file to write.
    """
    # Index the elements once; setdefault keeps the first element when an id
    # is duplicated, matching a first-hit linear search.
    images_by_id = {}
    for item in images:
        images_by_id.setdefault(item.id, item)
    captions_by_id = {}
    for item in captions:
        captions_by_id.setdefault(item.id, item)

    # Accept "Fig 3", "Fig. 3", "Figure 3" and the unspaced variants.
    figure_number_pattern = re.compile(r'^fig(?:ure)?\.?\s*(\d+)', re.IGNORECASE)

    output_json = []
    for image_id, caption_id in matches:
        image_info = images_by_id.get(image_id)
        caption_info = captions_by_id.get(caption_id)
        if image_info and getattr(image_info, 'category', None) == "Image":
            figure_name = "Unknown Figure"
            caption_text = "No Caption"
            if caption_info:
                caption_text = caption_info.text
                # Extract the figure number from the caption text.
                search_result = figure_number_pattern.search(caption_text)
                if search_result:
                    figure_name = f"Figure {search_result.group(1)}"

            output_json.append({
                "id": image_info.id,
                "category": image_info.category,
                "name": figure_name,
                "caption": caption_text,
                "image_path": image_info.metadata.image_path,
                "image_text": image_info.text,
            })

    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(output_json, f, ensure_ascii=False, indent=4)




In [None]:
# Pair every image with a caption, then export the pairs as JSON.
matches = match_figures_to_captions(figures=images, captions=captions)
output_file_path = 'matched_figures_captions.json'
match_figures_and_captions_to_json(
    matches=matches,
    images=images,
    captions=captions,
    output_file_path=output_file_path,
)