In [0]:
# !pip install "git+https://github.com/iAli61/lightrag.git@b80a8bb93618801d66a061e2e143734a82a48a90#egg=lightrag-hku[api]"
# !pip install -r ../../requirement.txt


In [0]:
from dotenv import load_dotenv
# Load settings from a local .env file (python-dotenv's default lookup) so that
# the os.getenv(...) calls in later cells can find the Flamingo credentials.
load_dotenv()

In [0]:

import asyncio
import os

import numpy as np
from sentence_transformers import SentenceTransformer

from flamingo_client import AsyncFlamingoLLMClient
from lightrag import LightRAG, QueryParam
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import EmbeddingFunc
from lightrag.utils import logger, safe_unicode_decode, set_verbose_debug



In [0]:
# Confirm that the credentials read by the Flamingo client are present, without
# echoing any of their values into the saved notebook output.
REQUIRED_ENV_VARS = (
    "SUBSCRIPTION_ID",
    "BASE_URL",
    "CLIENT_ID",
    "CLIENT_SECRET",
    "SUBSCRIPTION_KEY",
    "TENANT_ID",
)
missing_env_vars = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
missing_env_vars  # an empty list means every required variable is set

In [0]:
# Directory handed to LightRAG as its working_dir.
WORKING_DIR = "./markdown_files"

# exist_ok=True makes the cell safe to re-run and avoids the check-then-create
# race; makedirs also creates missing parent directories if the path is nested.
os.makedirs(WORKING_DIR, exist_ok=True)


In [0]:
class InvalidResponseError(Exception):
    """Raised when the Flamingo API reply contains no usable text."""


async def flamingo_llm_model_func(
    prompt, system_prompt=None, history_messages=None, **kwargs
) -> str:
    """Send one chat-completion request to the Flamingo ``llama3`` model.

    The request is assembled as: an optional system message, then any earlier
    turns from ``history_messages``, then ``prompt`` as the final user message.

    Args:
        prompt: Text of the user message.
        system_prompt: Optional system message; omitted when falsy.
        history_messages: Optional list of earlier chat messages, inserted
            between the system and user messages. Assumed to already be
            ``{"role": ..., "content": ...}`` dicts -- confirm against callers.
        **kwargs: Accepted so callers may pass extra options; not forwarded.

    Returns:
        The text content of the first choice in the response.

    Raises:
        InvalidResponseError: If the returned content is empty or whitespace.
    """
    # A new client is built on every call from credentials in the environment.
    client = AsyncFlamingoLLMClient(
        subscription_id=os.getenv("SUBSCRIPTION_ID"),
        base_url=os.getenv("BASE_URL"),
        client_id=os.getenv("CLIENT_ID"),
        client_secret=os.getenv("CLIENT_SECRET"),
        subscription_key=os.getenv("SUBSCRIPTION_KEY"),
        tenant=os.getenv("TENANT_ID"),
    )

    # Forward the system prompt and history instead of dropping them; otherwise
    # the instructions/context supplied by the caller never reach the model.
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages or [])
    messages.append({"role": "user", "content": prompt})

    response = await client.chat.completions.create(
        model="llama3",
        messages=messages,
    )

    content = response.choices[0].message.content

    if not content or not content.strip():
        logger.error("Received empty content from flamingo API")
        raise InvalidResponseError("Received empty content from flamingo API")

    # Replies containing a literal backslash-u are passed through LightRAG's
    # helper (presumably to turn \uXXXX escapes into real characters).
    if r"\u" in content:
        content = safe_unicode_decode(content.encode("utf-8"))
    return content



EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
_embedding_model = None  # loaded lazily by _get_embedding_model()


def _get_embedding_model():
    """Return the shared SentenceTransformer, loading it on first use only."""
    global _embedding_model
    if _embedding_model is None:
        _embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
    return _embedding_model


async def embedding_func(texts: list[str]) -> np.ndarray:
    """Embed ``texts`` with the ``all-MiniLM-L6-v2`` SentenceTransformer model.

    The model is loaded once and reused; previously it was re-instantiated on
    every call, repeating the model load for each batch of texts.

    Args:
        texts: Strings to embed.

    Returns:
        The result of ``model.encode(texts, convert_to_numpy=True)``.
    """
    # encode() is a plain synchronous call, so it runs to completion before
    # this coroutine yields control back to the event loop.
    return _get_embedding_model().encode(texts, convert_to_numpy=True)


async def get_embedding_dim():
    """Return the embedding width by embedding one probe sentence.

    The value is the size of the second axis of the array that
    ``embedding_func`` returns for a single-element input list.
    """
    probe = await embedding_func(["This is a test sentence."])
    return probe.shape[1]


# Smoke test for the two model helpers
async def test_funcs():
    """Call the LLM helper and the embedding helper once each and print both results."""
    question = "How are you?"

    llm_reply = await flamingo_llm_model_func(question)
    print("flamingo_llm_model_func: ", llm_reply)

    vectors = await embedding_func([question])
    print("flamingo_embedding_func: ", vectors)


async def initialize_flamingo_rag():
    """Build a LightRAG instance wired to the Flamingo LLM and local embeddings.

    Steps, in order: probe the embedding width, construct ``LightRAG`` on
    ``WORKING_DIR``, then await ``initialize_storages()`` and
    ``initialize_pipeline_status()`` before handing the instance back.

    Returns:
        The initialized ``LightRAG`` instance.
    """
    dim = await get_embedding_dim()
    print(f"Detected embedding dimension: {dim}")

    embedder = EmbeddingFunc(
        embedding_dim=dim,
        max_token_size=8192,
        func=embedding_func,
    )
    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=flamingo_llm_model_func,
        embedding_func=embedder,
    )

    # Storage setup must finish before the pipeline status is initialized.
    await rag.initialize_storages()
    await initialize_pipeline_status()
    return rag


In [0]:
# Smoke-test the LLM and embedding helpers before building the RAG index.
# Top-level `await` is used directly, relying on the notebook's running event loop.
await test_funcs()

In [0]:
# Setup logging
logger.setLevel("DEBUG")
set_verbose_debug(True)

# Initialize RAG instance
rag = await initialize_flamingo_rag()

with open("./markdown_files/20241119Placing Slip.md", "r", encoding="utf-8") as f:
    await rag.ainsert(f.read())

# Perform naive search
print(
    await rag.aquery(
        "What are the top themes in this story?", param=QueryParam(mode="naive")
    )
)

# Perform local search
print(
    await rag.aquery(
        "What are the top themes in this story?", param=QueryParam(mode="local")
    )
)

# Perform global search
print(
    await rag.aquery(
        "What are the top themes in this story?",
        param=QueryParam(mode="global"),
    )
)

# Perform hybrid search
print(
    await rag.aquery(
        "What are the top themes in this story?",
        param=QueryParam(mode="hybrid"),
    )
)