In [None]:
#| default_exp 03_fastapi_streaming_tutorial

In [None]:
# | export

from functools import lru_cache
from typing import Callable

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from langchain import ConversationChain
from langchain.callbacks import AsyncCallbackManager
from langchain.chat_models import ChatOpenAI

from fastapi_async_langchain.response import LangchainStreamingResponse

In [None]:
# | export

def conversation_chain_dependency() -> Callable[[], ConversationChain]:
    @lru_cache(maxsize=1)
    def dependency() -> ConversationChain:
        return ConversationChain(
            llm=ChatOpenAI(
                temperature=0,
                streaming=True,
                callback_manager=AsyncCallbackManager([]),
            ),
            verbose=True,
        )

    return dependency

conversation_chain = conversation_chain_dependency()

In [None]:
# | export

app = FastAPI(title="StreamingConversationChainAPI")

In [None]:
# | export

class Request(BaseModel):
    query: str

In [None]:
# | export

@app.post("/chat")
async def chat(
    request: Request,
    chain: ConversationChain = Depends(conversation_chain),
) -> LangchainStreamingResponse:
    return LangchainStreamingResponse(
        chain, request.query, media_type="text/event-stream"
    )