In [None]:
from datetime import datetime
import sqlparse
from functools import lru_cache
from typing import List, Dict, Tuple, Any
import duckdb
from fastapi import FastAPI
from duckdb import DuckDBPyConnection as DuckCall
from typing import List, Union, Dict, Any, AnyStr
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel as ModelResponseObject
from pydantic import BaseModel


class UserLLMRequest(BaseModel):
    """Request body accepted by the experimental LLM endpoints in this file."""

    # Declared as AnyStr, so the annotation admits both str and bytes.
    # Presumably the prompt/schema text to send to the model — TODO confirm.
    input_schema: AnyStr


class ILLMReponse(BaseModel):
    """Response body returned by the experimental LLM endpoints in this file."""

    # Filled by the handlers from the model output's `original_prompt`.
    original_prompt: Dict[str, str]
    # Filled by the handlers from the model output's `response`;
    # each entry is either a dict or None.
    response: List[Union[Dict[str, Any], None]]


class LLMInterface:
    """Empty placeholder class; it declares no attributes or methods."""


class OpenAILLMInterface(ChatOpenAI):
    """`ChatOpenAI` subclass that declares two additional string fields."""

    # Both fields are declared without default values.
    # NOTE(review): the handlers in this file call OpenAILLMInterface() with
    # no arguments — confirm whether these fields are meant to be required.
    inline_prompt: str
    role_prompt: str


class OneHotEncode(BaseModel):
    """Placeholder model; no fields are declared yet."""


class ModelReponse(ModelResponseObject):
    """Schema the LLM handlers pass to `with_structured_output`."""

    # NOTE(review): typed as the chat-model class, whereas
    # ILLMReponse.original_prompt is Dict[str, str] — confirm the intended type.
    original_prompt: OpenAILLMInterface
    # Each entry is either a dict or None.
    response: List[Union[Dict[str, Any], None]]


def encoder(_: OneHotEncode) -> ...:
    """Unimplemented stub: accepts a `OneHotEncode` and returns None."""


# Application object; the route decorators below register handlers on it.
v1 = FastAPI()
# Bookkeeping dict written by `_cache`, which stores a `datetime.now()`
# timestamp per key.
_init_cache: Dict = {}
# Module-level in-memory DuckDB connection used by the /v1/query handler.
duck_call = duckdb.connect(database=":memory:", read_only=False)


@v1.get("/v1/query")
def invoke_duck_call(query: str) -> List[Any]:
    """Run `query` on the module-level DuckDB connection and return its rows.

    The work is delegated to `preload_exec`, which attempts to preload the
    tables named in the query before executing it.

    NOTE(review): the caller-supplied SQL string is executed as-is.
    """
    rows = preload_exec(duck_call, query)
    return rows


@v1.post("/v1/experimental/llm", response_model=ILLMReponse)
async def llm_interaction(request: UserLLMRequest) -> ILLMReponse:
    """Send the request's prompt text to the chat model and return its reply.

    Args:
        request: Body carrying the prompt text in ``input_schema``.

    Returns:
        An ``ILLMReponse`` built from the ``original_prompt`` and ``response``
        attributes of the model's structured (``ModelReponse``) output.
    """
    # NOTE(review): OpenAILLMInterface declares `inline_prompt` and
    # `role_prompt` without defaults; confirm that constructing it with no
    # arguments is valid in the deployed configuration.
    model_interface = OpenAILLMInterface()
    contractual_model = model_interface.with_structured_output(ModelReponse)

    # The runnable accepts prompt text (or messages), not the Pydantic request
    # object, so pass the field itself. The field is declared AnyStr, so
    # decode bytes before handing it over.
    prompt_text = request.input_schema
    if isinstance(prompt_text, bytes):
        prompt_text = prompt_text.decode("utf-8")

    # Await the async variant so the event loop is not blocked while the
    # model call is in flight.
    llm_response = await contractual_model.ainvoke(prompt_text)
    return ILLMReponse(
        original_prompt=llm_response.original_prompt, response=llm_response.response
    )


# @on_start
# def _on_start() -> DuckCall:
#     return duckdb.connect(database=":memory:", read_only=False)

class OneHotEncodingHandler:
    """Namespace for the experimental one-hot encoding route."""

    @staticmethod
    # `response_model=None` tells FastAPI to skip response-model generation;
    # the previous Ellipsis placeholder is not a usable response type.
    @v1.post("/v1/experimental/onehot", response_model=None)
    def onehot_encode(data: Any) -> None:
        """Placeholder endpoint for one-hot encoding; currently does nothing.

        Args:
            data: Payload to encode. It is not used yet.

        Returns:
            None, until the encoding logic is implemented.
        """
        # TODO: implement one-hot encoding logic here.
        return None


