# Step 4: Retriever+Generator構築

本ステップでは、ここまでに経験してきた類似検索と準備したテンプレートを活用して、RetrieverとGeneratorを実装してする過程を経験します。
- 類似検索ができるように、ベクトルデータベースへ接続して類似検索の検索オプションを指定してRetrieverを生成します
- 投げかけられたクエリを基にRetrieverが類似検索を実行します
- GeneratorがHugging FaceからOpen LLMを取得して読み込みます
- Generatorが類似検索で得られた関連情報とテンプレートでプロンプトを生成し、LLMへ回答案の作成を依頼します
![Step4](../image/rag-overview-step4.png)

## 0. 事前準備

### 共通処理/定数定義
全ステップで共通して使用する定数とバナークラスを読み込みます。

In [None]:
from mylib import myconstant
from mylib.MyBanner import MyBanner

### Access Token入力
Open LLMの取得に必要となるHugging FaceのAccess Tokenを入力します。（事前に発行して入手しておきます）

In [None]:
MyBanner.start()

from getpass import getpass
HF_ACCESS_TOKEN = getpass("Hugging Face の Access Token を入力して Enter Key を押してください: ")

MyBanner.finish()

### パッケージインストール
本ステップの処理で依存するパッケージをインストールします。

In [None]:
MyBanner.start()

!python -V
!pip install langchain
!pip install langchain-huggingface
!pip install langchain_milvus
!pip install accelerate

!pip install ipywidgets
!pip install urllib3==1.26.20

MyBanner.finish()

### import
本ステップの処理で依存するモジュールを読み込みます。

In [None]:
MyBanner.start()

from mylib.MyEmbedding import MyEmbedding
from mylib.MyMilvus import MyMilvus
from mylib.MyOpenLlmList import MyOpenLlmList

MyBanner.finish()

## 1. 生成: ①.Retriever

### 【準備】Embedding Model読込
Embedding Modelをメモリに読み込みます。

In [None]:
MyBanner.start()

embeddings = MyEmbedding.get_model()
print(f"{embeddings=}")

MyBanner.finish()

### 【準備】Vector DB接続
エンベディングで利用するEmbedding Modelと接続情報を渡してベクトルデータベースへ接続します。
- 接続失敗した場合は、Milvus(milvus-standalone コンテナ)が起動しているか確認します

In [None]:
MyBanner.start()

# connect to VectorDB
vector_db = MyMilvus(
    myconstant.VDB_HOST, myconstant.VDB_PORT,
    myconstant.VDB_USER, myconstant.VDB_PASS, embeddings)
print(f"{vector_db=}")

MyBanner.finish()

### 【準備】Doc Store接続
RDBのテーブルに該当するドキュメントストアに接続します。

In [None]:
MyBanner.start()

# connect to a store in Vector DB
docstore_list = vector_db.get_collections()
docstore_name = docstore_list[0]
docstore = vector_db.connect(docstore_name)
print(f"{docstore_list=}")
print(f"{docstore_name=}")
print(f"{docstore=}")

MyBanner.finish()

### 【生成】Retriever Object
類似検索の実行に必要なRetrieverオプジェクトを生成します。

In [None]:
MyBanner.start()

# RAG向けのVDB Retriever生成
retriever = vector_db.get_retriever(docstore)
print(f"{retriever=}")

MyBanner.finish()

In [None]:
MyBanner.start()

# 事前確認
query = "パソコンの使い方を学べるセミナーを教えてください"
my_docs = retriever.invoke(query)
print("* " + "\n---------\n* ".join(doc.page_content for doc in my_docs))

MyBanner.finish()

## 2. 生成: ②.Generator

### 【定義】Generator Class
以下の役割を担うGeneratorクラスを定義してクラスファイルに書き出します。
- 一覧で指定されたOpen LLMをHugging Faceから読み込みメモリで管理する
- クエリを基にして類似検索の実行する
- 類似検索で得られた関連情報とテンプレートを用いたプロンプトを生成する
- 生成したプロンプトでOpen LLMへ回答案作成依頼を連携するchainを生成する

