In [None]:
import logging
import os
import json
from pathlib import Path

from llm.factory import LLMInterface
from llm.embedding import get_text_embedding
from knowledge_graph.knowledge import KnowledgeBuilder
from knowledge_graph.graph_builder import KnowledgeGraphBuilder

# Configure logging for the notebook: INFO level, each record prefixed with
# its timestamp, severity, and the file/line that emitted it.
LOG_FORMAT = "[%(asctime)s] %(levelname)s - %(filename)s:%(lineno)d: %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

# Logger used by the cells below.
logger = logging.getLogger(__name__)


In [2]:
# A single LLM client is shared by the knowledge extractor and the graph builder.
llm_client = LLMInterface("openai_like", "qwen3:32b-fp16")
kb_builder = KnowledgeBuilder(llm_client, get_text_embedding)
graph_builder = KnowledgeGraphBuilder(llm_client, get_text_embedding)

# Topic under which all memory documents are grouped.
topic_name = "Li Ming Personal Memory Timeline"

# Memory files and their metadata: one entry per year. The entries differ only
# in the year, so they are generated from a single template.
memory_files = [
    {
        "path": f"docs/memory/{year}.md",
        "doc_link": f"memory://liming/wechat_moments/{year}",
        "topic_name": topic_name,
        "year": year,
        "author": "Li Ming",
        "content_type": "WeChat Moments",
        "platform": "WeChat",
        "language": "Chinese",
    }
    for year in ("2024", "2025")
]

In [None]:
# Step 1: Extract knowledge from every configured memory file.
logger.info("=== Step 1: Extracting knowledge from memory files ===")

# Successfully extracted documents, keyed by source_id. The graph-building
# cells read this mapping.
topic_docs = {}
for file_info in memory_files:
    file_path = file_info["path"]

    # A missing file is reported and skipped so the remaining files still run.
    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        continue

    try:
        # Custom metadata with temporal and context information
        custom_metadata = {
            "year": file_info["year"],
            "author": file_info["author"],
            "content_type": file_info["content_type"],
            "platform": file_info["platform"],
            "language": file_info["language"],
            "source_type": "personal_memory",
            "data_category": "social_media_posts",
        }

        # Attributes passed to knowledge extraction: link + topic + metadata
        attributes = {
            "doc_link": file_info["doc_link"],
            "topic_name": file_info["topic_name"],
            **custom_metadata,
        }

        logger.info(f"Processing file: {file_path}")
        logger.info(
            f"Using attributes: {json.dumps(attributes, indent=2, ensure_ascii=False)}"
        )

        res = kb_builder.extract_knowledge(file_path, attributes)

        if res["status"] == "success":
            topic_docs[res["source_id"]] = {
                "source_id": res["source_id"],
                "source_name": res["source_name"],
                "source_content": res["source_content"],
                "source_link": res["source_link"],
                "source_attributes": res["source_attributes"],
            }
            logger.info(
                f"Successfully processed: {file_path} (source_id: {res['source_id']})"
            )
        else:
            # .get(): a failure result without an "error" key must not raise a
            # KeyError that hides the actual failure.
            logger.error(
                f"Failed to process {file_path}: {res.get('error', 'unknown error')}"
            )

    except Exception as e:
        # Per-file best effort: log with traceback and move on to the next file.
        logger.error(f"Error processing {file_path}: {e}", exc_info=True)

if not topic_docs:
    # Stop here: the following cells would otherwise build a graph from an
    # empty document list, right after a misleading "Successfully processed 0"
    # message.
    logger.error("No documents were successfully processed. Exiting.")
    raise RuntimeError(
        "No documents were successfully processed; cannot build the knowledge graph."
    )

logger.info(f"Successfully processed {len(topic_docs)} memory documents")
topic_docs

In [None]:
# Step 2: Build knowledge graph
logger.info("\n=== Step 2: Building memory knowledge graph ===")

# (label, result key) pairs for the summary, in the order they are reported.
summary_fields = (
    ("Topic", "topic_name"),
    ("Documents processed", "documents_processed"),
    ("Documents failed", "documents_failed"),
    ("Cognitive maps generated", "cognitive_maps_generated"),
    ("Triplets extracted", "triplets_extracted"),
    ("Total entities created", "entities_created"),
    ("Total relationships created", "relationships_created"),
)

try:
    documents = list(topic_docs.values())
    result = graph_builder.build_knowledge_graph(topic_name, documents)

    logger.info("\n=== Memory Knowledge Graph Construction Results ===")
    # Each value is looked up right before it is logged, so a missing key
    # surfaces only after the preceding summary lines have been emitted.
    for label, key in summary_fields:
        logger.info(f"{label}: {result[key]}")

    # Global blueprint information (optional in the result, hence the defaults)
    blueprint_info = result.get("global_blueprint", {})
    logger.info("\nGlobal Blueprint:")
    instructions = blueprint_info.get("processing_instructions", "")
    logger.info(f"  - Processing instructions: {instructions}")
    items = blueprint_info.get("processing_items", {})
    logger.info(f"  - Processing items: {items}")

    logger.info("\n🎉 Memory knowledge graph construction completed successfully!")

except Exception as e:
    # Log with traceback, then re-raise so the notebook run stops at this cell.
    logger.error(f"Failed to build knowledge graph: {e}", exc_info=True)
    raise

In [None]:
# Step 3: Enhance the knowledge graph built in the previous step.
# The comment and log banner previously repeated "Step 2: Building ...",
# although this cell calls enhance_knowledge_graph.
logger.info("\n=== Step 3: Enhancing memory knowledge graph ===")

try:
    # NOTE: rebinds `result`, replacing the value assigned by the build step.
    result = graph_builder.enhance_knowledge_graph(
        topic_name,
        list(topic_docs.values()),
    )

except Exception as e:
    # Log with traceback, then re-raise so the notebook run stops at this cell.
    logger.error(f"Failed to enhance knowledge graph: {e}", exc_info=True)
    raise

result