# Building the FastAPI Backend using Langserve

먼저 지금까지 스마트 봇을 배포하면서 어떤 성과를 거두었는지 살펴보겠습니다. 

1) **Notebook 12**: Azure Bot 서비스를 사용하여 백엔드 API를 배포하는 방법에 대한 지침/가이드.
2) **Notebook 13**: POST Request를 사용하여 프로그래밍 방식으로 봇 서비스와 인터페이스하는 방법에 대한 지침/가이드.

다음은 봇 서비스 사용의 장/단점입니다.

**장점**:
- O365 이메일, MS Teams, 웹 채팅 플러그인 등 여러 채널에 쉽게 연결할 수 있습니다.
- 봇 프레임워크 Python SDK는 입력 표시기(typing indicators), 사전(proactive) 메시지, 카드, 파일 업로드 등과 같은 다양한 유틸리티를 제공합니다.
- 인증 및 로깅 메커니즘이 내장되어 있어 최소한의 노력만 필요합니다.
- Python, JavaScript, .NET용 SDK를 제공합니다.
- 애플리케이션 모니터링을 위해 애플리케이션 인사이트 서비스와 쉽게 통합할 수 있습니다.
- 다른 Microsoft 서비스와 마찬가지로 Microsoft 제품 및 지원 팀의 지원을 받습니다.

**단점**:
- 아직 스트리밍을 지원하지 않습니다.
- Private Endpoint(비공개 엔드포인트)에 대한 지원이 부족합니다.
- 서비스로서 컨테이너화하거나 Kubernetes, 컨테이너 앱 등에서 실행할 수 없습니다.
- 모든 기능을 완전히 이해하려면 더 가파른 학습 곡선이 필요합니다.

대안으로, 이 노트북에서는 LangServe와 함께 FastAPI를 사용하여 또 다른 백엔드 API를 구축하겠습니다. <br>이 API는 독립형이므로 Docker 컨테이너에 패키징하여 어디에나 배포할 수 있습니다. 

이 노트북에서는 코드를 압축하여 BotService API가 있는 동일한 Azure 웹 앱 서비스의 새 슬롯에 업로드하겠습니다.


[LANGSERVE DOCUMENTATION](https://python.langchain.com/docs/langserve) 에서..

    LangServe는 개발자가 LangChain 런처블과 체인을 REST API로 배포하는 데 도움을 줍니다.

    이 라이브러리는 FastAPI와 통합되어 있으며 데이터 유효성 검사를 위해 pydantic을 사용합니다.

    또한 서버에 배포된 runnable을 호출하는 데 사용할 수 있는 클라이언트를 제공합니다. JavaScript 클라이언트는 LangChain.js에서 사용할 수 있습니다.

## The main file: Server.py

봇 서비스 API의 메인 코드가 bot.py에 있는 것처럼, 이 FastAPI 백엔드에서는 메인 코드가 `apps/backend/langserve/app/server.py`에 있습니다.

**한 번 살펴보세요!**

`server.py`를 클릭하면, 4개의 엔드포인트가 생성된 것을 확인할 수 있습니다.

- `/docs/` 
  - 이 엔드포인트는 API의 OpenAPI 정의(Swagger)를 보여줍니다.
- `/chatgpt/`
  - 이 엔드포인트는 간단한 LLM을 사용하여 시스템 프롬프트 없이 응답합니다.
- `/joke/`
  - 이 엔드포인트는 LLM + 프롬프트 + 사용자 정의 구조화된 json 출력(서버의 타임스탬프 추가)이 있는 체인을 사용합니다.
- `/agent/`
  - 스마트 GPT 봇 브레인 에이전트의 엔드포인트는 다음과 같습니다.
  
모든 엔드포인트에 대해 이러한 모든 경로를 사용할 수 있습니다: `/invoke/`, `/batch/`, `/stream/` and `/stream_events/`

## Azure App service 에 배포하기

`apps/backend/langserve/README.md`에서 코드를 압축하여 Azure 웹 앱에 업로드하는 방법에 대한 모든 지침을 찾을 수 있습니다. 

봇 서비스 API용으로 만든 것과 동일한 Azure 웹 앱 서비스를 사용하겠습니다.

=> **GO AHEAD NOW AND FOLLOW THE INSTRUCTIONS in `apps/backend/langserve/README.md`**

## (optional) Deploy the server locally

1) `apps/backend/langserve/app/server.py` 파일로 이동하여 다음 코드를 주석 처리하여 로컬에서 테스트합니다.
```python
    ### uncomment this section to run server in local host #########

    # from pathlib import Path
    # from dotenv import load_dotenv
    # # Calculate the path three directories above the current script
    # library_path = Path(__file__).resolve().parents[4]
    # sys.path.append(str(library_path))
    # load_dotenv(str(library_path) + "/credentials.env")
    # os.environ["AZURE_OPENAI_MODEL_NAME"] = os.environ["GPT35_DEPLOYMENT_NAME"]

    ###################################
```
2) 터미널을 열고 올바른 conda 환경을 활성화한 다음, `apps/backend/langserve/app` 폴더로 이동하여 다음 명령을 실행합니다. 
    
