In [1]:
import pandas as pd

from llm_inference.base import LLMInterface
from setting.db import SessionLocal
import logging
from models.entity import get_entity_model
from models.relationship import get_relationship_model


# --- Configuration: values a reader may need to change live here -------------
# Suffix shared by the entity table and the relationship table.
TABLE_SUFFIX = "150001"
# Second argument of both model factories.
# NOTE(review): presumably the embedding vector dimension — confirm against models/.
VECTOR_DIM = 1536
# Pickled DataFrame with one row per (cluster, entity) and a `processed` flag.
CLUSTER_PICKLE_PATH = "cluster_entities.pkl"

# Build both ORM models from the same suffix/dimension so they cannot drift apart.
Entity = get_entity_model(f"entities_{TABLE_SUFFIX}", VECTOR_DIM)
Relationship = get_relationship_model(f"relationships_{TABLE_SUFFIX}", VECTOR_DIM)

logger = logging.getLogger(__name__)

llm_client = LLMInterface("ollama", "deepseek-qwen-32b")
# SECURITY: unpickling can execute arbitrary code; only load a file you produced yourself.
cluster_df = pd.read_pickle(CLUSTER_PICKLE_PATH)
cluster_df

Unnamed: 0,cluster,entity_id,entity_name,entity_description,entity_metadata,processed
0,zr7s5Xo8_iter_1_idx1,55192,--force,Forces an upgrade even if the cluster is not s...,{'behavior': 'Replaces binaries and starts the...,False
1,zr7s5Xo8_iter_1_idx1,55553,--force,Forcibly removes down nodes from the cluster t...,{'condition': 'nodes down and impossible to co...,False
2,zr7s5Xo8_iter_1_idx1,55567,--force,"Ignores errors during cluster destruction, par...","{'data_type': 'Boolean', 'default': 'false', '...",False
3,zr7s5Xo8_iter_1_idx1,55159,--force,An option to ignore errors and start the clust...,{'description': 'Ignores errors and starts the...,False
4,SEJrBNXE_iter_1_idx1,55683,"-N, --node",An option for tiup dm patch command that speci...,"{'data_type': 'STRING', 'default': 'All nodes'...",False
...,...,...,...,...,...,...
1606,bwAvbOaF_iter_1_idx1,361793,utf8mb4,The utf8mb4 character set supports multiple co...,{'additional_info': ['utf8mb4_0900_ai_ci is th...,False
1607,bwAvbOaF_iter_1_idx1,54215,utf8mb4,A character set that supports a wide range of ...,"{'status': 'need-revised', 'topic': 'Example u...",False
1608,bwAvbOaF_iter_1_idx1,362092,utf8mb4,UTF-8 Unicode character set (4-byte) that repr...,"{'CHARACTER_SET_NAME': 'utf8mb4', 'DEFAULT_COL...",False
1609,bwAvbOaF_iter_1_idx1,54236,utf8mb4,A character set listed in the CHARACTER_SET_NA...,"{'example_of': 'CHARACTER_SET_NAME', 'topic': ...",False


In [2]:
import json
import openai

from typing import Mapping, Any

# Shared OpenAI client used by get_text_embedding below. It is constructed with
# no explicit arguments, so all configuration comes from the library defaults.
embedding_model = openai.OpenAI()

def get_text_embedding(text: str, model="text-embedding-3-small"):
    """Return the embedding vector of ``text`` produced by ``model``.

    Newlines in ``text`` are replaced by single spaces before the request is
    sent through the module-level ``embedding_model`` client. The text is sent
    as a one-element batch and the embedding of that single element is returned.
    """
    single_line = " ".join(text.split("\n"))
    response = embedding_model.embeddings.create(input=[single_line], model=model)
    first_result = response.data[0]
    return first_result.embedding


def get_entity_description_embedding(
    name: str, description: str
):
    """Embed an entity as the single string ``"<name>: <description>"``.

    The combined string is passed to ``get_text_embedding`` with its default
    model, and that function's result is returned unchanged.
    """
    return get_text_embedding("{}: {}".format(name, description))


def get_entity_metadata_embedding(
    metadata: dict[str, Any]
):
    """Embed an entity's metadata dictionary.

    The dictionary is serialized with ``json.dumps`` (default settings) and the
    resulting JSON text is passed to ``get_text_embedding``.

    Args:
        metadata: JSON-serializable metadata. The annotation is ``dict[str, Any]``;
            the previous ``dict[Mapping, Any]`` wrongly declared mapping-typed keys.

    Returns:
        Whatever ``get_text_embedding`` returns for the serialized metadata.

    Raises:
        TypeError: propagated from ``json.dumps`` if a value is not JSON-serializable.
    """
    serialized = json.dumps(metadata)
    return get_text_embedding(serialized)

In [3]:
from entity_agg import merge_entities, should_merge_entities, group_mergeable_entities

# Group every not-yet-processed row of cluster_df by its cluster id:
# cluster_mapping maps cluster id -> set of Entity objects built from the rows.
cluster_mapping = {}
for _, record in cluster_df.iterrows():
    already_processed = record['processed'] == True
    if already_processed:
        continue

    # Build the Entity first, then file it under its cluster (creating the set on demand).
    member = Entity(
        id=record['entity_id'],
        name=record['entity_name'],
        description=record['entity_description'],
        meta=record['entity_metadata'],
    )
    cluster_mapping.setdefault(record['cluster'], set()).add(member)

# Sanity check: show the members of the first cluster, then the number of clusters.
if len(cluster_mapping) > 0:
    first_cluster, first_members = next(iter(cluster_mapping.items()))
    print(f"Cluster: {first_cluster}")
    for entity in first_members:
        print(f" - ID: {entity.id}, Name: {entity.name}, Description: {entity.description}")
        print(f"   - Metadata: {entity.meta}")

print(len(cluster_mapping))


Cluster: zr7s5Xo8_iter_1_idx1
 - ID: 55192, Name: --force, Description: Forces an upgrade even if the cluster is not started. This will replace binaries and start the cluster.
   - Metadata: {'behavior': 'Replaces binaries and starts the cluster.', 'data_type': 'BOOLEAN', 'default': False, 'description': 'Forces an upgrade even if the cluster is not started.', 'note': 'Forcing an upgrade of a running cluster might result in service unavailability. Unstarted clusters are started automatically after a successful upgrade.', 'topic': 'Force Upgrade'}
 - ID: 55553, Name: --force, Description: Forcibly removes down nodes from the cluster that cannot be connected to via SSH.
   - Metadata: {'condition': 'nodes down and impossible to connect via SSH', 'data_type': 'BOOLEAN', 'default': False, 'topic': 'force_removal'}
 - ID: 55159, Name: --force, Description: An option to ignore errors and start the cluster during the upgrade process.
   - Metadata: {'description': 'Ignores errors and starts t

In [4]:
# Prompt-size thresholds, in tokens as reported by merge_entities(only_count_token=True).
MAX_PROMPT_TOKENS = 16384       # clusters with a larger prompt are skipped entirely
LARGE_PROMPT_TOKENS = 7000      # above this, the context window is sized from the prompt
CONTEXT_HEADROOM_TOKENS = 1500  # extra context added on top of a large prompt
DEFAULT_NUM_CTX = 8092          # NOTE(review): possibly meant to be 8192 — confirm before changing
# Keys the LLM's merged entity must contain before it is written to the database.
REQUIRED_MERGE_KEYS = ("name", "description", "meta")


def _build_model_args(token_count):
    """Return the keyword arguments forwarded to the LLM calls for a prompt of ``token_count`` tokens."""
    if token_count > LARGE_PROMPT_TOKENS:
        num_ctx = token_count + CONTEXT_HEADROOM_TOKENS
    else:
        num_ctx = DEFAULT_NUM_CTX
    return {
        "options": {
            "num_ctx": num_ctx,
            "num_gpu": 80,
            "num_predict": 8192,
            "temperature": 0.1,
        }
    }


def _is_valid_merged_entity(candidate):
    """Return True when ``candidate`` is a dict containing every key in REQUIRED_MERGE_KEYS."""
    return isinstance(candidate, dict) and all(key in candidate for key in REQUIRED_MERGE_KEYS)


def _persist_merged_entity(merged_entity, original_entity_ids):
    """Insert the merged entity and repoint relationships to it in one transaction.

    Args:
        merged_entity: dict with at least the keys in REQUIRED_MERGE_KEYS.
        original_entity_ids: ids of the entities that were merged; every relationship
            whose source or target is one of them is updated to the new entity id.

    Returns:
        The id of the newly inserted entity.

    Raises:
        Exception: anything raised by the embedding calls or the database; the
            transaction is rolled back before the exception propagates.
    """
    # Compute embeddings before opening the session so no session is held open
    # while waiting on the embedding service.
    description_vec = get_entity_description_embedding(
        merged_entity["name"], merged_entity["description"]
    )
    meta_vec = get_entity_metadata_embedding(merged_entity["meta"])

    # The context manager closes the session; rollback happens inside the block,
    # where `session` is guaranteed to be bound.
    with SessionLocal() as session:
        try:
            # Step 1: write the merged entity to the database.
            new_entity = Entity(
                name=merged_entity["name"],
                description=merged_entity["description"],
                meta=merged_entity["meta"],
                description_vec=description_vec,
                meta_vec=meta_vec,
            )
            print(new_entity.name)
            session.add(new_entity)
            session.flush()  # flush so new_entity.id is populated before it is used below
            merged_entity_id = new_entity.id
            print(f"Merged entity created with ID: {merged_entity_id}")

            # Step 2: repoint relationships from the original entities to the merged one.
            session.execute(
                Relationship.__table__.update().where(
                    Relationship.source_entity_id.in_(original_entity_ids)
                ).values(source_entity_id=merged_entity_id)
            )
            session.execute(
                Relationship.__table__.update().where(
                    Relationship.target_entity_id.in_(original_entity_ids)
                ).values(target_entity_id=merged_entity_id)
            )
            print(f"Relationships updated for merged entity {merged_entity_id}")

            session.commit()  # entity insert and relationship updates land together
        except Exception:
            session.rollback()
            raise
    return merged_entity_id


# For every cluster: ask the LLM whether its entities should be merged, merge them,
# persist the result, and checkpoint progress in cluster_df / the pickle file.
for cluster_name, entities in cluster_mapping.items():
    print(f"merge entities cluster {cluster_name}, count {len(entities)}")

    token_count = merge_entities(llm_client, entities, only_count_token=True)
    if token_count > MAX_PROMPT_TOKENS:
        print(f"prompt token exceeds {MAX_PROMPT_TOKENS}", token_count)
        continue

    model_args = _build_model_args(token_count)

    print("prompt token", token_count)
    try:
        check_result = should_merge_entities(llm_client, entities, **model_args)
        # Merging rewrites relationships, so require an explicit True verdict;
        # a missing, None or otherwise malformed verdict is treated as "do not merge".
        if check_result.get("should_merge") is not True:
            print(f"skip merge entities cluster {cluster_name}, count {len(entities)}, reason {check_result}")
            continue
        merged_entity = merge_entities(llm_client, entities, **model_args)
    except Exception as e:
        # Use the module logger rather than the root logger.
        logger.error(f"Error processing cluster {cluster_name}: {e}", exc_info=True)
        continue

    if _is_valid_merged_entity(merged_entity):
        try:
            _persist_merged_entity(merged_entity, [entity.id for entity in entities])
            print(f"Merged entity {cluster_name} processing complete.")
            # Checkpoint: mark the cluster processed and persist the flag so a
            # re-run of the notebook skips it.
            cluster_df.loc[cluster_df["cluster"] == cluster_name, "processed"] = True
            cluster_df.to_pickle("cluster_entities.pkl")
        except Exception as e:
            logger.error(f"Error processing cluster {cluster_name}: {e}", exc_info=True)

            print(f"Error processing cluster {cluster_name}: {e}")
    else:
        print(f"Merged entity {cluster_name} is invalid or empty.", merged_entity)

    print("*"* 100)

merge entities cluster zr7s5Xo8_iter_1_idx1, count 4
prompt token 732
skip merge entities cluster zr7s5Xo8_iter_1_idx1, count 4, reason {'should_merge': False, 'reason': "The entities have the same name '--force' but describe different functionalities. One is for upgrading clusters, another for removing nodes, and others relate to ignoring errors during destruction or upgrade."}
merge entities cluster SEJrBNXE_iter_1_idx1, count 2
prompt token 678
-N, --node
Merged entity created with ID: 660155
Relationships updated for merged entity 660155
Merged entity SEJrBNXE_iter_1_idx1 processing complete.
****************************************************************************************************
merge entities cluster SEJrBNXE_iter_1_idx2, count 4
prompt token 1008
-R, --role
Merged entity created with ID: 660156
Relationships updated for merged entity 660156
Merged entity SEJrBNXE_iter_1_idx2 processing complete.
************************************************************************

ERROR:root:Error processing cluster VxmUQa0t_iter_1_idx1: Invalid control character at: line 3 column 543 (char 570)
Traceback (most recent call last):
  File "/var/tmp/ipykernel_217731/2562874261.py", line 31, in <module>
    merged_entity = merge_entities(llm_client, entities, **model_args)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zhaiyl/graph_toolkit/entity_agg.py", line 479, in merge_entities
    raise e
  File "/home/zhaiyl/graph_toolkit/entity_agg.py", line 476, in merge_entities
    return json.loads(json_str)
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/graph/lib/python3.12/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/graph/lib/python3.12/json/decoder.py", line 338, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/graph/lib/python3.12/json/d

[ERROR in call_llm_to_merge_entities]: Invalid control character at: line 3 column 543 (char 570) {
  "name": "Alertmanager",
  "description": "Alertmanager is an open-source alerting system designed to handle alerts generated by Prometheus. It plays a crucial role in the monitoring infrastructure by routing, grouping, silencing, and managing notifications for timely issue resolution. The system can be deployed multiple times within a single cluster using TiDB Ansible or TiUP, with support for deploying multiple instances. It is typically integrated with other components such as Prometheus, Grafana, DM, and TidbMonitor to form a comprehensive monitoring setup.

  Alertmanager's functionality includes managing alert notifications, handling alert routing, and providing a web interface accessible via port 9093. The system can be upgraded by downloading the latest package from the Prometheus website and applying updates using TiUP commands. Configuration options allow specifying deployment

```sql
-- Preview query: list the id and name of every entity that no relationship row
-- references, either as source or as target. This matches all unreferenced
-- entities, not only those left behind by a merge.
SELECT e.id, e.name
FROM entities_150001 e
WHERE NOT EXISTS (
  SELECT 1
  FROM relationships_150001 r
  WHERE r.source_entity_id = e.id
     OR r.target_entity_id = e.id
);
```

```sql
-- Cleanup: delete every entity that no relationship row references (as source
-- or target), inside an explicit transaction. Destructive: run the equivalent
-- SELECT first and review its output.
START TRANSACTION;

DELETE FROM entities_150001
WHERE NOT EXISTS (
  SELECT 1
  FROM relationships_150001 r
  WHERE r.source_entity_id = entities_150001.id
     OR r.target_entity_id = entities_150001.id
);

-- NOTE(review): presumably MySQL/TiDB dialect, where ROW_COUNT() reports the rows
-- affected by the preceding statement; it must stay directly after the DELETE.
SELECT ROW_COUNT();

COMMIT;
```