In [168]:
import os
import time
import neo4j
import dotenv
import pprint
import typing

from neo4j_graphrag import types
from neo4j_graphrag import retrievers
from langchain_core import documents
from langchain_huggingface import embeddings

dotenv.load_dotenv(".env")

True

In [None]:
# Embedding model instance handed as `embedder` to every retriever built later
# in this notebook (both article retrievers and the definition retriever).
embedder = embeddings.HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

In [3]:

def dict_to_yaml_str(input_dict: typing.Dict, indent: int = 0) -> str:
    """
    Render a dictionary as a YAML-like block of text using only built-ins.

    Nested dictionaries are rendered recursively, one level deeper (two spaces
    per level). A list value is rendered as ``- item`` lines at the same
    indentation as its key; each item is interpolated as-is, so containers
    nested inside a list are not expanded. Any other value is written as
    ``key: value``.

    Parameters:
    - input_dict (dict): The dictionary to render.
    - indent (int): Indentation level applied to this dictionary's keys.

    Returns:
    - str: The rendered text, every line terminated by a newline. An empty
      dictionary yields an empty string.
    """
    prefix = "  " * indent
    chunks = []
    for name, entry in input_dict.items():
        if isinstance(entry, dict):
            chunks.append(f"{prefix}{name}:\n")
            chunks.append(dict_to_yaml_str(entry, indent + 1))
        elif isinstance(entry, list):
            chunks.append(f"{prefix}{name}:\n")
            chunks.extend(f"{prefix}- {element}\n" for element in entry)
        else:
            chunks.append(f"{prefix}{name}: {entry}\n")
    return "".join(chunks)

In [191]:
# Neo4j connection settings, read from the process environment. Indexing
# os.environ directly means a missing variable raises KeyError right here.
URI = os.environ["DATABASE_HOST"]
# Database name passed to each retriever as `neo4j_database`.
DATABASE = os.environ["DATABASE_SMALL"]
# (username, password) pair passed to the driver as `auth`.
AUTH = (os.environ["DATABASE_USERNAME"], os.environ["DATABASE_PASSWORD"])

# Index names used by the two article retrievers (vector + full-text).
ARTICLE_VECTOR_INDEX_NAME = "effective_vector_index"
ARTICLE_FULLTEXT_INDEX_NAME = "effective_fulltext_index"

# Index names for definitions. Only the vector index is referenced by the
# retriever built in this cell; the full-text name is not used in the visible code.
DEFINITION_VECTOR_INDEX_NAME = "definition_vector_index"
DEFINITION_FULLTEXT_INDEX_NAME = "definition_fulltext_index"

# Retrieval query for the first article pass. It receives `node` and `score`
# from the index search, expands each hit to its RELATED_TO / REFER_TO
# neighbours, scores those neighbours against $query_vector, and returns the
# best $limit rows overall.
#
# Notes on the non-obvious parts:
# - Neighbours are excluded by comparing their `id` with the ids of the initial
#   hits. (Comparing the node itself with a list of ids never matches, which
#   lets initial hits show up a second time as "related" nodes.)
# - Neighbours without an `embedding` are skipped: their similarity would be
#   null, and null sorts first under ORDER BY ... DESC.
# - Neighbour scores are built with a list comprehension instead of
#   UNWIND + collect. UNWIND over an empty list yields no rows, which would
#   discard the initial hits whenever no neighbour exists.
# - `WITH data` precedes ORDER BY / LIMIT so the query does not depend on
#   standalone ORDER BY support, which older Neo4j versions lack.
ARTICLE_RETRIEVAL_QUERY_1 = """
// Inisialisasi node hasil pencarian awal
WITH collect({node: node, score: score}) AS init_nodes_data
UNWIND init_nodes_data AS item

// Temukan node terkait
OPTIONAL MATCH (:Effective {id: item.node.id})-[:RELATED_TO|REFER_TO]-(related_node)
WITH init_nodes_data, collect(DISTINCT related_node) AS all_related_nodes
WITH init_nodes_data, [
     related IN all_related_nodes
     WHERE NOT (related.id IN [item IN init_nodes_data | item.node.id])
       AND related.embedding IS NOT NULL
] AS related_nodes

// Hitung skor kemiripan vektor untuk node terkait
WITH init_nodes_data, [
    candidate_node IN related_nodes | {
        node: candidate_node,
        score: vector.similarity.cosine($query_vector, candidate_node.embedding)
    }
] AS related_nodes_data

// Gabungkan hasil awal dan hasil pencarian node terkait
WITH init_nodes_data + related_nodes_data AS all_nodes_data
UNWIND all_nodes_data AS data
WITH data
ORDER BY data.score DESC
LIMIT $limit

// Kembalikan hasil akhir
RETURN data.node.text AS text, {
    id: data.node.id,
    type: "Article",
    source: data.node.source,
    score: data.score
} AS metadata
"""

