# AutoGenとAzure AI Searchを使用したAgentic RAG

この例では、Azure AI SearchをベクトルデータベースとしてAutoGenフレームワークと統合し、Retrieval-Augmented Generation（RAG）を実装する方法を示します。

## 📚 このノートブックで学習できること

### 🎯 主要な学習目標
- **Azure AI Search統合**: クラウドベースの検索サービスをRAGシステムに活用
- **AutoGenフレームワーク**: エンタープライズ級のマルチエージェントシステム構築
- **スケーラブルRAG**: 大規模データセットに対応する実用的な検索・生成システム
- **Azure連携**: GitHub TokenによるAzure AIサービスとの統合

### 🛠️ 実装する技術スタック
- **Azure AI Search API**: エンタープライズ検索サービスとの統合
- **AutoGenエージェント**: AI駆動の知識統合システム
- **RESTful API**: Azure AI Search APIを使用したデータ操作
- **パフォーマンス評価**: レスポンス品質と処理時間の測定

### 💡 実用的なスキル
- 企業向け大規模文書検索システムの構築
- クラウドベースRAGアーキテクチャの設計
- Azure AI Search APIの基本的な活用方法
- 本番環境対応のAIエージェントシステム開発

### 🔄 ChromaDBとの違い
- **スケーラビリティ**: Azure AI Searchは大規模データセットに最適化
- **運用性**: マネージドサービスによる簡単な運用・保守
- **統合性**: Azure エコシステムとのシームレスな連携

In [1]:
import os
import time
import asyncio
import warnings
from typing import List, Dict

# 不要な警告を非表示にする
warnings.filterwarnings("ignore", message=".*Missing required field 'structured_output'.*")

from autogen_agentchat.agents import AssistantAgent
from autogen_core import CancellationToken
from autogen_agentchat.messages import TextMessage
from azure.core.credentials import AzureKeyCredential
from autogen_ext.models.azure import AzureAIChatCompletionClient

from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType, SearchableField

from dotenv import load_dotenv

load_dotenv()

True

## クライアントの作成

まず、Azure AI Chat Completion Clientを初期化します。このクライアントは、Azure OpenAIサービスとやり取りしてユーザークエリへの応答を生成するために使用されます。

In [3]:
client = AzureAIChatCompletionClient(
    model="gpt-4o-mini",
    # Azure AI Model Inference API - AutoGenとAzureモデルを統合するための統一エンドポイント
    # GitHub Tokenでの認証をサポートし、複数のAIモデルに統一されたAPIでアクセス可能
    endpoint="https://models.inference.ai.azure.com",
    credential=AzureKeyCredential(os.getenv("GITHUB_TOKEN")),
    model_info={
        "json_output": True,
        "function_calling": True,
        "vision": True,
        "structured_output": True,  # 警告を解決するために追加
        "family": "unknown",
    },
)

## ベクトルデータベースの初期化

永続ストレージを使用してAzure AI Searchを初期化し、拡張されたサンプルドキュメントを追加します。Azure AI Searchは、正確な応答生成のためのコンテキストを提供するドキュメントの保存と検索に使用されます。

In [5]:
# Azure AI Searchを永続ストレージで初期化
search_service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
search_api_key = os.getenv("AZURE_SEARCH_API_KEY")
index_name = "travel-documents-azuresearch"  # ChromaDBノートブックとの競合を避けるため名前を変更

search_client = SearchClient(
    endpoint=search_service_endpoint,
    index_name=index_name,
    credential=AzureKeyCredential(search_api_key)
)

index_client = SearchIndexClient(
    endpoint=search_service_endpoint,
    credential=AzureKeyCredential(search_api_key)
)

# インデックススキーマを定義
fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True),
    SearchableField(name="content", type=SearchFieldDataType.String)
]

index = SearchIndex(name=index_name, fields=fields)

# インデックスが存在しない場合のみ作成
try:
    # 既存インデックスの確認を試みる
    existing_index = index_client.get_index(index_name)
    print(f"インデックス '{index_name}' は既に存在します。")
except Exception:
    # インデックスが存在しない場合、新規作成
    index_client.create_index(index)
    print(f"新しいインデックス '{index_name}' を作成しました。")

