# Pipeline of Graph Construction


## Initialize Environment


In [None]:
import logging
import os
import json
import nest_asyncio
from dotenv import load_dotenv
from ogmyrag.my_logging import configure_logger
from ogmyrag.storage import AsyncMongoDBStorage
from ogmyrag.util import update_is_parsed_status_of_reports, fetch_reports_along_with_constraints, get_company_reports
from ogmyrag.base import MongoStorageConfig, Neo4jStorageConfig, PineconeStorageConfig
from ogmyrag.llm import OpenAIAsyncClient
from ogmyrag.graph_construction import GraphConstructionSystem
from motor.motor_asyncio import AsyncIOMotorClient

# Patch the event loop so it can be re-entered from inside Jupyter cells.
nest_asyncio.apply()

# Main pipeline logger: DEBUG level with its own log file. `to_console` is not
# passed here, so configure_logger's default applies.
graph_construction_logger = configure_logger(
    log_file="logs/graph_construction.log",
    log_level=logging.DEBUG,
    name="graph_construction",
)

# Per-service loggers: DEBUG level, one file per service, console output off.
# The generator is consumed left to right, so the loggers are created in the
# order openai -> mongodb -> pinecone -> neo4j.
openai_logger, mongo_logger, pinecone_logger, neo4j_logger = (
    configure_logger(
        name=service,
        log_level=logging.DEBUG,
        log_file=f"logs/{service}.log",
        to_console=False,
    )
    for service in ("openai", "mongodb", "pinecone", "neo4j")
)

# Load environment variables (override=True is forwarded to python-dotenv).
load_dotenv(override=True)


def _log_if_missing(values, message):
    """Log `message` as an error when any entry of `values` is falsy.

    A setting counts as missing when it is unset or an empty string. Nothing
    is raised: the notebook keeps running and only the log records the gap.
    """
    if not all(values):
        graph_construction_logger.error(message)


# MongoDB connection strings: main store and reports store.
mongo_db_uri = os.getenv("MONGO_DB_URI", "")
mongo_db_uri_reports = os.getenv("MONGO_DB_URI_REPORTS", "")

openai_api_key = os.getenv("OPENAI_API_KEY", "")

# Pinecone settings for the entities index.
# NOTE(review): the *_DIMENSIONS values stay as strings (or None when unset);
# confirm the Pinecone storage layer converts them before creating an index.
pinecone_entities_api_key = os.getenv("PINECONE_ENTITIES_API_KEY", "")
pinecone_entities_environment = os.getenv("PINECONE_ENTITIES_ENVIRONMENT", "")
pinecone_entities_cloud = os.getenv("PINECONE_ENTITIES_CLOUD", "")
pinecone_entities_metric = os.getenv("PINECONE_ENTITIES_METRIC", "")
pinecone_entities_dimensions = os.getenv("PINECONE_ENTITIES_DIMENSIONS")

# Pinecone settings for the cache index.
pinecone_cache_api_key = os.getenv("PINECONE_CACHE_API_KEY", "")
pinecone_cache_environment = os.getenv("PINECONE_CACHE_ENVIRONMENT", "")
pinecone_cache_cloud = os.getenv("PINECONE_CACHE_CLOUD", "")
pinecone_cache_metric = os.getenv("PINECONE_CACHE_METRIC", "")
pinecone_cache_dimensions = os.getenv("PINECONE_CACHE_DIMENSIONS")

# Neo4j credentials.
neo4j_uri = os.getenv("NEO4J_URI", "")
neo4j_username = os.getenv("NEO4J_USERNAME", "")
neo4j_password = os.getenv("NEO4J_PASSWORD", "")