実装に使っているtransformersモジュールの仕様は、Hugging Faceから以下を参照してください。
-  https://huggingface.co/docs/transformers/v4.46.3/ja/model_doc/auto#transformers.AutoTokenizer
-  https://huggingface.co/docs/transformers/ja/model_doc/auto#transformers.AutoTokenizer.from_pretrained
-  https://huggingface.co/docs/transformers/ja/model_doc/auto#transformers.AutoModelForCausalLM.from_pretrained

In [None]:
%%writefile mylib/MyGenerator.py
import torch
from transformers import (AutoModelForCausalLM, AutoTokenizer, pipeline)
from langchain_huggingface import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.schema.runnable import RunnablePassthrough
import inspect
import json
import sys

from mylib.MyTemplateImpl4Gemma import MyTemplateImpl4Gemma
from mylib.MyTemplateImpl4MsPhi import MyTemplateImpl4MsPhi
from mylib.MyTemplateImpl4OpenCalm import MyTemplateImpl4OpenCalm
from mylib.MyTemplateImpl4RinnaJpGpt import MyTemplateImpl4RinnaJpGpt
from mylib.MyTemplateImpl4Llama3 import MyTemplateImpl4Llama3
from mylib.MyTemplateImpl4DeepSeek import MyTemplateImpl4DeepSeek

class MyGenerator:

    def __init__(self, model_name_list, access_token):
        my_method = inspect.currentframe().f_code.co_name
        prompts = {}
        for index, model_name in enumerate(model_name_list):
            print("\nStart @ [%s] >>> index=%d: [%s]" % (my_method, index, model_name))
            prompts[model_name] = {}
            prompts[model_name]["llm"] = self.__get_custom_llm(model_name, access_token)
            prompts[model_name]["template"] = self.__create_template(model_name)
        print("")
        self.prompts = prompts

    def create_chain(self, retriever, model_name, template = None):
        prompt = self.__get_prompt(model_name)
        if template is None:
            template = prompt["template"].get_template_for_use_retriever()
        template = PromptTemplate.from_template(template)
        chain = (
            {"context": retriever | MyGenerator.__format_docs, "question": RunnablePassthrough()}
            | template
            | prompt["llm"]
        )
        print(f"{model_name=}" + "\n" + ("-" * 30))
        print(f"{chain=}" + "\n" + ("-" * 30))
        return chain

    def create_chain_not_retriever(self, model_name, template = None):
        prompt = self.__get_prompt(model_name)
        if template is None:
            template = prompt["template"].get_template_for_not_retriever()
        template = PromptTemplate.from_template(template)
        chain = (
            {"question": RunnablePassthrough()}
            | template
            | prompt["llm"]
        )
        print(f"{model_name=}" + "\n" + ("-" * 30))
        print(f"{chain=}" + "\n" + ("-" * 30))
        return chain

    def extract_answer_from_response(self, model_name, response):
        prompt = self.__get_prompt(model_name)
        answer = prompt["template"].extract_answer_from_response(response)
        return answer

    def make_template_for_conversation(self, model_name, conversation):
        prompt = self.__get_prompt(model_name)
        addition = prompt["template"].get_additional_template_for_conversation()
        conversation = conversation.replace('{', '{{')
        conversation = conversation.replace('}', '}}')
        conversation += addition
        return conversation

    def get_template(self, model_name):
        prompt = self.__get_prompt(model_name)
        template = prompt["template"].get_template_for_use_retriever()
        return template

    def get_template_not_retriever(self, model_name):
        prompt = self.__get_prompt(model_name)
        template = prompt["template"].get_template_for_not_retriever()
        return template

    def __get_prompt(self, model_name):
        return self.prompts[model_name]

    # Replace similarity informations retrieved from vector db into a placeholder of context.
    @staticmethod
    def __format_docs(docs):
        # return "* " + "\n* ".join(doc.page_content for doc in docs)
        index = 0
        content = ""
        for doc in docs:
            index += 1
            try:
                jobj = json.loads(doc.page_content)
                content += ("- %d:\n" % (index))
                for mykey in jobj.keys():
                    content += ("\t- %s: %s\n" % (mykey, jobj[mykey]))
            except json.JSONDecodeError as e:
                print(sys.exc_info())
                print(e)
                content += ("- %s\n" % (doc.page_content))
        return content

    def __get_custom_llm(self, trained_model_name, access_token):
        my_method = inspect.currentframe().f_code.co_name
        print(">>> 1/4[%s]: model = AutoModelForCausalLM.from_pretrained()" % (my_method))
        model = AutoModelForCausalLM.from_pretrained(
            trained_model_name,
            device_map = "auto",
            low_cpu_mem_usage = True,
            torch_dtype = "auto",
            trust_remote_code = True,
            token = access_token,
        )
        print(">>> 2/4[%s]: tokenizer = AutoTokenizer.from_pretrained()" % (my_method))
        tokenizer = AutoTokenizer.from_pretrained(
            trained_model_name,
            token = access_token
        )
        print(">>> 3/4[%s]: pipe = pipeline()" % (my_method))
        pipe = pipeline(
            'text-generation',
            model = model,
            tokenizer = tokenizer,
            max_new_tokens = 1024,
            torch_dtype = "auto",
        )
        print(">>> 4/4[%s]: llm = HuggingFacePipeline()" % (my_method))
        llm = HuggingFacePipeline(
            pipeline=pipe
        )
        return llm

    def __create_template(self, model_name):
        template_obj = None;
        if 'google/gemma' in model_name:
            template_obj = MyTemplateImpl4Gemma()
        elif 'microsoft/Phi' in model_name:
            template_obj = MyTemplateImpl4MsPhi()
        elif 'cyberagent/open-calm' in model_name:
            template_obj = MyTemplateImpl4OpenCalm()
        elif 'rinna/japanese-gpt' in model_name:
            template_obj = MyTemplateImpl4RinnaJpGpt()
        elif 'meta-llama/Llama' in model_name:
            template_obj = MyTemplateImpl4Llama3()
        elif '/DeepSeek' in model_name:
            template_obj = MyTemplateImpl4DeepSeek()
        return template_obj


