# Global search

ベースラインRAGは、答えを構成するためにデータセット全体の情報の集約を必要とするクエリに苦戦している。ベースラインRAGは、データセット内の意味的に類似したテキストコンテンツのベクトル検索に依存しているため、「データ内のトップ5のテーマは何か」といったクエリのパフォーマンスは最悪である。クエリには、正しい情報へ誘導するものは何もない。

しかし、GraphRAGを使えば、LLMが生成した知識グラフの構造から、データセット全体の構造（ひいてはテーマ）を知ることができるため、このような質問に答えることができる。これにより、プライベートデータセットを、あらかじめ要約された意味のあるセマンティッククラスタに整理することができる。LLMは、我々の大域的検索法を用いて、ユーザからのクエリに応答する際に、これらのクラスターを使ってこれらのテーマを要約する。

In [1]:
%load_ext dotenv

import os

import pandas as pd
import tiktoken

from graphrag.query.indexer_adapters import read_indexer_entities, read_indexer_reports
from graphrag.query.llm.oai.chat_openai import ChatOpenAI
from graphrag.query.llm.oai.typing import OpenaiApiType
from graphrag.query.structured_search.global_search.community_context import (
    GlobalCommunityContext,
)
from graphrag.query.structured_search.global_search.search import GlobalSearch

ベースとなるLLMの指定