# Validate each group of settings; every incomplete group logs one error.
_log_if_missing(
    [mongo_db_uri],
    "Please set the MONGO_DB_URI environment variable.",
)
_log_if_missing(
    [mongo_db_uri_reports],
    "Please set the MONGO_DB_URI_REPORTS environment variable.",
)
_log_if_missing(
    [openai_api_key],
    "Please set the OPENAI_API_KEY environment variable.",
)
_log_if_missing(
    [
        pinecone_entities_api_key,
        pinecone_entities_environment,
        pinecone_entities_cloud,
        pinecone_entities_metric,
        pinecone_entities_dimensions,
    ],
    "Please set the PINECONE_ENTITIES_API_KEY, PINECONE_ENTITIES_ENVIRONMENT, PINECONE_ENTITIES_CLOUD, PINECONE_ENTITIES_METRIC, and PINECONE_ENTITIES_DIMENSIONS environment variables.",
)
_log_if_missing(
    [
        pinecone_cache_api_key,
        pinecone_cache_environment,
        pinecone_cache_cloud,
        pinecone_cache_metric,
        pinecone_cache_dimensions,
    ],
    "Please set the PINECONE_CACHE_API_KEY, PINECONE_CACHE_ENVIRONMENT, PINECONE_CACHE_CLOUD, PINECONE_CACHE_METRIC, and PINECONE_CACHE_DIMENSIONS environment variables.",
)
_log_if_missing(
    [neo4j_uri, neo4j_username, neo4j_password],
    "Please set the NEO4J_URI, NEO4J_USERNAME, and NEO4J_PASSWORD environment variables.",
)

## Set Up Graph Construction Pipeline


### Initialize Variables for Database Connection


In [None]:
def _mongo_config(database_name, collection_name=None) -> MongoStorageConfig:
    """Build a Mongo storage config; `collection_name` is omitted when not given."""
    config = {"database_name": database_name}
    if collection_name is not None:
        config["collection_name"] = collection_name
    return config


def _pinecone_config(
    index_name, api_key, environment, cloud, metric, dimensions
) -> PineconeStorageConfig:
    """Build a Pinecone storage config that also carries the OpenAI API key."""
    return {
        "index_name": index_name,
        "pinecone_api_key": api_key,
        "pinecone_environment": environment,
        "pinecone_cloud": cloud,
        "pinecone_metric": metric,
        "pinecone_dimensions": dimensions,
        "openai_api_key": openai_api_key,
    }


# Collections in the "ogmyrag" database.
ontology_config: MongoStorageConfig = _mongo_config("ogmyrag", "ontology")
entity_config: MongoStorageConfig = _mongo_config("ogmyrag", "entities")
relationship_config: MongoStorageConfig = _mongo_config("ogmyrag", "relationships")

# Collections in the "FYP" database.
disclosure_config: MongoStorageConfig = _mongo_config("FYP", "company_disclosures")
constraints_config: MongoStorageConfig = _mongo_config("FYP", "constraints")

# Cache configs name a database only; no collection is specified.
entities_cache_config: MongoStorageConfig = _mongo_config("ogmyrag_entities_cache")
relationships_cache_config: MongoStorageConfig = _mongo_config(
    "ogmyrag_relationships_cache"
)

# Collections holding pending deduplication tasks.
entities_deduplication_pending_tasks_config: MongoStorageConfig = _mongo_config(
    "ogmyrag", "entities_deduplication_pending_tasks"
)
relationships_deduplication_pending_tasks_config: MongoStorageConfig = _mongo_config(
    "ogmyrag", "relationships_deduplication_pending_tasks"
)

# Neo4j connection settings taken from the environment.
graphdb_config: Neo4jStorageConfig = {
    "uri": neo4j_uri,
    "user": neo4j_username,
    "password": neo4j_password,
}

# Pinecone indexes: the main entity index and the entity cache index.
entity_vector_config: PineconeStorageConfig = _pinecone_config(
    "ogmyrag",
    pinecone_entities_api_key,
    pinecone_entities_environment,
    pinecone_entities_cloud,
    pinecone_entities_metric,
    pinecone_entities_dimensions,
)
entity_cache_vector_config: PineconeStorageConfig = _pinecone_config(
    "ogmyrag-entities-cache",
    pinecone_cache_api_key,
    pinecone_cache_environment,
    pinecone_cache_cloud,
    pinecone_cache_metric,
    pinecone_cache_dimensions,
)