# 拡張されたサンプルドキュメント（日本語版）
documents = [
    {"id": "azuresearch-1", "content": "Contoso旅行では、世界各地のエキゾチックな目的地への高級バケーションパッケージを提供しています。"},
    {"id": "azuresearch-2", "content": "当社のプレミアム旅行サービスには、個人向け旅程計画と24時間365日のコンシェルジュサポートが含まれています。"},
    {"id": "azuresearch-3", "content": "Contosoの旅行保険は、医療緊急事態、旅行キャンセル、手荷物紛失をカバーしています。"},
    {"id": "azuresearch-4", "content": "人気の目的地には、モルディブ、スイスアルプス、アフリカサファリなどがあります。"},
    {"id": "azuresearch-5", "content": "Contoso旅行では、ブティックホテルやプライベートガイドツアーへの限定アクセスを提供しています。"}
]

# ドキュメントが既に存在するかチェックして、存在しない場合のみアップロード
try:
    # 最初のドキュメントの存在確認
    search_result = search_client.get_document(key="azuresearch-1")
    print("ドキュメントは既に存在します。アップロードをスキップします。")
except Exception:
    # ドキュメントが存在しない場合、アップロード
    search_client.upload_documents(documents)
    print("新しいドキュメントをインデックスに追加しました。")

インデックス 'travel-documents-azuresearch' は既に存在します。
ドキュメントは既に存在します。アップロードをスキップします。
ドキュメントは既に存在します。アップロードをスキップします。


## データ取得関数の定義

ここでは、RAGシステムの**データアクセス層**となる2つの重要な関数を定義します：

### 🔍 `get_retrieval_context(query)`
- **役割**: ユーザークエリに基づいて Azure AI Search から関連文書を検索
- **依存**: 前のセルで設定した `search_client` を使用
- **出力**: 検索結果を統合したコンテキスト文字列

### 🌤️ `get_weather_data(location)`
- **役割**: 指定された場所の天気情報を取得（シミュレート）
- **用途**: 旅行関連の質問で気象情報を組み合わせた回答生成
- **拡張性**: 実際のAPIと置き換え可能な設計

これらの関数は、後のセルの `ask_unified_rag` 統合関数で呼び出され、AIエージェントが回答生成に使用するコンテキスト情報を提供します。

In [18]:
def get_retrieval_context(query: str) -> str:
    """クエリに基づいてAzure AI Searchから関連ドキュメントを取得します。"""
    results = search_client.search(query)
    context_strings = []
    for result in results:
        context_strings.append(f"Document: {result['content']}")
    return "\n\n".join(context_strings) if context_strings else "検索結果が見つかりませんでした"

def get_weather_data(location: str) -> str:
    """
    指定された場所の天気データ取得をシミュレートします。
    実際のシナリオでは、天気APIを呼び出すことになります。
    """
    # 主要都市のシミュレートされた天気データ（寒い目的地を追加）
    weather_database = {
        "new york": {"temperature": 22, "condition": "曇り時々晴れ", "humidity": 65, "wind": "時速16km"},
        "london": {"temperature": 16, "condition": "雨", "humidity": 80, "wind": "時速24km"},
        "tokyo": {"temperature": 24, "condition": "晴れ", "humidity": 50, "wind": "時速8km"},
        "sydney": {"temperature": 27, "condition": "快晴", "humidity": 45, "wind": "時速19km"},
        "paris": {"temperature": 20, "condition": "曇り", "humidity": 70, "wind": "時速13km"},
        # 寒い目的地を追加（旅行先として人気）
        "swiss alps": {"temperature": -2, "condition": "雪", "humidity": 90, "wind": "時速12km"},
        "switzerland": {"temperature": 3, "condition": "曇り雪", "humidity": 85, "wind": "時速15km"},
        "iceland": {"temperature": 1, "condition": "雪時々晴れ", "humidity": 75, "wind": "時速20km"},
        "norway": {"temperature": -5, "condition": "雪", "humidity": 88, "wind": "時速18km"},
        "finland": {"temperature": -8, "condition": "曇り雪", "humidity": 82, "wind": "時速14km"},
    }
    
    # 場所文字列を正規化
    location_key = location.lower()
    
    # この場所のデータがあるかチェック
    if location_key in weather_database:
        data = weather_database[location_key]
        return f"{location.title()}の天気:\n" \
               f"気温: {data['temperature']}°C\n" \
               f"天候: {data['condition']}\n" \
               f"湿度: {data['humidity']}%\n" \
               f"風速: {data['wind']}"
    else:
        return f"{location}の天気データは利用できません。"