In [None]:
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Chat-based OpenAI LLM implementation."""

from collections.abc import Callable
from typing import Any

from tenacity import (
    AsyncRetrying,
    RetryError,
    Retrying,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

from graphrag.query.llm.base import BaseLLM, BaseLLMCallback
from graphrag.query.llm.oai.base import OpenAILLMImpl
from graphrag.query.llm.oai.typing import (
    OPENAI_RETRY_ERROR_TYPES,
    OpenaiApiType,
)
from openai import OpenAI
from graphrag.query.progress import StatusReporter

_MODEL_REQUIRED_MSG = "model is required"


class ChatAssistant(BaseLLM):
    """Wrapper for OpenAI ChatCompletion models."""

    def __init__(
        self,
        api_key: str | None = None,
        model: str | None = None,
        azure_ad_token_provider: Callable | None = None,
        deployment_name: str | None = None,
        api_base: str | None = None,
        api_version: str | None = None,
        api_type: OpenaiApiType = OpenaiApiType.OpenAI,
        organization: str | None = None,
        max_retries: int = 10,
        request_timeout: float = 180.0,
        retry_error_types: tuple[type[BaseException]] = OPENAI_RETRY_ERROR_TYPES,  # type: ignore
        reporter: StatusReporter | None = None,
    ):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.retry_error_types = retry_error_types

    def generate(
        self,
        messages: str | list[Any],
        streaming: bool = True,
        callbacks: list[BaseLLMCallback] | None = None,
        **kwargs: Any,
    ) -> str:
        """Generate text."""
        try:
            retryer = Retrying(
                stop=stop_after_attempt(self.max_retries),
                wait=wait_exponential_jitter(max=10),
                reraise=True,
                retry=retry_if_exception_type(self.retry_error_types),
            )
            for attempt in retryer:
                with attempt:
                    return self._generate(
                        messages=messages,
                        streaming=streaming,
                        callbacks=callbacks,
                        **kwargs,
                    )
        except RetryError as e:
            self._reporter.error(
                message="Error at generate()", details={self.__class__.__name__: str(e)}
            )
            return ""
        else:
            # TODO: why not just throw in this case?
            return ""

    async def agenerate(
        self,
        messages: str | list[Any],
        streaming: bool = True,
        callbacks: list[BaseLLMCallback] | None = None,
        **kwargs: Any,
    ) -> str:
        """Generate text asynchronously."""
        try:
            retryer = AsyncRetrying(
                stop=stop_after_attempt(self.max_retries),
                wait=wait_exponential_jitter(max=10),
                reraise=True,
                retry=retry_if_exception_type(self.retry_error_types),  # type: ignore
            )
            async for attempt in retryer:
                with attempt:
                    return await self._agenerate(
                        messages=messages,
                        streaming=streaming,
                        callbacks=callbacks,
                        **kwargs,
                    )
        except RetryError as e:
            self._reporter.error(f"Error at agenerate(): {e}")
            return ""
        else:
            # TODO: why not just throw in this case?
            return ""

    def _generate(
        self,
        messages: str | list[Any],
        streaming: bool = True,
        callbacks: list[BaseLLMCallback] | None = None,
        **kwargs: Any,
    ) -> str:
        model = self.model
        if not model:
            raise ValueError(_MODEL_REQUIRED_MSG)
        response = self.sync_client.chat.completions.create(  # type: ignore
            model=model,
            messages=messages,  # type: ignore
            stream=streaming,
            **kwargs,
        )  # type: ignore
        if streaming:
            full_response = ""
            while True:
                try:
                    chunk = response.__next__()  # type: ignore
                    if not chunk or not chunk.choices:
                        continue

                    delta = (
                        chunk.choices[0].delta.content
                        if chunk.choices[0].delta and chunk.choices[0].delta.content
                        else ""
                    )  # type: ignore

                    full_response += delta
                    if callbacks:
                        for callback in callbacks:
                            callback.on_llm_new_token(delta)
                    if chunk.choices[0].finish_reason == "stop":  # type: ignore
                        break
                except StopIteration:
                    break
            return full_response
        return response.choices[0].message.content or ""  # type: ignore

    async def _agenerate(
        self,
        messages: str | list[Any],
        streaming: bool = True,
        callbacks: list[BaseLLMCallback] | None = None,
        **kwargs: Any,
    ) -> str:
        model = self.model
        if not model:
            raise ValueError(_MODEL_REQUIRED_MSG)
        response = await self.async_client.chat.completions.create(  # type: ignore
            model=model,
            messages=messages,  # type: ignore
            stream=streaming,
            **kwargs,
        )
        if streaming:
            full_response = ""
            while True:
                try:
                    chunk = await response.__anext__()  # type: ignore
                    if not chunk or not chunk.choices:
                        continue

                    delta = (
                        chunk.choices[0].delta.content
                        if chunk.choices[0].delta and chunk.choices[0].delta.content
                        else ""
                    )  # type: ignore

                    full_response += delta
                    if callbacks:
                        for callback in callbacks:
                            callback.on_llm_new_token(delta)
                    if chunk.choices[0].finish_reason == "stop":  # type: ignore
                        break
                except StopIteration:
                    break
            return full_response

        return response.choices[0].message.content or ""  # type: ignore


In [3]:
api_key = os.environ["OPENAI_API_KEY"]
llm_model = "gpt-4o"

llm = ChatOpenAI(
    api_key=api_key,
    model=llm_model,
    api_type=OpenaiApiType.OpenAI,  # OpenaiApiType.OpenAI or OpenaiApiType.AzureOpenAI
    max_retries=20,
)

token_encoder = tiktoken.get_encoding("cl100k_base")

In [4]:
system_message="あたなはAIアシスタントです"
user_message="日本の首都は？"
print(llm.generate(
    messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
))

日本の首都は東京です。


`python -m graphrag.index --root ./ragtest`で生成された`.parquet`ファイルへのpathを指定

In [5]:
input_dir="ragtest/output/20240801-212257/artifacts"

community_report_table = "create_final_community_reports"
entity_table = "create_final_nodes"
entity_embedding_table = "create_final_entities"

`pd.DataFrame`として読み出し

In [6]:
report_df = pd.read_parquet(f"{input_dir}/{community_report_table}.parquet")
entity_df = pd.read_parquet(f"{input_dir}/{entity_table}.parquet")
entity_embedding_df = pd.read_parquet(f"{input_dir}/{entity_embedding_table}.parquet")

DataFrameからGraphRAGとして使える形に変換

In [7]:
reports = read_indexer_reports(report_df, entity_df, 2)
entities = read_indexer_entities(entity_df, entity_embedding_df, 2)

グローバルな文脈を構築

In [8]:
context_builder = GlobalCommunityContext(
    community_reports=reports,
    entities=entities,  # default to None if you don't want to use community weights for ranking
    token_encoder=token_encoder,
)

In [12]:
context_builder.token_encoder()

TypeError: 'Encoding' object is not callable

グローバルサーチのためのエンジンをインスタンス化

In [13]:
context_builder_params = {
    "use_community_summary": False,  # False means using full community reports. True means using community short summaries.
    "shuffle_data": True,
    "include_community_rank": True,
    "min_community_rank": 0,
    "community_rank_name": "rank",
    "include_community_weight": True,
    "community_weight_name": "occurrence weight",
    "normalize_community_weight": True,
    "max_tokens": 12_000,  # change this based on the token limit you have on your model (if you are using a model with 8k limit, a good setting could be 5000)
    "context_name": "Reports",
}

map_llm_params = {
    "max_tokens": 1000,
    "temperature": 0.0,
    "response_format": {"type": "json_object"},
}

reduce_llm_params = {
    "max_tokens": 2000,  # change this based on the token limit you have on your model (if you are using a model with 8k limit, a good setting could be 1000-1500)
    "temperature": 0.0,
}

In [17]:
search_engine = GlobalSearch(
    llm=llm,
    context_builder=context_builder,
    token_encoder=token_encoder,
    max_data_tokens=12_000,  # change this based on the token limit you have on your model (if you are using a model with 8k limit, a good setting could be 5000)
    map_llm_params=map_llm_params,
    reduce_llm_params=reduce_llm_params,
    allow_general_knowledge=False,  # set this to True will add instruction to encourage the LLM to incorporate general knowledge in the response, which may increase hallucinations, but could be useful in some use cases.
    json_mode=True,  # set this to False if your LLM model does not support JSON mode.
    context_builder_params=context_builder_params,
    concurrent_coroutines=32,
    response_type="multiple paragraphs",  # free form text describing the response type and format, can be anything, e.g. prioritized list, single paragraph, multiple paragraphs, multiple-page report
)

グローバルサーチ

In [20]:
result = await search_engine.asearch(
    "ドラゴンは出てきますか？"
)

print(result.response)

### ドラゴンの存在とその役割

ドラゴンは村の中心的な存在であり、ムラのコミュニティにおいて権威とレジリエンスの象徴です。歴史的に村と深い関係を持ち、その存在は村のアイデンティティと物語を形成しています。村人たちはドラゴンの怒りに直面することがあり、そのため恐怖と尊敬が入り混じった複雑なダイナミックが存在します [Data: Reports (3)]。

### 村への脅威と対策

ドラゴンは村にとって重大な脅威でもあり、村全体がその怒りを和らげる方法を見つけることで安全を確保しなければなりません [Data: Reports (1)]。この緊張感は村の生活に大きな影響を与えています。

### 伝統とフォークロアの伝承

ドラゴンの存在はムラのフォークロアや伝統の伝承においても重要な役割を果たしています。ドラゴンはコミュニティのレジリエンスを思い起こさせ、村人たちのアイデンティティや自然との関係を形作る要素となっています [Data: Reports (3)]。

### コミュニケーションの手段

村人の一人であるリナは、石を使ってドラゴンとコミュニケーションを取る能力を持っています。この能力は、コミュニティとドラゴンの間の理解と協力を促進するために不可欠です [Data: Reports (2)]。

### ドラゴンの出現場所

ドラゴンは湖の近くに現れることが多く、この場所はコミュニティとドラゴンの関係に複雑さを加えています [Data: Reports (1)]。

以上の情報から、ドラゴンはムラの生活と文化に深く根付いた存在であり、その影響は多岐にわたります。


In [21]:
result = await search_engine.asearch(
    "もっと詳しく教えて"
)

print(result.response)

### Mura村とドラゴンの関係

Mura村は、ドラゴンとの深い歴史的関係を持ち、その関係が村のアイデンティティと物語を形成しています。村人たちはドラゴンの怒りからの脅威に直面し、恐怖と尊敬の複雑なダイナミクスを持っています。この関係は、村の生存と文化的アイデンティティにとって重要です [Data: Reports (3)]。ドラゴンはMura村において恐怖の源であると同時に、権威と村人の闘争を象徴する崇拝の対象でもあります。ドラゴンの存在は、村の物語において重要な文化的遺物として機能し、村人のアイデンティティと自然との関係を形作ります [Data: Reports (3)]。

### 村の文化保存とリーダーシップ

リオとリナは、Mura村の文化保存において重要な役割を果たしています。リオは村人を保護し、歴史を伝える役割を担い、リナは村人間の調和と平和を促進します。彼らの努力は、村の伝統の継続性と回復力に貢献しています [Data: Reports (3)]。特にリナは、石を使ってドラゴンとコミュニケーションを取る能力を持ち、村人とドラゴンの間の理解と協力を促進する重要な役割を果たしています。彼女の行動は、村とドラゴンの間の潜在的な紛争を解決するために重要です [Data: Reports (2, 1)]。

### 自然回復と持続可能性

Mura村の自然回復努力は、過去のドラゴンとの紛争による破壊に対する応答であり、村人の持続可能性へのコミットメントを反映しています。この努力は、村人の回復力と未来の世代のための環境保護への献身を強調しています [Data: Reports (3)]。

### 結論

Mura村は、ドラゴンとの複雑な関係を通じて独自の文化とアイデンティティを築いてきました。リオとリナのリーダーシップは、村の文化保存と調和を促進し、持続可能な未来を目指す村人たちの努力を支えています。これらの要素が組み合わさることで、Mura村はその独自性と回復力を維持し続けています。


In [19]:
result.context_data["reports"]

Unnamed: 0,id,title,occurrence weight,content,rank
0,3,Mura and the Dragon's Legacy,1.0,# Mura and the Dragon's Legacy\n\nThe communit...,8.5
1,2,Community of Mura: Rina and Rio,1.0,# Community of Mura: Rina and Rio\n\nThe commu...,7.5
2,0,Dragon's Sand and Eld,0.4,# Dragon's Sand and Eld\n\nThe community cente...,6.5
3,1,Dragon and Lake Community,0.2,# Dragon and Lake Community\n\nThe community c...,8.0


In [20]:
print(f"LLM calls: {result.llm_calls}. LLM tokens: {result.prompt_tokens}")

LLM calls: 2. LLM tokens: 4601
