## LCEL 인터페이스


사용자 정의 체인을 가능한 쉽게 만들 수 있도록, [`Runnable`](https://api.python.langchain.com/en/stable/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable) 프로토콜을 구현했습니다. 

`Runnable` 프로토콜은 대부분의 컴포넌트에 구현되어 있습니다.

이는 표준 인터페이스로, 사용자 정의 체인을 정의하고 표준 방식으로 호출하는 것을 쉽게 만듭니다.
표준 인터페이스에는 다음이 포함됩니다.

- [`stream`](#stream): 응답의 청크를 스트리밍합니다.
- [`invoke`](#invoke): 입력에 대해 체인을 호출합니다.
- [`batch`](#batch): 입력 목록에 대해 체인을 호출합니다.

비동기 메소드도 있습니다.

- [`astream`](#async-stream): 비동기적으로 응답의 청크를 스트리밍합니다.
- [`ainvoke`](#async-invoke): 비동기적으로 입력에 대해 체인을 호출합니다.
- [`abatch`](#async-batch): 비동기적으로 입력 목록에 대해 체인을 호출합니다.
- [`astream_log`](#async-stream-intermediate-steps): 최종 응답뿐만 아니라 발생하는 중간 단계를 스트리밍합니다.

In [1]:
# API KEY를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API KEY 정보로드
load_dotenv()

True

In [2]:
# LangSmith 추적을 설정합니다. https://smith.langchain.com
# !pip install -qU langchain-teddynote
from langchain_teddynote import logging

# 프로젝트 이름을 입력합니다.
logging.langsmith("CH01-Basic-04-LCEL-Advanced-Gemini")

LangSmith 추적을 시작합니다.
[프로젝트명]
project_name='CH01-Basic-04-LCEL-Advanced-Gemini'
LANGSMITH_PROJECT: CH01-Basic-04-LCEL-Advanced-Gemini


LCEL 문법을 사용하여 chain 을 생성합니다.

In [3]:
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import SecretStr
import os


# 키 로드
api_key = os.getenv("GEMINI_API_KEY")

# LLM 생성
# 모델 종류: https://ai.google.dev/gemini-api/docs/models?hl=ko
# gemini-2.5-pro
# gemini-2.5-flash
# gemini-2.5-flash-lite
model = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    google_api_key=SecretStr(str(api_key)),
    temperature=0.5,
)
# 주어진 토픽에 대한 농담을 요청하는 프롬프트 템플릿을 생성합니다.
prompt = PromptTemplate.from_template("{topic} 에 대하여 3문장으로 설명해줘.")
# 프롬프트와 모델을 연결하여 대화 체인을 생성합니다.
chain = prompt | model | StrOutputParser()

## stream: 실시간 출력


이 함수는 `chain.stream` 메서드를 사용하여 주어진 토픽에 대한 데이터 스트림을 생성하고, 이 스트림을 반복하여 각 데이터의 내용(`content`)을 즉시 출력합니다. `end=""` 인자는 출력 후 줄바꿈을 하지 않도록 설정하며, `flush=True` 인자는 출력 버퍼를 즉시 비우도록 합니다. 

In [4]:
# chain.stream 메서드를 사용하여 '멀티모달' 토픽에 대한 스트림을 생성하고 반복합니다.
for token in chain.stream({"topic": "멀티모달"}):
    # 스트림에서 받은 데이터의 내용을 출력합니다. 줄바꿈 없이 이어서 출력하고, 버퍼를 즉시 비웁니다.
    print(token, end="", flush=True)

멀티모달 AI는 텍스트, 이미지, 오디오 등 여러 형태의 데이터를 함께 이해하고 처리하는 인공지능 기술입니다. 이는 마치 인간이 보고, 듣고, 읽는 등 다양한 감각을 통해 세상을 종합적으로 인지하는 방식과 유사합니다. 따라서 더 풍부하고 정확한 정보를 바탕으로 상황을 깊이 이해하고, 복잡한 문제 해결이나 자연스러운 인간-AI 상호작용을 가능하게 합니다.

## invoke: 호출


`chain` 객체의 `invoke` 메서드는 주제를 인자로 받아 해당 주제에 대한 처리를 수행합니다.

In [5]:
# chain 객체의 invoke 메서드를 호출하고, 'ChatGPT'라는 주제로 딕셔너리를 전달합니다.
chain.invoke({"topic": "ChatGPT"})

'ChatGPT는 OpenAI가 개발한 **인공지능 챗봇**입니다. 방대한 데이터를 학습하여 **사람처럼 자연스러운 대화를 나누고 텍스트를 생성**하는 능력을 가졌습니다. 질문 답변, 글쓰기, 요약, 번역 등 **다양한 작업을 수행하며 사용자에게 도움**을 제공합니다.'

## batch: 배치(단위 실행)


함수 `chain.batch`는 여러 개의 딕셔너리를 포함하는 리스트를 인자로 받아, 각 딕셔너리에 있는 `topic` 키의 값을 사용하여 일괄 처리를 수행합니다.

In [6]:
# 주어진 토픽 리스트를 batch 처리하는 함수 호출
answer = chain.batch([{"topic": "Gemini"}, {"topic": "Instagram"}])

In [7]:
len(answer)
answer[0]

"Gemini는 구글이 개발한 최신 인공지능 모델입니다. 텍스트, 이미지, 오디오 등 다양한 형태의 정보를 이해하고 생성할 수 있는 '멀티모달' 능력을 갖춘 것이 특징입니다. 이는 복잡한 추론과 문제 해결 능력을 바탕으로 구글의 다양한 제품과 서비스에 활용되고 있습니다."

In [8]:
answer[1]

"Instagram은 사진과 짧은 동영상을 공유하며 소통하는 시각 중심의 소셜 미디어 플랫폼입니다. 사용자들은 피드, 스토리, 릴스 등을 통해 자신의 일상이나 관심사를 공유하고, 다른 사람들의 게시물을 탐색하며 '좋아요'나 댓글로 교류합니다. 이를 통해 친구, 가족과 연결되고, 새로운 콘텐츠를 발견하며, 다양한 커뮤니티와 교류할 수 있습니다."

`max_concurrency` 매개변수를 사용하여 동시 요청 수를 설정할 수 있습니다

`config` 딕셔너리는 `max_concurrency` 키를 통해 동시에 처리할 수 있는 최대 작업 수를 설정합니다. 여기서는 최대 3개의 작업을 동시에 처리하도록 설정되어 있습니다.

In [9]:
chain.batch(
    [
        {"topic": "ChatGPT"},
        {"topic": "Instagram"},
        {"topic": "멀티모달"},
        {"topic": "프로그래밍"},
        {"topic": "머신러닝"},
    ],
    config={"max_concurrency": 3},
)

['ChatGPT는 OpenAI가 개발한 대규모 언어 모델(LLM) 기반의 인공지능 챗봇입니다. 사용자의 질문이나 지시에 따라 사람과 유사한 텍스트를 생성하며 자연스러운 대화가 가능합니다. 정보 탐색, 글쓰기, 요약, 번역 등 광범위한 분야에서 사용자에게 유용한 도움을 제공합니다.',
 'Instagram은 사진과 짧은 동영상을 공유하는 데 중점을 둔 세계적인 소셜 미디어 플랫폼입니다. 사용자들은 자신의 일상을 시각적으로 공유하고, 친구나 관심사를 팔로우하며 소통합니다. 피드, 스토리, 릴스 등 다양한 기능을 통해 콘텐츠를 탐색하고 상호작용할 수 있습니다.',
 '멀티모달은 텍스트, 이미지, 음성, 영상 등 여러 가지 형태의 데이터를 동시에 이해하고 처리하는 인공지능 기술을 말합니다. 이는 마치 사람이 시각, 청각 등 다양한 감각을 활용하여 세상을 인지하듯, AI가 더 풍부한 정보를 통합하여 상황을 파악하게 합니다. 이를 통해 AI는 더욱 복합적인 질문에 답하거나 현실 세계를 더 정확하게 인식하며, 사람과 자연스럽게 소통하는 능력을 향상시킵니다.',
 '프로그래밍은 컴퓨터에게 특정 작업을 수행하도록 지시하는 과정입니다. 이는 파이썬, 자바 등 프로그래밍 언어라는 약속된 규칙을 사용하여 코드를 작성하는 방식으로 이루어집니다. 최종적으로는 이러한 지시를 통해 컴퓨터가 문제 해결, 자동화 등 원하는 기능을 수행하는 소프트웨어나 애플리케이션을 만드는 것입니다.',
 '머신러닝은 컴퓨터가 **명시적인 프로그래밍 없이** 데이터를 분석하여 **스스로 학습**하고 패턴을 찾아내는 인공지능의 한 분야입니다. 이는 대량의 데이터 속에서 숨겨진 규칙이나 경향을 파악하여 미래를 예측하거나 새로운 결정을 내리는 데 활용됩니다. 스팸 메일 분류, 이미지 인식, 추천 시스템 등 다양한 분야에서 복잡한 문제 해결에 기여하고 있습니다.']

## async stream: 비동기 스트림


함수 `chain.astream`은 비동기 스트림을 생성하며, 주어진 토픽에 대한 메시지를 비동기적으로 처리합니다.

비동기 for 루프(`async for`)를 사용하여 스트림에서 메시지를 순차적으로 받아오고, `print` 함수를 통해 메시지의 내용(`s.content`)을 즉시 출력합니다. `end=""`는 출력 후 줄바꿈을 하지 않도록 설정하며, `flush=True`는 출력 버퍼를 강제로 비워 즉시 출력되도록 합니다.


In [10]:
# 비동기 스트림을 사용하여 'YouTube' 토픽의 메시지를 처리합니다.
async for token in chain.astream({"topic": "YouTube"}):
    # 메시지 내용을 출력합니다. 줄바꿈 없이 바로 출력하고 버퍼를 비웁니다.
    print(token, end="", flush=True)

YouTube는 구글 소유의 세계 최대 동영상 공유 플랫폼입니다. 사용자들은 엔터테인먼트, 교육, 뉴스 등 다양한 주제의 영상을 업로드하고 시청하며, 댓글로 소통할 수 있습니다. 전 세계 누구나 자유롭게 콘텐츠를 제작하고 소비하는 글로벌 미디어 허브 역할을 합니다.

## async invoke: 비동기 호출


`chain` 객체의 `ainvoke` 메서드는 비동기적으로 주어진 인자를 사용하여 작업을 수행합니다. 여기서는 `topic`이라는 키와 `NVDA`(엔비디아의 티커) 라는 값을 가진 딕셔너리를 인자로 전달하고 있습니다. 이 메서드는 특정 토픽에 대한 처리를 비동기적으로 요청하는 데 사용될 수 있습니다.


In [11]:
# 비동기 체인 객체의 'ainvoke' 메서드를 호출하여 'NVDA' 토픽을 처리합니다.
my_process = chain.ainvoke({"topic": "NVDA"})

In [12]:
# 비동기로 처리되는 프로세스가 완료될 때까지 기다립니다.
await my_process

Retrying langchain_google_genai.chat_models._achat_with_retry.<locals>._achat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_dimensions {
    key: "model"
    value: "gemini-2.5-flash"
  }
  quota_value: 10
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 41
}
].


ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_dimensions {
    key: "model"
    value: "gemini-2.5-flash"
  }
  quota_value: 10
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 39
}
]

## async batch: 비동기 배치


함수 `abatch`는 비동기적으로 일련의 작업을 일괄 처리합니다.

이 예시에서는 `chain` 객체의 `abatch` 메서드를 사용하여 `topic` 에 대한 작업을 비동기적으로 처리하고 있습니다.

`await` 키워드는 해당 비동기 작업이 완료될 때까지 기다리는 데 사용됩니다.


In [13]:
# 주어진 토픽에 대해 비동기적으로 일괄 처리를 수행합니다.
my_abatch_process = chain.abatch(
    [{"topic": "YouTube"}, {"topic": "Instagram"}, {"topic": "Facebook"}]
)

In [14]:
# 비동기로 처리되는 일괄 처리 프로세스가 완료될 때까지 기다립니다.
await my_abatch_process

['유튜브는 전 세계에서 가장 큰 동영상 공유 플랫폼입니다. 사용자들은 교육, 엔터테인먼트, 뉴스 등 매우 다양한 주제의 영상을 업로드하고 시청하며 공유할 수 있습니다. 이를 통해 전 세계 수많은 사람들이 정보를 얻고 소통하며 즐거움을 누리는 주요 채널로 자리매김했습니다.',
 'Instagram은 사진과 짧은 동영상을 공유하며 소통하는 대표적인 소셜 미디어 플랫폼입니다. 사용자들은 자신의 일상을 시각적으로 표현하고, 친구나 관심사를 가진 사람들과 팔로우하며 교류합니다. 다양한 콘텐츠를 탐색하고 새로운 정보를 얻거나 영감을 받을 수 있는 공간입니다.',
 '페이스북은 전 세계 수십억 명이 사용하는 가장 큰 소셜 네트워킹 플랫폼 중 하나입니다. 사용자들은 친구 및 가족과 연결하고, 개인적인 소식, 사진, 동영상 등을 공유하며 소통할 수 있습니다. 또한 관심사 기반의 그룹에 참여하거나, 다양한 뉴스 및 정보를 접하고, 비즈니스 페이지를 통해 정보를 얻는 등 다채로운 활동이 가능합니다.']

## Parallel: 병렬성

LangChain Expression Language가 병렬 요청을 지원하는 방법을 살펴봅시다.
예를 들어, `RunnableParallel`을 사용할 때, 각 요소를 병렬로 실행합니다.


`langchain_core.runnables` 모듈의 `RunnableParallel` 클래스를 사용하여 두 가지 작업을 병렬로 실행하는 예시를 보여줍니다.

`ChatPromptTemplate.from_template` 메서드를 사용하여 주어진 `country`에 대한 **수도** 와 **면적** 을 구하는 두 개의 체인(`chain1`, `chain2`)을 만듭니다.

이 체인들은 각각 `model`과 파이프(`|`) 연산자를 통해 연결됩니다. 마지막으로, `RunnableParallel` 클래스를 사용하여 이 두 체인을 `capital`와 `area`이라는 키로 결합하여 동시에 실행할 수 있는 `combined` 객체를 생성합니다.


In [15]:
from langchain_core.runnables import RunnableParallel

# {country} 의 수도를 물어보는 체인을 생성합니다.
chain1 = (
    PromptTemplate.from_template("{country} 의 수도는 어디야?")
    | model
    | StrOutputParser()
)

# {country} 의 면적을 물어보는 체인을 생성합니다.
chain2 = (
    PromptTemplate.from_template("{country} 의 면적은 얼마야?")
    | model
    | StrOutputParser()
)

# 위의 2개 체인을 동시에 생성하는 병렬 실행 체인을 생성합니다.
combined = RunnableParallel(capital=chain1, area=chain2)

`chain1.invoke()` 함수는 `chain1` 객체의 `invoke` 메서드를 호출합니다.

이때, `country`이라는 키에 `대한민국`라는 값을 가진 딕셔너리를 인자로 전달합니다.


In [16]:
# chain1 를 실행합니다.
chain1.invoke({"country": "대한민국"})

'대한민국의 수도는 **서울**입니다.'

이번에는 `chain2.invoke()` 를 호출합니다. `country` 키에 다른 국가인 `미국` 을 전달합니다.


In [17]:
# chain2 를 실행합니다.
chain2.invoke({"country": "미국"})

'미국의 총 면적은 약 **983만 4천 제곱킬로미터 (km²)** 입니다.\n\n이는 약 **380만 제곱마일 (sq mi)** 에 해당합니다.\n\n이 수치는 육지 및 내수면을 포함한 총 면적이며, 자료 출처에 따라 약간의 차이가 있을 수 있습니다.'

`combined` 객체의 `invoke` 메서드는 주어진 `country`에 대한 처리를 수행합니다.

이 예제에서는 `대한민국`라는 주제를 `invoke` 메서드에 전달하여 실행합니다.


In [18]:
# 병렬 실행 체인을 실행합니다.
combined.invoke({"country": "대한민국"})

{'capital': '대한민국의 수도는 **서울**입니다.',
 'area': '대한민국의 면적은 **약 100,363 제곱킬로미터(km²)** 입니다.\n\n일반적으로는 **약 10만 제곱킬로미터**라고도 많이 알려져 있습니다.'}

### 배치에서의 병렬 처리

병렬 처리는 다른 실행 가능한 코드와 결합될 수 있습니다.
배치와 병렬 처리를 사용해 보도록 합시다.


`chain1.batch` 함수는 여러 개의 딕셔너리를 포함하는 리스트를 인자로 받아, 각 딕셔너리에 있는 "topic" 키에 해당하는 값을 처리합니다. 이 예시에서는 "대한민국"와 "미국"라는 두 개의 토픽을 배치 처리하고 있습니다.


In [19]:
# 배치 처리를 수행합니다.
chain1.batch([{"country": "대한민국"}, {"country": "미국"}])

['대한민국의 수도는 **서울**입니다.', '미국의 수도는 **워싱턴 D.C.** 입니다.']

`chain2.batch` 함수는 여러 개의 딕셔너리를 리스트 형태로 받아, 일괄 처리(batch)를 수행합니다.

이 예시에서는 `대한민국`와 `미국`라는 두 가지 국가에 대한 처리를 요청합니다.


In [20]:
# 배치 처리를 수행합니다.
chain2.batch([{"country": "대한민국"}, {"country": "미국"}])

['대한민국(남한)의 면적은 약 **100,363 제곱킬로미터(km²)**입니다.\n\n보통 **약 10만 제곱킬로미터**라고도 많이 이야기합니다.\n\n이는 한반도 전체 면적(약 22만 제곱킬로미터)의 절반이 조금 안 되는 수준입니다.',
 '미국의 총 면적은 약 **983만 4천 제곱킬로미터 (km²)** 입니다.\n\n이는 약 **380만 5천 제곱마일 (mi²)** 에 해당합니다.\n\n이 면적에는 육지와 내수면(강, 호수 등)이 모두 포함됩니다. 정확한 수치는 측정 기준에 따라 약간의 차이가 있을 수 있습니다.']

`combined.batch` 함수는 주어진 데이터를 배치로 처리하는 데 사용됩니다. 이 예시에서는 두 개의 딕셔너리 객체를 포함하는 리스트를 인자로 받아 각각 `대한민국`와 `미국` 두 나라에 대한 데이터를 배치 처리합니다.


In [21]:
# 주어진 데이터를 배치로 처리합니다.
combined.batch([{"country": "대한민국"}, {"country": "미국"}])

[{'capital': '대한민국의 수도는 **서울**입니다.',
  'area': '대한민국의 면적은 약 **100,363 제곱킬로미터 (km²)** 입니다.\n\n이는 남한만의 면적을 기준으로 하며, 측정 시기나 방법에 따라 약간의 차이가 있을 수 있습니다.\n\n참고로, 한반도 전체 면적(남한+북한)은 약 22만 제곱킬로미터입니다.'},
 {'capital': '미국의 수도는 **워싱턴 D.C.**입니다.',
  'area': '미국의 면적은 약 **9,834,000 제곱 킬로미터 (km²)** 입니다.\n\n이는 약 **3,797,000 제곱 마일 (mi²)** 에 해당합니다.\n\n이 면적은 주로 육지 면적과 내수면(호수, 강 등)을 포함한 총 면적을 기준으로 합니다.'}]