# Two Motor clients (reports first, then main), each with a 5000 ms
# server-selection timeout.
async_mongo_client_reports, async_mongo_client = (
    AsyncIOMotorClient(uri, serverSelectionTimeoutMS=5000)
    for uri in (mongo_db_uri_reports, mongo_db_uri)
)

# Storage wrapper around the reports client, used by the debugging helpers.
async_mongo_storage_reports = AsyncMongoDBStorage(client=async_mongo_client_reports)

### Initialize Graph Construction System

In [3]:
def _agent_request_config(model):
    """Return a fresh request-settings dict for one agent.

    Every agent uses the same settings; only the model name differs.
    """
    return {
        "model": model,
        "text": {"format": {"type": "text"}},
        "reasoning": {"effort": "medium"},
        "max_output_tokens": 100000,
        "stream": False,
    }


# Build the graph construction system from the clients and configs defined in
# the previous cells. Any failure is logged rather than raised, so later cells
# will hit a NameError on `graph_system` if construction failed.
# NOTE(review): relationships_deduplication_pending_tasks_config is defined
# earlier but not passed here - confirm that is intentional.
try:
    graph_system = GraphConstructionSystem(
        async_mongo_client=async_mongo_client,
        async_mongo_client_reports=async_mongo_client_reports,
        ontology_config=ontology_config,
        disclosure_config=disclosure_config,
        constraints_config=constraints_config,
        entity_config=entity_config,
        relationship_config=relationship_config,
        entity_vector_config=entity_vector_config,
        graphdb_config=graphdb_config,
        entity_cache_config=entities_cache_config,
        relationship_cache_config=relationships_cache_config,
        entity_cache_vector_config=entity_cache_vector_config,
        entities_deduplication_pending_tasks_config=entities_deduplication_pending_tasks_config,
        llm_client=OpenAIAsyncClient(api_key=openai_api_key),
        agent_configs={
            "EntityRelationshipExtractionAgent": _agent_request_config("o4-mini"),
            "EntityDeduplicationAgent": _agent_request_config("gpt-5-mini"),
            "RelationshipDeduplicationAgent": _agent_request_config("gpt-5-mini"),
        },
    )
except Exception as exc:
    graph_construction_logger.error(
        f"GraphConstructionSystem\nError while creating graph construction system: {exc}"
    )

In [None]:
try:
    node_count = await graph_system.get_entity_count({"status": "UPSERTED_INTO_GRAPH_DB"})
    relationship_count = await graph_system.get_relationship_count(
        {"status": "UPSERTED_INTO_GRAPH_DB"}
    )
    graph_construction_logger.info(
        f"GraphConstructionSystem\nEntity count based on given filter: {node_count}\nRelationship count based on given filter: {relationship_count}"
    )
except Exception as e:
    graph_construction_logger.error(
        f"GraphConstructionSystem\nError while creating graph construction system: {e}"
    )

### Extract Entities and Relationships from the Reports


In [None]:
try:
    await graph_system.extract_entities_relationships_from_unparsed_documents(
        from_company="VETECE_HOLDINGS_BERHAD",
        document_type="PROSPECTUS",
        published_at="08 Aug 2024",
        num_of_relationships_per_onto=6,
        exclude_documents=[],
    )
except Exception as e:
    graph_construction_logger.error(
        f"GraphConstructionSystem\nError while processing entities and relationships from unparsed documents:\n {e}"
    )

### Deduplicate Extracted Entities and Relationships

In [None]:
try:
    await graph_system.deduplicate_entities(
        from_company="VETECE_HOLDINGS_BERHAD",
        num_of_entities_per_batch=8,
        max_cache_size=1000,
        similarity_threshold=0.6,
        num_of_relationships_to_fetch=10,
        max_wait_time_per_task=5
    )
except Exception as e:
    graph_construction_logger.error(
        f"GraphConstructionSystem\nError while deduplicating entities:\n {e}"
    )
    

