In [None]:
from docling.document_converter import DocumentConverter
import os
import asyncio
import inspect
import logging
import logging.config
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
import numpy as np
from lightrag.kg.shared_storage import initialize_pipeline_status


In [None]:
#Extracting Information from PDF Files and Storing it in txt Files
path = './examples/Apple/'

file_list = os.listdir(path)

converter = DocumentConverter()

i = 1

for file in file_list:
    file_path = path + file
    response = converter.convert(file_path)
    markdown_content = response.document.export_to_text()
    output_file = file_path[:-3] + 'txt'
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(markdown_content)

In [None]:
#Initialize log
def configure_logging():
    """Configure logging for the application"""

    # Reset any existing handlers to ensure clean configuration
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "lightrag"]:
        logger_instance = logging.getLogger(logger_name)
        logger_instance.handlers = []
        logger_instance.filters = []

    # Get log directory path from environment variable or use current directory
    log_dir = os.getenv("LOG_DIR", os.getcwd())
    log_file_path = os.path.abspath(
        os.path.join(log_dir, "lightrag_compatible_demo.log")
    )

    print(f"\nLightRAG compatible demo log file: {log_file_path}\n")
    os.makedirs(os.path.dirname(log_dir), exist_ok=True)

    # Get log file max size and backup count from environment variables
    log_max_bytes = int(os.getenv("LOG_MAX_BYTES", 10485760))  # Default 10MB
    log_backup_count = int(os.getenv("LOG_BACKUP_COUNT", 5))  # Default 5 backups

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(levelname)s: %(message)s",
                },
                "detailed": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "console": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stderr",
                },
                "file": {
                    "formatter": "detailed",
                    "class": "logging.handlers.RotatingFileHandler",
                    "filename": log_file_path,
                    "maxBytes": log_max_bytes,
                    "backupCount": log_backup_count,
                    "encoding": "utf-8",
                },
            },
            "loggers": {
                "lightrag": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                },
            },
        }
    )

    # Set the logger level to INFO
    logger.setLevel(logging.INFO)
    # Enable verbose debug if needed
    set_verbose_debug(os.getenv("VERBOSE_DEBUG", "false").lower() == "true")

In [None]:
#make work dir
WORKING_DIR = './dickens'

if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)

In [None]:
#define llm_function and embedding function
async def llm_model_func(
    prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
) -> str:
    return await openai_complete_if_cache(
        "gpt-4o",
        prompt,
        system_prompt=system_prompt,
        history_messages=history_messages,
        api_key="sk-6tzcdEtUHf3ip3DH8dFa565e8b0f4b52BeEb8578B104D336",
        base_url="http://oneapi.waidev.cc/v1",
        **kwargs,
    )


async def embedding_func(texts: list[str]) -> np.ndarray:
    return await openai_embed(
        texts=texts,
        model="text-embedding-ada-002",
        base_url="http://oneapi.waidev.cc/v1",
        api_key="sk-sTMmF6HjOoQPwBUUCa3aA6C5C7844d189eAb5bAa5bA3CcFc",
    )


async def get_embedding_dim():
    test_text = ["This is a test sentence."]
    embedding = await embedding_func(test_text)
    embedding_dim = embedding.shape[1]
    return embedding_dim

In [None]:
#rag initialize
async def print_stream(stream):
    async for chunk in stream:
        if chunk:
            print(chunk, end="", flush=True)


async def initialize_rag():
    embedding_dimension = await get_embedding_dim()
    print(f"Detected embedding dimension: {embedding_dimension}")

    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=llm_model_func,
        embedding_func=EmbeddingFunc(
            embedding_dim=embedding_dimension,
            max_token_size=8192,
            func=embedding_func,
        ),
    )

    await rag.initialize_storages()
    await initialize_pipeline_status()

    return rag

