## RunnableParallel: 데이터 병렬 조작

### 입력 및 출력 조작

In [1]:
# !pip install -qU langchain langchain-openai

In [3]:
from dotenv import load_dotenv

load_dotenv()

True

In [4]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 텍스트로부터 FAISS 벡터 저장소를 생성합니다.
vectorstore = FAISS.from_texts(
    ["Teddy is an AI engineer who loves programming!"], embedding=OpenAIEmbeddings()
)
# 벡터 저장소를 검색기로 사용합니다.
retriever = vectorstore.as_retriever()
# 템플릿을 정의합니다.
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
# 템플릿으로부터 채팅 프롬프트를 생성합니다.
prompt = ChatPromptTemplate.from_template(template)

# ChatOpenAI 모델을 초기화합니다.
model = ChatOpenAI()

# 검색 체인을 구성합니다.
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 검색 체인을 실행하여 질문에 대한 답변을 얻습니다.
retrieval_chain.invoke("What is Teddy's occupation?")

'Teddy is an AI engineer.'

### itemgetter를 단축어로 사용하기

RunnableParallel과 결합할 때 Python의 itemgetter를 단축어로 사용하여 map에서 데이터를 추출할 수 있습니다.


In [5]:
from operator import itemgetter

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 텍스트로부터 FAISS 벡터 저장소를 생성합니다.
vectorstore = FAISS.from_texts(
    ["Teddy is an AI engineer who loves programming!"], embedding=OpenAIEmbeddings()
)
# 벡터 저장소를 검색기로 사용합니다.
retriever = vectorstore.as_retriever()

# 템플릿을 정의합니다.
template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""
# 템플릿으로부터 채팅 프롬프트를 생성합니다.
prompt = ChatPromptTemplate.from_template(template)

# 체인을 구성합니다.
chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "language": itemgetter("language"),
    }
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

# 체인을 호출하여 질문에 답변합니다.
chain.invoke({"question": "What is Teddy's occupation?", "language": "Korean"})

'테디의 직업은 AI 엔지니어입니다.'

### 병렬처리를 단계별로 이해

In [6]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()  # ChatOpenAI 모델을 초기화합니다.

# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain = (
    ChatPromptTemplate.from_template("{country} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain = (
    ChatPromptTemplate.from_template("{country} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain = RunnableParallel(capital=capital_chain, area=area_chain)

# map_chain을 호출하여 대한민국의 수도와 면적을 묻습니다.
map_chain.invoke({"country": "대한민국"})

{'capital': '대한민국의 수도는 서울입니다.', 'area': '대한민국의 면적은 약 100,363 제곱 킬로미터 입니다.'}

In [7]:
# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain2 = (
    ChatPromptTemplate.from_template("{country1} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain2 = (
    ChatPromptTemplate.from_template("{country2} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain = RunnableParallel(capital=capital_chain2, area=area_chain2)

# map_chain을 호출합니다. 이때 각각의 key에 대한 value를 전달합니다.
map_chain.invoke({"country1": "대한민국", "country2": "미국"})

{'capital': '대한민국의 수도는 서울입니다.', 'area': '미국의 총 면적은 약 9,833,520 km² 입니다.'}

### 병렬 처리

In [8]:
%%timeit

# 면저을 묻는 체인을 호출하고 실행 시간을 측정합니다.
area_chain.invoke({"country": "대한민국"})

893 ms ± 252 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
%%timeit

# 수도를 묻는 체인을 호출하고 실행 시간을 측정합니다.
capital_chain.invoke({"country": "대한민국"})

742 ms ± 57.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
%%timeit

# Parallel 하게 구성된 체인을 호출하고 실행 시간을 측정합니다.
map_chain.invoke({"country1": "대한민국", "country2": "대한민국"})

The slowest run took 4.65 times longer than the fastest. This could mean that an intermediate result is being cached.
1.33 s ± 1.05 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