In [None]:
try:
    await graph_system.deduplicate_relationships(
        from_company="VETECE_HOLDINGS_BERHAD",
        num_of_relationships_per_batch=8,
        max_cache_size=1000,
        max_wait_time_per_task=5
    )
except Exception as e:
    graph_construction_logger.error(
        f"GraphConstructionSystem\nError while deduplicating relationships:\n {e}"
    )

### Upsert the Deduplicated Entities into Pinecone

In [None]:
try:
   await graph_system.upsert_entities_into_pinecone(
      from_company="VETECE_HOLDINGS_BERHAD", batch_size=100
   )
except Exception as e:
    graph_construction_logger.error(f"GraphConstructionSystem\nError:\n {e}")

### Upsert the Deduplicated Entities and Relationships into Neo4j

In [None]:
try:
    await graph_system.upsert_entities_and_relationships_into_neo4j(
        from_company="VETECE_HOLDINGS_BERHAD", batch_size=100
    )
except Exception as e:
    graph_construction_logger.error(f"GraphConstructionSystem\nError:\n {e}")

In [None]:
# try:
#     await graph_system.revert_deduplication_status(
#         from_company="ICT_ZONE_ASIA_BERHAD",
#         from_status="UPSERTED_INTO_GRAPH_DB",
#         to_status="TO_BE_UPSERTED_INTO_GRAPH_DB",
#         collection_to_revert="ENTITIES"
#     )
# except Exception as e:
#     graph_construction_logger.error(f"GraphConstructionSystem\nError:\n {e}")

In [None]:
# try:
#     await graph_system.resolve_entities_deduplication_pending_tasks(
#         from_company="AUTOCOUNT_DOTCOM_BERHAD",
#     )
# except Exception as e:
#     graph_construction_logger.error(f"GraphConstructionSystem\nError:\n {e}")

In [None]:
# result = await graph_system.get_formatted_similar_entities_from_pinecone(
#     query_texts=["what is autocount"],
#     top_k=20,
#     query_filter={"type": {"$eq": "LegalEntity"}},
#     score_threshold=0.0,
# )

# graph_construction_logger.info(result)

In [22]:
# For debugging purpose
updated_result = await update_is_parsed_status_of_reports(
    async_mongo_storage_reports=async_mongo_storage_reports,
    company_disclosures_config=disclosure_config,
    file_names=[
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_1",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_2",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_3",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_4",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_5",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_6",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_7",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_8",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_9",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_10",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_11",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_12",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_13",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_14",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_15",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_16",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_17",
        "CABNET_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_18",
    ],
    updated_is_parsed=False,
)

graph_construction_logger.info(
    f"Updated the 'is_parsed' status of {updated_result.modified_count} documents"
)

2025-09-13 16:59:58,682 - graph_construction - INFO - Updated the 'is_parsed' status of 18 documents


In [48]:
# For debugging purpose
# Fetch the reports to be processed
files_to_process = await get_company_reports(
    async_mongo_storage_reports=async_mongo_storage_reports,
    company_disclosures_config=disclosure_config,
    from_company="EDELTEQ_HOLDINGS_BERHAD",
    type="ANNUAL_REPORT",
    is_parsed=False,
)

graph_construction_logger.info(
    f"Files to be processed:\n{json.dumps(files_to_process,indent=2)}"
)

2025-09-13 18:25:20,861 - graph_construction - INFO - Files to be processed:
[
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_1",
    "is_parsed": false,
    "published_at": "29 Apr 2025"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_5",
    "is_parsed": false,
    "published_at": "29 Apr 2025"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_2",
    "is_parsed": false,
    "published_at": "29 Apr 2025"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_4",
    "is_parsed": false,
    "published_at": "29 Apr 2025"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2024_SECTION_3",
    "is_parsed": false,
    "published_at": "29 Apr 2025"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2023_SECTION_2",
    "is_parsed": false,
    "published_at": "29 Apr 2024"
  },
  {
    "file_name": "EDELTEQ_HOLDINGS_BERHAD_ANNUAL_2023_SECTION_4",
    "is_parsed": false,
    "published_at": "29 Apr 2