class SubmitTokensHandler:
    """Namespace for the experimental token-submission route."""

    @staticmethod
    @v1.post("/v1/experimental/submit_tokens", response_model=ILLMReponse)
    async def submit_tokens(request: UserLLMRequest) -> ILLMReponse:
        """Send the request's prompt text to the chat model and return its reply.

        Args:
            request: Body carrying the prompt text in ``input_schema``.

        Returns:
            An ``ILLMReponse`` built from the ``original_prompt`` and
            ``response`` attributes of the model's structured output.
        """
        # NOTE(review): OpenAILLMInterface declares `inline_prompt` and
        # `role_prompt` without defaults; confirm that constructing it with
        # no arguments is valid in the deployed configuration.
        model_interface = OpenAILLMInterface()
        contractual_model = model_interface.with_structured_output(ModelReponse)

        # The runnable accepts prompt text (or messages), not the Pydantic
        # request object, so pass the field itself. The field is declared
        # AnyStr, so decode bytes before handing it over.
        prompt_text = request.input_schema
        if isinstance(prompt_text, bytes):
            prompt_text = prompt_text.decode("utf-8")

        # Await the async variant so the event loop is not blocked while the
        # model call is in flight.
        llm_response = await contractual_model.ainvoke(prompt_text)
        return ILLMReponse(
            original_prompt=llm_response.original_prompt, response=llm_response.response
        )
    

# @lru_cache(maxsize=480)
def _cache(tables: List[str]) -> List[str]:
    """Return the table names not seen before and record them as seen.

    Args:
        tables: Table names extracted from a query.

    Returns:
        The names that were absent from ``_init_cache``, in input order and
        without duplicates. Each returned name is stored in ``_init_cache``
        with the current time as its value.
    """
    unseen: List[str] = []
    for name in tables:
        if name in _init_cache:
            continue
        # Record the name immediately so a repeat within `tables` is
        # returned only once.
        _init_cache[name] = datetime.now()
        unseen.append(name)
    return unseen


def find_tables(query: str) -> List[str]:
    """Collect the top-level identifiers of the first statement in `query`.

    Only direct child tokens of the first parsed statement that sqlparse
    classifies as ``Identifier`` are returned, using their raw text.

    NOTE(review): no check distinguishes table names from other identifiers
    (e.g. a single selected column), and identifier lists are not expanded.

    Args:
        query: SQL text to inspect.

    Returns:
        The identifier texts in order of appearance; an empty list when the
        query contains no statement.
    """
    statements = sqlparse.parse(query)
    if not statements:
        # Nothing was parsed (e.g. an empty string); indexing [0] would
        # raise IndexError.
        return []
    return [
        token.value
        for token in statements[0].tokens
        if isinstance(token, sqlparse.sql.Identifier)
    ]


def preload_tables(connection: DuckCall, tables: List[str]) -> List[bool]:
    """Try to create one table per name from a parquet source of the same name.

    For each entry a ``create table <name> as select * from
    read_parquet('<name>')`` statement is executed on `connection`.

    Args:
        connection: Connection on which the statements are executed.
        tables: Names used both as the table name and as the parquet path.

    Returns:
        One flag per input name, in order: ``False`` when the statement ran
        successfully, ``True`` when it raised.
    """
    cached: List[bool] = []
    for t in tables:
        # Double embedded single quotes so the name cannot terminate the SQL
        # string literal early.
        path_literal = t.replace("'", "''")
        q = f"create table {t} as select * from read_parquet('{path_literal}');"
        try:
            connection.execute(q).fetchall()
        except Exception:
            # Best effort: a failed preload is reported through the flag
            # rather than aborting the remaining tables.
            cached.append(True)
        else:
            cached.append(False)
    return cached


def preload_exec(connection: DuckCall, query: str) -> List[Tuple]:
    """Preload tables referenced by `query`, then execute it and return all rows.

    The identifiers found by `find_tables` are filtered through `_cache`, and
    the remaining names are passed to `preload_tables`; its per-table result
    flags are ignored. The query is then run on `connection`.
    """
    new_tables = _cache(find_tables(query))
    preload_tables(connection, new_tables)
    result = connection.execute(query)
    return result.fetchall()


if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve the app on the loopback interface only.
    server_options = {"host": "127.0.0.1", "port": 8000, "log_level": "info"}
    uvicorn.run(v1, **server_options)