```bash
python server.py
```

Alternatively, you can go to this folder `apps/backend/langserve/` and run this command:
```bash
langchain serve
```

이렇게 하면 로컬 호스트 포트 8000에서 백엔드 서버 API가 실행됩니다. 

3) Azure ML 컴퓨팅 인스턴스에서 작업하는 경우 이 주소에서 OpenAPI(Swagger) 정의에 액세스할 수 있습니다. 

    https:\<your_compute_name\>-8000.\<your_region\>.instances.azureml.ms/
    
    for example:
    https://pabmar1-8000.australiaeast.instances.azureml.ms/

## Talk to the API using POST requests

In [1]:
import requests
import json
import sys
import time
import random

### API에서 응답을 게시하고 읽는 기능을 제공합니다. 여기애서는 스트리밍을 지원합니다!!

In [2]:
def process_line(line):
    """Process a single line from the stream."""
    # print("line:",line)
    if line.startswith('data: '):
        # Extract JSON data following 'data: '
        json_data = line[len('data: '):]
        try:
            data = json.loads(json_data)
            if "event" in data:
                handle_event(data)
            elif "content" in data:
                # If there is immediate content to print
                print(data["content"], end="", flush=True)
            elif "steps" in data:
                print(data["steps"])
            elif "output" in data:
                print(data["output"])
        except json.JSONDecodeError as e:
            print(f"JSON decoding error: {e}")
    elif line.startswith('event: '):
        pass
    elif ": ping" in line:
        pass
    else:
        print(line)

def handle_event(event):
    """Handles specific events, adjusting output based on event type."""
    kind = event["event"]
    if kind == "on_chain_start" and event["name"] == "AgentExecutor":
        print(f"Starting agent: {event['name']}")
    elif kind == "on_chain_end" and event["name"] == "AgentExecutor":
        print("\n--")
        print(f"Done agent: {event['name']}")
    elif kind == "on_chat_model_stream":
        content = event["data"]["chunk"]["content"]
        if content:  # Ensure content is not None or empty
            print(content, end="", flush=True)
    elif kind == "on_tool_start":
        # Assuming event['data'].get('input') is a dictionary
        tool_inputs = event['data'].get('input')
        if isinstance(tool_inputs, dict):
            # Joining the dictionary into a string format key: 'value'
            inputs_str = ", ".join(f"'{v}'" for k, v in tool_inputs.items())
        else:
            # Fallback if it's not a dictionary or in an unexpected format
            inputs_str = str(tool_inputs)
        print(f"Starting tool: {event['name']} with input: {inputs_str}")
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}\n--")

    
def consume_api(url, payload):
    """Uses requests POST to talkt to the FastAPI backend, supports streaming"""
    
    headers = {'Content-Type': 'application/json'}
    
    with requests.post(url, json=payload, headers=headers, stream=True) as response:
        try:
            response.raise_for_status()  # Raises a HTTPError if the response is not 200
            
            for line in response.iter_lines():
                if line:  # Check if the line is not empty
                    decoded_line = line.decode('utf-8')
                    process_line(decoded_line)
                    
                    
        except requests.exceptions.HTTPError as err:
            print(f"HTTP Error: {err}")
        except Exception as e:
            print(f"An error occurred: {e}")


### Base URL

In [3]:
# base_url = "https://webapp-backend-botid-2znp775rdhyvo-fastapi.azurewebsites.net"  # Note that "-staging" is the Azure App Service slot where the LangServe API was deployed
base_url = "http://localhost:8000" # If you deployed locally

### `/chatgpt/` endpoint

In [4]:
payload = {'input': 'explain long covid in just 2 short sentences'}  # Your POST request payload

In [5]:
# URL of the FastAPI Invoke endpoint
url = base_url + '/chatgpt/invoke'
consume_api(url, payload)

{"output":{"content":"Long COVID refers to a range of symptoms that continue for weeks or months after the acute phase of a SARS-CoV-2 infection has resolved. These symptoms can include fatigue, shortness of breath, cognitive disturbances, and various other health issues that persist and affect daily functioning.","additional_kwargs":{},"response_metadata":{"finish_reason":"stop","logprobs":null,"content_filter_results":{"hate":{"filtered":false,"severity":"safe"},"self_harm":{"filtered":false,"severity":"safe"},"sexual":{"filtered":false,"severity":"safe"},"violence":{"filtered":false,"severity":"safe"}}},"type":"ai","name":null,"id":null,"example":false},"callback_events":[],"metadata":{"run_id":"5d7af483-5958-4812-81b7-39693e2e19cd"}}


In [6]:
# URL of the FastAPI streaming endpoint
url = base_url + '/chatgpt/stream'
consume_api(url, payload)

Long COVID refers to a range of symptoms that continue for weeks or months after the acute phase of a SARS-CoV-2 infection has resolved. These symptoms can include fatigue, shortness of breath, cognitive impairment, and various other health issues that persist and affect daily functioning.