### 【生成】Open LLM一覧
取り扱うOpen LLM名の一覧を管理するオブジェクトを生成します。

In [None]:
MyBanner.start()

openllm_list = MyOpenLlmList([2])

MyBanner.finish()

### 【生成】Generator Ojbect
プロンプトを使ってLLMへ回答案の作成依頼を連携するGeneratorオプジェクトを生成します。

In [None]:
MyBanner.start()
from mylib.MyGenerator import MyGenerator

generator = MyGenerator(openllm_list.getAll(), HF_ACCESS_TOKEN)
print(f"{generator=}")

MyBanner.finish()

## 4. 生成: chain (=①+②)

In [None]:
MyBanner.start()

# chain作成
chain = generator.create_chain(retriever, openllm_list.get(0))
# chain = generator.create_chain_not_retriever(openllm_list.get(0))

MyBanner.finish()

## 5. 拡張検索(RAG)実行

In [None]:
MyBanner.start()

#query="Excelを使いこなしたい"
#query="AIに関するセッションの詳細を教えてください"
query = "パソコンの使い方を学べるセミナーはありますか？"

print(f"{chain.invoke(query)=}")

MyBanner.finish()

## 6. 本ステップを終えて

ここまでの手順でRAGの一通りの実装を経験しました。次のステップではここまでに経験してきたナレッジを活用して、簡易的なRAGアプリケーションの構築を経験します。
- 次のStep ≫ [Step 5: Web UI (Chatting with Open LLM)](./rag-step05-web_ui_to_chat_with_llm.ipynb)
- 今のStep ≫ Step 4: Retriever+Generator構築
- 前のStep ≫ [Step 3: LLM Template作成](./rag-step03-llm_template.ipynb)