In [1]:
# 秘密鍵を読み込む
from env_loader import load_keys
load_keys()

LangSmith API Key: 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

✅ .env ファイルを読み込みました


In [2]:
import os
import shutil
import stat
import time
import subprocess
import tempfile
import gc

# 読み取り専用ファイルを削除する補助関数
def remove_readonly(func, path, _):
    os.chmod(path, stat.S_IWRITE)
    func(path)

# post-checkoutフックのセキュリティ制限を回避し、安全にGitリポジトリをcloneする関数
def safe_git_clone(clone_url: str, branch: str = "main", target_dir: str = None) -> str:
    
    # デフォルトは一時ディレクトリ
    if target_dir is None:
        target_dir = os.path.join(tempfile.gettempdir(), "temp_repo")

    # 既存ディレクトリを削除（リソース開放してから削除）
    if os.path.exists(target_dir):
        gc.collect()         # ガーベジコレクションでリソース開放
        time.sleep(1)        # 少し待ってから削除
        try:
            shutil.rmtree(target_dir, onerror=remove_readonly)
        except Exception as e:
            raise RuntimeError(f"既存の {target_dir} を削除できませんでした: {e}")

    # クローン（checkout はあとで行う）
    result = subprocess.run(
        ["git", "clone", "--no-checkout", clone_url, target_dir],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"Git clone failed:\n{result.stderr}")

    # post-checkout フックを削除（セキュリティ制限対策）
    hook_path = os.path.join(target_dir, ".git", "hooks", "post-checkout")
    if os.path.exists(hook_path):
        try:
            os.remove(hook_path)
        except Exception as e:
            raise RuntimeError(f"post-checkout フックの削除に失敗しました: {e}")

    # 明示的に checkout
    result = subprocess.run(
        ["git", "checkout", branch],
        cwd=target_dir,
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"Git checkout failed:\n{result.stderr}")

    return target_dir


In [3]:
import tempfile
import os
import stat
import shutil
import pdfplumber
from langchain_community.document_loaders import GitLoader
from langchain_core.documents import Document

pdf_file_paths = []

# 指定パスのpdfファイルのみを抽出する。
def file_filter(file_path: str) -> bool:
    if file_path.endswith(".pdf") and "Insurance_documents" in file_path:
        pdf_file_paths.append(file_path)
        return True
    return False

clone_url = "https://github.com/HAL-141/RAG_Application.git"
branch = "main"

# 共通部品でクローン実行
repo_path = safe_git_clone(clone_url, branch)

loader = GitLoader(
    clone_url=clone_url,
    repo_path=repo_path,
    branch=branch,
    file_filter=file_filter,
)
loader.load()  # Git clone + フィルタリングしたファイルパスをpdf_file_pathsに追加

# pdfplumber でPDFを読み込む
# PyPDFLoaderだと決算資料等に使用されるUniJIS-UTF16-Hをうまく読み込めないため
# 「CropBox missing from /Page, defaulting to MediaBox」のログがページ数分出力されるが抑制できなかったため放置　
# ※動作には問題なし
docs = []
for pdf_path in pdf_file_paths:
    full_text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if text:
                full_text += text + "\n"
    if full_text.strip():
        docs.append(Document(page_content=full_text, metadata={"source": pdf_path}))


print(f"Loaded {len(docs)} files.")

CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, def

Loaded 9 files.


In [4]:
# 先頭3件のドキュメント内容を確認（ページごとのテキスト）
for i, doc in enumerate(docs[:1]):
    print(f"\n--- Document {i+1} ---")
    print(doc.page_content[:100])  # 長すぎるときは先頭100文字だけ表示



--- Document 1 ---
2025年3月期 決算短信〔日本基準〕(連結)
2025年5月20日
上 場 会 社 名 ＳＯＭＰＯホールディングス株式会社 上場取引所 東
コ ー ド 番 号 8630 URL https://ww


In [5]:
for i, doc in enumerate(docs):
    print(f"Document {i+1} length: {len(doc.page_content)} characters")


Document 1 length: 28461 characters
Document 2 length: 56864 characters
Document 3 length: 27101 characters
Document 4 length: 119641 characters
Document 5 length: 12576 characters
Document 6 length: 29464 characters
Document 7 length: 6298 characters
Document 8 length: 75755 characters
Document 9 length: 1200 characters


In [6]:
# Document Transformer
from langchain_text_splitters import CharacterTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 1,2ページの範囲を含めることができればテーマごとに網羅することはできそうなのでchunk_size=500
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=10)

texts = text_splitter.split_documents(docs)
print(len(texts))

753


In [7]:
# Embedding model
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

In [8]:
from langchain_core.documents import Document

# RRFの処理の関数
def reciprocal_rank_fusion(
    retriever_outputs: list[list[Document]],
    k: int = 60,
) -> list[str]:
    # 各ドキュメントのコンテンツ（文字列）とそのスコアの対応を保持する辞書を準備
    content_score_mapping = {}

    # 検索クエリごとにループ
    for docs in retriever_outputs:
        # 検索結果のドキュメントごとにループ
        for rank, doc in enumerate(docs):
            content = doc.page_content

            # 初めて登場したコンテンツの場合はスコアを0で初期化
            if content not in content_score_mapping:
                content_score_mapping[content] = 0
            # (1 / (順位 + k)) のスコアを加算
            content_score_mapping[content] += 1 / (rank + k)

    # スコアの大きい順にソート
    ranked = sorted(content_score_mapping.items(), key=lambda x: x[1], reverse=True)
    return [content for content, _ in ranked]