### `/joke` endpoint : chain with custom output

In [7]:
payload = {'input': {"topic": "highschool", "language":"english"}}

url = base_url + '/joke/invoke'

consume_api(url, payload)

{"output":{"content":"Why did the high school teacher go to jail?\n\nBecause he had too many problems with his pupils—they said he couldn't control his class, but he insisted he was just taking \"attendance\" to a new \"cell\" level!","info":{"timestamp":"2024-05-07T23:53:14.754191"}},"callback_events":[],"metadata":{"run_id":"d6236048-3306-49d7-90da-15d4c967db93"}}


In [8]:
# URL of the FastAPI streaming endpoint
url = base_url + '/joke/stream_events'

consume_api(url, payload)

Why did the high school student bring a ladder to class?

Because he wanted to make sure he was in "high" school!

### `/agent` endpoint : our complex smart bot

In [9]:
random_session_id = "session"+ str(random.randint(1, 1000))
ramdom_user_id = "user"+ str(random.randint(1, 1000))

config={"configurable": {"session_id": random_session_id, "user_id": ramdom_user_id}}
print(random_session_id, ramdom_user_id)

session998 user243


In [10]:
payload = {'input': {"question": "Hi, I am Pablo, what is your name?"}, 'config': config}
 
url = base_url + '/agent/invoke'

consume_api(url, payload)

{"output":{"output":"Hello Pablo, my name is Jarvis. How can I assist you today?"},"callback_events":[],"metadata":{"run_id":"476f68fc-0244-4a8c-9fed-3612c25c52af"}}


In [12]:
payload = {'input': {"question": "booksearch, Can I restore my index or service once it's deleted?"}, 'config': config}
 
url = base_url + '/agent/stream_events'

consume_api(url, payload)

Starting agent: AgentExecutor
Starting tool: booksearch with input: 'Can I restore my index or service once it's deleted?'
Done tool: booksearch
--
No, once you delete an Azure AI Search index or service, it cannot be restored. The deletion of a search service is permanent, and all indexes within that service are also permanently deleted<sup><a href="https://blobstorage2znp775rdhyvo.blob.core.windows.net/techdocs/Azure_Cognitive_Search_Documentation_Overview.pdf?sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2024-04-29T18:11:18Z&st=2024-03-17T10:11:18Z&spr=https&sig=qtSFdHgO4IxArZIDQZbvcc2T7Q4INFsy7XZiIjOqWE0%3D" target="_blank">[1]</a></sup>. It's important to be certain before deleting any service or index, as this action is irreversible.
--
Done agent: AgentExecutor


In [None]:
payload = {'input': {"question": "bing, give me the current salary of a dental hygenist in texas"}, 'config': config}
 
url = base_url + '/agent/stream_events'

consume_api(url, payload)

In [None]:
payload = {'input': {"question": "docsearch, How Covid affects obese people? and elderly"}, 'config': config}
 
url = base_url + '/agent/stream_events'

consume_api(url, payload)

In [None]:
payload = {'input': {"question": "sqlsearch, how many people were hospitalized in CA?"}, 'config': config}
 
url = base_url + '/agent/stream_events'

consume_api(url, payload)

In [None]:
payload = {'input': {"question": "thank you!"}, 'config': config}
 
url = base_url + '/agent/stream_events'

consume_api(url, payload)

## 이제 모든 엔드포인트와 경로를 langchain 로컬 RemoteRunnable을 사용하여 시도해 보겠습니다.

이 모든 기능은 TypeScript에서도 사용할 수 있으며, LangServe.JS 문서를 참조하세요.

In [20]:
from langchain.schema import SystemMessage, HumanMessage
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnableMap
from langserve import RemoteRunnable

chatgpt_chain = RemoteRunnable(base_url + "/chatgpt/")
joke_chain = RemoteRunnable(base_url + "/joke/")
agent_chain = RemoteRunnable(base_url + "/agent/")


In [None]:
joke_chain.invoke({"topic": "cars", "language":"english"})

In [None]:
# or async
await joke_chain.ainvoke({"topic": "parrots", "language":"spanish"})

In [None]:
prompt = [
    SystemMessage(content='you are a helpful assistant that responds to the user question.'),
    HumanMessage(content='explain long covid')
]

# Supports astream
async for msg in chatgpt_chain.astream(prompt):
    print(msg.content, end="", flush=True)

In [None]:
async for event in agent_chain.astream_events({"question": " booksearch, what is the story about the stolen kidney, and what book is it in?"}, config=config, version="v1"):
    kind = event["event"]
    if kind == "on_chain_start":
        if (event["name"] == "AgentExecutor"):  
            print(f"Starting agent: {event['name']}")
    elif kind == "on_chain_end":
        if (event["name"] == "AgentExecutor"):
            print()
            print("--")
            print(f"Done agent: {event['name']}")
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)
    elif kind == "on_tool_start":
        print("--")
        print(f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}")
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        # print(f"Tool output was: {event['data'].get('output')}")
        print("--")