## エージェント設定

AIアシスタントエージェントを設定します。このエージェントは、提供されたコンテキスト情報のみを使用して詳細な応答を生成することに特化しています。

In [19]:
# 拡張機能を持つエージェントを作成
assistant = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message=(
        "あなたは提供されたコンテキスト情報を使用して回答する有用なAIアシスタントです。"
        "提供されたコンテキストには、文書情報と天気情報の両方が含まれる場合があります。"
        "利用可能なすべての情報を活用して、正確で有用な回答を提供してください。"
        "外部の知識は使用せず、提供された情報のみに基づいて回答してください。"
    ),
)

## RAG評価クラス

レスポンスの長さ、ソース引用、レスポンス時間、コンテキスト関連性などの様々な指標に基づいて応答を評価する `RAGEvaluator` クラスを定義します。

In [20]:
class RAGEvaluator:
    def __init__(self):
        self.responses: List[Dict] = []

    def evaluate_response(self, query: str, response: str, context: List[Dict]) -> Dict:
        # 基本的な指標: レスポンス長、引用数、シンプルな関連性スコア
        start_time = time.time()
        metrics = {
            'response_length': len(response),
            'source_citations': sum(1 for doc in context if doc["content"] in response),
            'evaluation_time': time.time() - start_time,
            'context_relevance': self._calculate_relevance(query, context)
        }
        self.responses.append({
            'query': query,
            'response': response,
            'metrics': metrics
        })
        return metrics

    def _calculate_relevance(self, query: str, context: List[Dict]) -> float:
        # シンプルな関連性スコア: クエリが出現するドキュメントの割合
        return sum(1 for c in context if query.lower() in c["content"].lower()) / len(context)

## RAGによるクエリ処理

クエリをアシスタントに送信し、レスポンスを処理・評価する `ask_unified_rag` 関数を定義します。この関数はアシスタントとのやり取りを処理し、`RAGEvaluator`を使用してレスポンスの品質を測定します。

In [21]:
async def ask_unified_rag(query: str, evaluator: RAGEvaluator, location: str = None):
    """
    ドキュメント検索と天気データの両方を組み合わせた統合RAG関数
    クエリと任意の場所パラメータに基づいて処理します。
    
    Args:
        query: ユーザーの質問
        evaluator: レスポンス品質を測定するRAG評価器
        location: 天気クエリ用のオプションの場所
    """
    try:
        # 両方のソースからコンテキストを取得
        retrieval_context = get_retrieval_context(query)
        
        # 場所が提供されている場合、天気データを追加
        weather_context = ""
        if location:
            weather_context = get_weather_data(location)
            weather_intro = f"\n{location}の天気情報:\n"
        else:
            weather_intro = ""
        
        # 利用可能な場合、両方のコンテキストでクエリを拡張
        augmented_query = (
            f"取得されたコンテキスト:\n{retrieval_context}\n\n"
            f"{weather_intro}{weather_context}\n\n"
            f"ユーザーのクエリ: {query}\n\n"
            "上記のコンテキストのみに基づいて、回答を提供してください。"
        )

        # 拡張されたクエリをユーザーメッセージとして送信
        start_time = time.time()
        response = await assistant.on_messages(
            [TextMessage(content=augmented_query, source="user")],
            cancellation_token=CancellationToken(),
        )
        processing_time = time.time() - start_time

        # 評価用の統合コンテキストを作成
        combined_context = documents.copy()  # 旅行ドキュメントから開始
        
        # 天気が存在する場合、ドキュメントとして追加
        if location and weather_context:
            combined_context.append({"id": f"weather-{location}", "content": weather_context})
        
        # レスポンスを評価
        metrics = evaluator.evaluate_response(
            query=query,
            response=response.chat_message.content,
            context=combined_context
        )
        
        result = {
            'response': response.chat_message.content,
            'processing_time': processing_time,
            'metrics': metrics,
        }
        
        # 提供されている場合、結果に場所を追加
        if location:
            result['location'] = location
            
        return result
    except Exception as e:
        print(f"統合クエリ処理エラー: {e}")
        return None