In [9]:
# hybrid_retriever作成
from langchain_chroma import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain_core.runnables import RunnableParallel

# 分割：前半と後半に分ける (一括で保存しようとするとトークン数がオーバーしたので)
midpoint = len(docs) // 2
docs_part1 = docs[:midpoint]
docs_part2 = docs[midpoint:]

# --Chromaのベクターストアを準備-----------
# chromaの永続化保存先
chroma_persist_dir = "./chroma_db"

# 1回目：ベクトルストア初期化＋保存
chroma_db = Chroma.from_documents(
    documents=docs_part1,
    embedding=embeddings,
    persist_directory=chroma_persist_dir
)
# 2回目：既存ベクトルストアに追記（同じ persist_directory に接続）
chroma_db = Chroma(
    embedding_function=embeddings,
    persist_directory=chroma_persist_dir
)
chroma_db.add_documents(docs_part2)

# --chroma_retrieverを準備---------------
chroma_retriever = chroma_db.as_retriever()
chroma_retriever = chroma_retriever.with_config(
    {"run_name": "chroma_retriever"}
)

# --BM25_retrieverを準備-----------------
bm25_retriever = BM25Retriever.from_documents(docs)
bm25_retriever = bm25_retriever.with_config(
    {"run_name": "bm25_retriever"}
)

# --hybrid_retrieverを準備---------------
hybrid_retriever = (
    RunnableParallel({
        "chroma_documents": chroma_retriever,
        "bm25_documents": bm25_retriever,
    })
    | (lambda x: [x["chroma_documents"], x["bm25_documents"]])
    | reciprocal_rank_fusion
)

In [13]:
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser

# Pydanticで出力形式を規定
class Answer(BaseModel):
    question: str = Field(description="summary of the question")
    answer: str = Field(description="summary of the answer")
    explanation: str = Field(description="explanation of the answer")
    keywords: list[str] = Field(description="keywords helpful for explaining the question")

# PydanticOutputParserの作成
output_parser = PydanticOutputParser(pydantic_object=Answer)
# 出力フォーマットの作成
format_instructions = output_parser.get_format_instructions()

In [28]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

system_prompt = f"""\
あなたは保険業界についての質問に対する回答者です。
以下の条件を厳守し、参考情報を元に、必ず**JSON形式で**回答してください。

# 条件 :
1. 以下の４項目をすべて含めること :
    - question: 質問の要点の要約
    - answer: 回答の要約
    - explanation: 回答の背景・根拠、具体的説明（400字以内）
    - keywords: 回答に関連する重要語句のリスト
2. 出力全体を与えられたformat_instrcutionsに従って書くこと

# 参考情報 :
{{context}}

"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{question}\n\n 回答は必ずJSON形式で、指定されたキーをすべて含めてください。")
]).partial(format_instructions=format_instructions)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

In [32]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableMap, RunnablePassthrough
from operator import itemgetter

hybrid_rag_chain = (
    RunnableMap({
        "question": RunnablePassthrough(),
        "context": itemgetter("question") | hybrid_retriever,
        "format_instructions": lambda _: format_instructions
    })
    | prompt | model | output_parser
).with_config({"run_name": "hybrid_rag_chain"})

response = hybrid_rag_chain.invoke({"question":"メガ損保３社の近年の業績はどうですか"})
print(response)
# for chunk in hybrid_rag_chain.stream("損害保険業界が直面している代表的な課題を３つ教えてください"):
#     print(chunk, end="", flush=True)

question='メガ損保３社の近年の業績はどうですか' answer='メガ損保３社は、近年の業績が増益傾向にあり、特に2023年度は新型コロナウイルス関連の給付金支払減少が影響し、全体的に利益が増加しています。' explanation='2023年度のメガ損保３社（東京海上ホールディングス、MS&ADインシュアランスグループ、SOMPOホールディングス）の業績は、全体的に増益となっています。特に、東京海上ホールディングスは、経常利益が前年同期比で大幅に増加し、特別利益も計上しています。MS&ADインシュアランスグループも、海外事業の好調により利益が増加しました。SOMPOホールディングスも、政策株式の売却益が増加し、全体の利益率が改善しています。これらの要因により、各社は安定した財務基盤を維持しつつ、持続可能なビジネスモデルの構築に向けた取り組みを進めています。' keywords=['メガ損保', '業績', '増益', '東京海上ホールディングス', 'MS&ADインシュアランスグループ', 'SOMPOホールディングス', '経常利益', '特別利益']


In [31]:
print(response.answer)

損害保険業界は、インフレ、金利上昇、自然災害の頻発、競争の激化、顧客ニーズの変化などの課題に直面しています。


In [1]:
print(type(chroma_db))

NameError: name 'chroma_db' is not defined