In [None]:
# | default_exp chat_generator

In [None]:

# | export


from pathlib import Path
from typing import *
from os import environ
import random
import logging
import time

from fastapi import APIRouter
from pydantic import BaseModel

from llama_index import GPTSimpleVectorIndex, SimpleDirectoryReader, LLMPredictor, ServiceContext
from llama_index.readers.schema.base import Document
from langchain.chat_models import ChatOpenAI


In [None]:
import shutil
from contextlib import contextmanager
import unittest.mock
from tempfile import TemporaryDirectory


import pytest

In [None]:
# | export

def load_document_from_directory(directory_path: str) -> List[Document]:
    """Read the contents of a directory into llama_index documents.

    Args:
        directory_path: Path of the directory handed to `SimpleDirectoryReader`.

    Returns:
        The documents produced by `SimpleDirectoryReader(...).load_data()`.
    """
    return SimpleDirectoryReader(directory_path).load_data()

In [None]:
# Smoke test: copy the sample data file into a scratch directory and make sure
# the loader actually returns something for it.
with TemporaryDirectory() as d:
    data_path = Path(d) / "data"
    data_path.mkdir(parents=True)

    shutil.copyfile(
        Path("..") / "data" / "data.txt", data_path / "data.txt"
    )

    documents = load_document_from_directory(str(data_path))
    print(documents)
    # Without a check this cell passes even if the loader silently returns nothing.
    assert len(documents) > 0, f"No documents were loaded from {data_path}"



In [None]:
# | export


def _get_response_from_model(user_query: str, root_path: str = ".") -> Any:
    """Answer a user query from the documents stored under `<root_path>/data`.

    On every call the documents are loaded, a vector index is built from them
    and the index is queried; nothing is cached between calls.

    Args:
        user_query: The question to ask the model.
        root_path: Directory that contains the `data` sub-directory with the
            source documents. Defaults to the current working directory.

    Returns:
        The object returned by `index.query(...)`. It is not a plain string:
        callers read the answer text from its `.response` attribute.
    """
    # LLM Predictor (gpt-3.5-turbo) + service context
    llm_predictor = LLMPredictor(
        llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo")
    )
    service_context = ServiceContext.from_defaults(
        llm_predictor=llm_predictor, chunk_size_limit=512
    )

    # Build the path with pathlib so that an empty `root_path` resolves to the
    # relative "data" directory instead of the filesystem-root "/data/".
    data_dir = Path(root_path) / "data"
    documents = load_document_from_directory(str(data_dir))
    index = GPTSimpleVectorIndex.from_documents(
        documents, service_context=service_context
    )
    response = index.query(
        user_query,
        service_context=service_context,
        similarity_top_k=3,
    )
    return response

In [None]:
# Exercise the model helper against a scratch copy of the sample data file.
with TemporaryDirectory() as d:
    source_file = Path("..") / "data" / "data.txt"
    data_path = Path(d) / "data"
    data_path.mkdir(parents=True)
    shutil.copyfile(source_file, data_path / "data.txt")

    user_query = "How to consume Fastkafka messages? Give me an example?"
    response = _get_response_from_model(user_query=user_query, root_path=d)
    print(response)





INFO:llama_index.token_counter.token_counter:> [build_index_from_nodes] Total LLM token usage: 0 tokens
INFO:llama_index.token_counter.token_counter:> [build_index_from_nodes] Total embedding token usage: 3196 tokens
Token indices sequence length is longer than the specified maximum sequence length for this model (1061 > 1024). Running this sequence through the model will result in indexing errors
INFO:llama_index.token_counter.token_counter:> [query] Total LLM token usage: 3398 tokens
INFO:llama_index.token_counter.token_counter:> [query] Total embedding token usage: 14 tokens


To consume FastKafka messages, you can use the @consumes decorator to define a function that consumes messages from a Kafka topic. For example, if you have a Kafka topic called "hello_world", you can consume messages from this topic using the following code:

```
from fastkafka import FastKafka
from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

from fastkafka._components.logger import get_logger
logger = get_logger(__name__)

@app.consumes()
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

```

In this example, the `@consumes()` decorator defines the function `on_hello_world`

In [None]:
# | export

# Router on which this module's endpoints are registered via `@router.post(...)`.
router = APIRouter()


In [None]:
# | export

class GenerateChatRequest(BaseModel):
    """Request body accepted by the chat-generation endpoint."""
    # The user's question; it is passed on to the model as the query.
    user_query: str

In [None]:
# | export


@router.post("/")
def generate_chat_response(
    generate_chat_response_request: GenerateChatRequest,
) -> str:
    """Generate an answer for the query carried by the request body.

    Args:
        generate_chat_response_request: Request body holding the user's query.

    Returns:
        The answer text produced by the model, or an empty string when the
        model response carries no text.
    """
    model_response = _get_response_from_model(generate_chat_response_request.user_query)
    answer = model_response.response
    # The response text can be None (e.g. when no documents were indexed);
    # normalise it so the endpoint always honours its declared `str` return type.
    return "" if answer is None else answer

In [None]:
with TemporaryDirectory() as d:
    data_path = Path(d) / "data"
    data_path.mkdir(parents=True)

    shutil.copyfile(
        Path("..") / "data" / "data.txt", data_path / "data.txt"
    )
    user_query = "How to consume Fastkafka messages? Give me an example?"
    # `GenerateChatRequest` declares only `user_query`, so no other field is passed.
    generate_chat_response_request = GenerateChatRequest(user_query=user_query)

    # The endpoint calls `_get_response_from_model` with its default root path,
    # which would ignore the scratch data prepared above. Keep a handle on the
    # real helper and temporarily replace it with a wrapper that reads from `d`.
    _original_get_response = _get_response_from_model

    def _get_response_from_tmp_dir(query: str):
        return _original_get_response(query, root_path=d)

    with unittest.mock.patch(
        f"{__name__}._get_response_from_model", new=_get_response_from_tmp_dir
    ):
        actual = generate_chat_response(generate_chat_response_request)
    print(actual)
    assert isinstance(actual, str), f"Expected a string answer, got {actual!r}"

INFO:llama_index.token_counter.token_counter:> [build_index_from_nodes] Total LLM token usage: 0 tokens
INFO:llama_index.token_counter.token_counter:> [build_index_from_nodes] Total embedding token usage: 0 tokens


[]


INFO:llama_index.token_counter.token_counter:> [query] Total LLM token usage: 0 tokens
INFO:llama_index.token_counter.token_counter:> [query] Total embedding token usage: 14 tokens


None