# Retrieval query for the fallback article pass: plain index hits, minus the
# ids supplied in $excluded_ids, best $limit rows by score.
ARTICLE_RETRIEVAL_QUERY_2 = """
WITH node, score
WHERE NOT node.id IN $excluded_ids

RETURN node.text AS text, {
    id: node.id,
    type: "Article",
    source: node.source,
    score: score
} AS metadata

ORDER BY score DESC
LIMIT $limit
"""

# Retrieval query for definitions: index hits returned as-is, best $limit rows
# by score, tagged with type "Definition".
DEFINITION_RETRIEVAL_QUERY = """
RETURN node.text AS text, {
    id: node.id,
    type: "Definition",
    source: node.source,
    score: score
} AS metadata

ORDER BY score DESC
LIMIT $limit
"""


def retriever_result_formatter(record: neo4j.Record) -> types.RetrieverResultItem:
    """
    Turn one row returned by a retrieval query into a retriever result item.

    The record's "text" field becomes the item content. The record's
    "metadata" mapping is copied into the item metadata, leaving out every
    entry whose value is None.
    """
    content = record["text"]
    metadata = {}
    for field, value in record["metadata"].items():
        if value is None:
            continue
        metadata[field] = value
    return types.RetrieverResultItem(content=content, metadata=metadata)


def tool_result_formatter(
    items: list[types.RetrieverResultItem],
) -> list[tuple[documents.Document, float]]:
    """
    Convert retriever result items into ``(Document, score)`` pairs.

    Each item's content becomes the document's ``page_content``. The item's
    metadata is copied onto the document without the "score" entry, and that
    score is returned next to the document as the second tuple element.

    Parameters:
    - items: Retriever result items whose metadata contains a "score" key.

    Returns:
    - A list with one ``(Document, score)`` tuple per input item, in input
      order. An empty input yields an empty list.

    Raises:
    - KeyError: If an item's metadata has no "score" entry.
    """
    return [
        (
            documents.Document(
                page_content=item.content,
                # The score travels beside the document, not inside its metadata.
                metadata={
                    key: value
                    for key, value in item.metadata.items()
                    if key != "score"
                },
            ),
            item.metadata["score"],
        )
        for item in items
    ]


# A single driver instance is shared by every retriever created in this cell.
driver = neo4j.GraphDatabase.driver(URI, auth=AUTH)


def _build_article_retriever(retrieval_query: str) -> retrievers.HybridCypherRetriever:
    """Create a hybrid (vector + full-text) article retriever that uses ``retrieval_query``."""
    return retrievers.HybridCypherRetriever(
        driver=driver,
        vector_index_name=ARTICLE_VECTOR_INDEX_NAME,
        fulltext_index_name=ARTICLE_FULLTEXT_INDEX_NAME,
        retrieval_query=retrieval_query,
        embedder=embedder,
        result_formatter=retriever_result_formatter,
        neo4j_database=DATABASE,
    )


# The two article retrievers differ only in their retrieval query.
article_retriever_1 = _build_article_retriever(ARTICLE_RETRIEVAL_QUERY_1)
article_retriever_2 = _build_article_retriever(ARTICLE_RETRIEVAL_QUERY_2)

# Definitions are searched through the vector index only.
definition_retriever = retrievers.VectorCypherRetriever(
    driver=driver,
    index_name=DEFINITION_VECTOR_INDEX_NAME,
    retrieval_query=DEFINITION_RETRIEVAL_QUERY,
    embedder=embedder,
    result_formatter=retriever_result_formatter,
    neo4j_database=DATABASE,
)


