# RunnableParallel: 데이터 병렬 조작


## 입력 및 출력 조작
`RunnableParallel` 은 시퀀스 내에서 하나의 `Runnable` 의 출력을 다음 `Runnable` 의 입력 형식에 맞게 조작하는 데 유용하게 사용될 수 있습니다.

여기서 prompt에 대한 입력은 "context"와 "question"이라는 키를 가진 map 형태로 예상됩니다.

사용자 입력은 단순히 질문 내용입니다. 따라서 retriever를 사용하여 컨텍스트를 가져오고, 사용자 입력을 "question" 키 아래에 전달해야 합니다.

In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

In [4]:
embedding = OpenAIEmbeddings(model="text-embedding-3-small")
llm = ChatOpenAI(model='gpt-4o-mini')

vectorsotre = Chroma.from_texts(
    ["Jaeho is an AI engineer who loves Jinwon!"], embedding=embedding
)

retriever = vectorsotre.as_retriever()

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

# 검색 체인을 구성합니다.
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# 검색 체인을 실행하여 질문에 대한 답변을 얻습니다.
retrieval_chain.invoke("What is Jaeho occupation?")

Number of requested results 4 is greater than number of elements in index 2, updating n_results = 2


'Jaeho is an AI engineer.'

In [5]:
retrieval_chain.invoke("Who is that Jaeho love?")

Number of requested results 4 is greater than number of elements in index 2, updating n_results = 2


'Jaeho loves Jinwon.'

In [None]:
# 자체 RunnableParallel 로 래핑됨
# 모두 같음
from langchain_core.runnables import RunnableParallel
{"context": retriever, "question": RunnablePassthrough()}

RunnableParallel({"context": retriever, "question": RunnablePassthrough()})

RunnableParallel(context=retriever, question=RunnablePassthrough())

In [6]:
# Itemgetter

from operator import itemgetter

from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings


embedding = OpenAIEmbeddings(model="text-embedding-3-small")
llm = ChatOpenAI(model='gpt-4o-mini')

vectorsotre = Chroma.from_texts(
    ["Jaeho is an AI engineer who loves Jinwon!"], embedding=embedding
)

retriever = vectorsotre.as_retriever()

template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""

prompt = ChatPromptTemplate.from_template(template)

chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "language": itemgetter("language"),
    }
    | prompt
    | llm
    | StrOutputParser()
)

chain.invoke({"question": "Who is that Jaeho loves?", "language": "Korean"})

Number of requested results 4 is greater than number of elements in index 3, updating n_results = 3


'재호는 진원을 사랑합니다!'

# 병렬처리: RunnableParallel

In [8]:
# 병렬처리: RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain = (
    ChatPromptTemplate.from_template("{country} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain = (
    ChatPromptTemplate.from_template("{country} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain = RunnableParallel(capital=capital_chain, area=area_chain)

# map_chain을 호출하여 대한민국의 수도와 면적을 묻습니다.
map_chain.invoke({"country": "대한민국"})

{'capital': '대한민국의 수도는 서울입니다.',
 'area': '대한민국의 면적은 약 100,210 제곱킬로미터입니다. 이는 한반도의 남쪽 부분에 해당하며, 북한과 함께 한반도를 구성하고 있습니다.'}

In [9]:
# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain2 = (
    ChatPromptTemplate.from_template("{country1} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain2 = (
    ChatPromptTemplate.from_template("{country2} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain2 = RunnableParallel(capital=capital_chain2, area=area_chain2)

# map_chain을 호출합니다. 이때 각각의 key에 대한 value를 전달합니다.
map_chain2.invoke({"country1": "대한민국", "country2": "미국"})

{'capital': '대한민국의 수도는 서울입니다.',
 'area': '미국의 면적은 약 9,830,000 평방킬로미터(3,796,000 평방마일)입니다. 이는 미국이 세계에서 세 번째로 큰 나라임을 의미합니다.'}