In [None]:
async def main():
    try:
        # Initialize RAG instance
        rag = await initialize_rag()

        txt_list = []
        for file in os.listdir("E:/LightRAG-main/examples/Apple"):
            if file[-3:] == "txt":
                file = "E:/LightRAG-main/examples/Apple/" + file
                txt = open(file, "r", encoding="utf-8").read()
                txt_list.append(txt)

        #with open("./book1.txt", "r", encoding="utf-8") as f:
            #await rag.ainsert(f.read())
        await rag.ainsert(txt_list)


        # Perform naive search
        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            " Dissect Apple’s gross margin by product category and explain how shifts in product mix impact margin volatility. What trend do you forecast for the next 3 years?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            " Dissect Apple’s gross margin by product category and explain how shifts in product mix impact margin volatility. What trend do you forecast for the next 3 years?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: local")
        print("=====================")
        resp = await rag.aquery(
            " Dissect Apple’s gross margin by product category and explain how shifts in product mix impact margin volatility. What trend do you forecast for the next 3 years?",
            param=QueryParam(mode="local", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: global")
        print("=====================")
        resp = await rag.aquery(
            " Dissect Apple’s gross margin by product category and explain how shifts in product mix impact margin volatility. What trend do you forecast for the next 3 years?",
            param=QueryParam(mode="global", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform local search
        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "How do Apple's capital return programs (buybacks and dividends) influence its valuation multiples relative to peers?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform global search
        print("\n=====================")
        print("Query mode: local")
        print("=====================")
        resp = await rag.aquery(
            "What is the underlying reason behind Apple's relatively low R&D spending as a percentage of revenue compared to other Big Tech firms, and what does that imply about its capital allocation strategy?",
            param=QueryParam(mode="local", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform global search
        print("\n=====================")
        print("Query mode: global")
        print("=====================")
        resp = await rag.aquery(
            "What is the underlying reason behind Apple's relatively low R&D spending as a percentage of revenue compared to other Big Tech firms, and what does that imply about its capital allocation strategy?",
            param=QueryParam(mode="global", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform global search
        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            "What is the underlying reason behind Apple's relatively low R&D spending as a percentage of revenue compared to other Big Tech firms, and what does that imply about its capital allocation strategy?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform global search
        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "What is the underlying reason behind Apple's relatively low R&D spending as a percentage of revenue compared to other Big Tech firms, and what does that imply about its capital allocation strategy?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform hybrid search
        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "How do you value Apple’s Services segment independently of its Hardware business? What multiple would you assign and why?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: local")
        print("=====================")
        resp = await rag.aquery(
            "How do you value Apple’s Services segment independently of its Hardware business? What multiple would you assign and why?",
            param=QueryParam(mode="local", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: global")
        print("=====================")
        resp = await rag.aquery(
            "How do you value Apple’s Services segment independently of its Hardware business? What multiple would you assign and why?",
            param=QueryParam(mode="global", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            "How do you value Apple’s Services segment independently of its Hardware business? What multiple would you assign and why?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "Can you explain how Apple’s deferred revenue from Services and Devices (e.g., AppleCare, subscriptions) distorts its GAAP financials and how to adjust for that?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "Can you explain how Apple’s deferred revenue from Services and Devices (e.g., AppleCare, subscriptions) distorts its GAAP financials and how to adjust for that?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: global")
        print("=====================")
        resp = await rag.aquery(
            "Can you explain how Apple’s deferred revenue from Services and Devices (e.g., AppleCare, subscriptions) distorts its GAAP financials and how to adjust for that?",
            param=QueryParam(mode="global", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            "Can you explain how Apple’s deferred revenue from Services and Devices (e.g., AppleCare, subscriptions) distorts its GAAP financials and how to adjust for that?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        print("\n=====================")
        print("Query mode: local")
        print("=====================")
        resp = await rag.aquery(
            "Can you explain how Apple’s deferred revenue from Services and Devices (e.g., AppleCare, subscriptions) distorts its GAAP financials and how to adjust for that?",
            param=QueryParam(mode="local", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if rag:
            await rag.finalize_storages()

In [None]:
if __name__ == "__main__":
    # Configure logging before running the main function
    configure_logging()
    asyncio.run(main())
    print("\nDone!")