# 使用例

`RAGEvaluator`を初期化し、処理・評価したいクエリを定義します。

In [22]:
async def main():
    evaluator = RAGEvaluator()
    
    # Semantic Kernelの例と同様のユーザークエリを定義
    user_inputs = [
        # 旅行のみのクエリ
        {"query": "Contosoの旅行保険について説明してください"},
        
        # 天気のみのクエリ
        {"query": "ロンドンの現在の天気状況は？", "location": "london"},
        
        # 組み合わせクエリ（寒い目的地のスイスアルプスを使用）
        {"query": "Contosoが提供する寒い目的地で、気温はどのくらいですか？", "location": "swiss alps"},
    ]
    
    print("クエリを処理中:")
    for query_data in user_inputs:
        query = query_data["query"]
        location = query_data.get("location")
        
        if location:
            print(f"\n{location}のクエリを処理中: {query}")
        else:
            print(f"\nクエリを処理中: {query}")
        
        # 透明性のためにRAGコンテキストを取得して表示（Semantic Kernelの例と同様）
        retrieval_context = get_retrieval_context(query)
        weather_context = get_weather_data(location) if location else ""
        
        # 透明性のためにRAGコンテキストを表示
        print("\n--- RAGコンテキスト ---")
        print(retrieval_context)
        if weather_context:
            print(f"\n--- {location}の天気コンテキスト ---")
            print(weather_context)
        print("-------------------\n")
            
        result = await ask_unified_rag(query, evaluator, location)
        if result:
            print("レスポンス:", result['response'])
            print("\n指標:", result['metrics'])
        print("\n" + "="*60 + "\n")

## スクリプトの実行

インタラクティブ環境または標準スクリプトで実行されているかを確認し、それに応じてmain関数を実行します。

In [23]:
if __name__ == "__main__":
    if asyncio.get_event_loop().is_running():
        await main()
    else:
        asyncio.run(main())

クエリを処理中:

クエリを処理中: Contosoの旅行保険について説明してください

--- RAGコンテキスト ---
Document: Contosoの旅行保険は、医療緊急事態、旅行キャンセル、手荷物紛失をカバーしています。

Document: 当社のプレミアム旅行サービスには、個人向け旅程計画と24時間365日のコンシェルジュサポートが含まれています。

Document: Contoso旅行では、ブティックホテルやプライベートガイドツアーへの限定アクセスを提供しています。

Document: Contoso旅行では、世界各地のエキゾチックな目的地への高級バケーションパッケージを提供しています。

Document: 人気の目的地には、モルディブ、スイスアルプス、アフリカサファリなどがあります。
-------------------

レスポンス: Contosoの旅行保険は、医療緊急事態、旅行キャンセル、手荷物紛失をカバーしています。これにより、旅行中に予期せぬ事態が発生した際に安心を提供します。

指標: {'response_length': 76, 'source_citations': 1, 'evaluation_time': 1.33514404296875e-05, 'context_relevance': 0.0}



londonのクエリを処理中: ロンドンの現在の天気状況は？

--- RAGコンテキスト ---
Document: 人気の目的地には、モルディブ、スイスアルプス、アフリカサファリなどがあります。

Document: 当社のプレミアム旅行サービスには、個人向け旅程計画と24時間365日のコンシェルジュサポートが含まれています。

Document: Contoso旅行では、ブティックホテルやプライベートガイドツアーへの限定アクセスを提供しています。

Document: Contoso旅行では、世界各地のエキゾチックな目的地への高級バケーションパッケージを提供しています。

Document: Contosoの旅行保険は、医療緊急事態、旅行キャンセル、手荷物紛失をカバーしています。

--- londonの天気コンテキスト ---
Londonの天気:
気温: 16°C
天候: 雨
湿度: 80%
風速: 時