def search(
    query_text: str,
    top_k_initial_article: int = 5,
    article_limit: int = 15,
    definition_limit: int = 5
):
    """
    Retrieve articles and definitions for a query as ``(Document, score)`` pairs.

    Articles come from ``article_retriever_1`` first. If that returns fewer
    than ``article_limit`` items, ``article_retriever_2`` is asked for the
    missing number, excluding the ids already found, and the combined article
    list is re-sorted by score, highest first. Definitions are then fetched
    with ``definition_retriever`` and placed after the articles.

    Parameters:
    - query_text: The text to search for.
    - top_k_initial_article: ``top_k`` for the first article search.
    - article_limit: Maximum number of articles requested overall.
    - definition_limit: ``top_k`` and limit for the definition search.

    Returns:
    - The output of ``tool_result_formatter`` for articles followed by
      definitions.

    Side effects:
    - Prints three counts: articles after the first search, articles after the
      optional fallback, and the total number of returned entries.
    """
    articles = article_retriever_1.search(
        query_text=query_text,
        top_k=top_k_initial_article,
        query_params={"limit": article_limit},
    )

    print(len(articles.items))

    shortfall = article_limit - len(articles.items)
    if shortfall > 0:
        # Ask the fallback retriever only for what is still missing, skipping
        # the articles that are already part of the result.
        already_found = [hit.metadata["id"] for hit in articles.items]
        extra = article_retriever_2.search(
            query_text=query_text,
            top_k=top_k_initial_article + article_limit,
            query_params={"excluded_ids": already_found, "limit": shortfall},
        )

        articles.items.extend(extra.items)
        articles.items.sort(key=lambda hit: hit.metadata["score"], reverse=True)

    print(len(articles.items))

    definitions = definition_retriever.search(
        query_text=query_text,
        top_k=definition_limit,
        query_params={"limit": definition_limit},
    )

    docs = tool_result_formatter(articles.items + definitions.items)

    print(len(docs))

    return docs
    

In [192]:
# Time one end-to-end search call. perf_counter() is a monotonic,
# high-resolution clock intended for measuring intervals; time.time() follows
# the wall clock and can jump if the system time is adjusted mid-measurement.
start = time.perf_counter()

query_text = "Apa kewajiban dari penyelenggara sistem elektronik?"
result = search(query_text=query_text, top_k_initial_article=5, article_limit=15, definition_limit=5)

end = time.perf_counter()

12
15
20


In [193]:
# Duration of the search call measured in the previous cell, converted from
# seconds to milliseconds.
elapsed_ms = (end - start) * 1000
print(elapsed_ms)
# Bare last expression so the notebook displays the returned (document, score) pairs.
result

159.53302383422852


[(Document(metadata={'id': 201902071501200, 'source': 'PP No. 71 Tahun 2019 Pasal 12', 'type': 'Article'}, page_content='Peraturan Pemerintah (PP) Nomor 71 Tahun 2019 tentang Penyelenggaraan Sistem dan Transaksi Elektronik, BAB II - PENYELENGGARAAN SISTEM ELEKTRONIK, Bagian Keenam - Tata Kelola Sistem Elektronik, Pasal 12:\nPenyelenggara Sistem Elektronik harus menerapkan manajemen risiko terhadap kerusakan atau kerugian yang ditimbulkan.'),
  1.0),
 (Document(metadata={'id': 200801011503000, 'source': 'UU No. 11 Tahun 2008 Pasal 30', 'type': 'Article'}, page_content='Undang-undang (UU) Nomor 11 Tahun 2008 tentang Informasi dan Transaksi Elektronik, BAB VII - PERBUATAN YANG DILARANG, Pasal 30:\n(1) Setiap Orang dengan sengaja dan tanpa hak atau melawan hukum mengakses Komputer dan/atau Sistem Elektronik milik Orang lain dengan cara apa pun.\n(2) Setiap Orang dengan sengaja dan tanpa hak atau melawan hukum mengakses Komputer dan/atau Sistem Elektronik dengan cara apa pun dengan tujuan u