In [1]:
import random
import string
import networkx as nx
from typing import Dict, Any, List, Tuple
import json
import uuid

class NetworkGenerator:
    def __init__(self, schema_json: Dict[str, Any]):
        """Initialize the generator with schema definition."""
        self.schema = schema_json
        self.graph = nx.DiGraph()
        self.node_instances: Dict[str, Dict[str, Dict[str, Any]]] = {}  # Track created nodes by type with their create operations
        self.timestamp = 0
        
    def generate_random_value(self, feature_type: str) -> Any:
        """Generate random value based on feature type."""
        if feature_type == "string":
            return ''.join(random.choices(string.ascii_letters, k=8))
        elif feature_type == "float":
            return round(random.uniform(1, 1000), 2)
        elif feature_type == "integer":
            return random.randint(1, 100)
        else:
            return None

    def generate_node_payload(self, node_type: str) -> Dict[str, Any]:
        """Generate create payload for a node (without edges)."""
        node_id = f"{node_type.lower()}_{uuid.uuid4().hex[:8]}"
        
        properties = {}
        for feature_name, feature_type in self.schema["nodes"][node_type]["features"].items():
            if feature_name == "id":
                properties[feature_name] = node_id
            else:
                properties[feature_name] = self.generate_random_value(feature_type)
        
        operation = {
            "action": "create",
            "type": "schema",
            "payload": {
                "node_id": node_id,
                "node_type": node_type,
                "properties": properties
            },
            "timestamp": self.timestamp
        }
        self.timestamp += 1
        return node_id, operation

    def generate_edge_payload(self, source_id: str, target_id: str, edge_type: str) -> Dict[str, Any]:
        """Generate create payload for an edge."""
        edge_schema = self.schema["edges"][edge_type]["features"]
        
        properties = {}
        for feature_name, feature_type in edge_schema.items():
            properties[feature_name] = self.generate_random_value(feature_type)
            
        operation = {
            "action": "create",
            "type": "schema",
            "payload": {
                "source_id": source_id,
                "target_id": target_id,
                "edge_type": edge_type,
                "properties": properties
            },
            "timestamp": self.timestamp
        }
        self.timestamp += 1
        return operation

    def create_random_network(self, 
                            min_nodes_per_type: int = 2,
                            max_nodes_per_type: int = 5,
                            edge_probability: float = 0.7) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
        """
        Generate a random network based on schema.
        Returns lists of create, update, and delete operations.
        """
        all_operations = []
        update_operations = []
        delete_operations = []
        
        # Initialize node tracking
        for node_type in self.schema["nodes"].keys():
            self.node_instances[node_type] = {}
        
        # First create all nodes
        for node_type in self.schema["nodes"].keys():
            num_nodes = random.randint(min_nodes_per_type, max_nodes_per_type)
            
            for _ in range(num_nodes):
                node_id, node_operation = self.generate_node_payload(node_type)
                self.node_instances[node_type][node_id] = node_operation
                all_operations.append(node_operation)
        
        # Then create edges after all nodes exist
        for edge_type, edge_info in self.schema["edges"].items():
            source_type = edge_info["source"]
            target_type = edge_info["target"]
            
            for source_id in self.node_instances[source_type].keys():
                for target_id in self.node_instances[target_type].keys():
                    if random.random() < edge_probability:
                        edge_operation = self.generate_edge_payload(source_id, target_id, edge_type)
                        all_operations.append(edge_operation)

        # Sort all operations by timestamp
        create_operations = sorted(all_operations, key=lambda x: x["timestamp"])


        # Generate updates and deletions as before
        for node_type, nodes in self.node_instances.items():
            if nodes:
                # Randomly select a node to update
                node_id = random.choice(list(nodes.keys()))
                
                update_payload = {
                    "node_id": node_id,
                    "updates": {
                        "properties": {
                            feature_name: self.generate_random_value(feature_type)
                            for feature_name, feature_type in 
                            random.sample(list(self.schema["nodes"][node_type]["features"].items()),
                                        k=random.randint(1, 3))
                            if feature_name != "id"
                        }
                    }
                }
                
                update_operations.append({
                    "action": "update",
                    "type": "schema",
                    "payload": update_payload,
                    "timestamp": self.timestamp
                })
                self.timestamp += 1

            # Random deletions
            if nodes and random.random() < 0.3:
                node_id = random.choice(list(nodes.keys()))
                
                delete_payload = {
                    "node_id": node_id,
                    "cascade": random.choice([True, False])
                }
                
                delete_operations.append({
                    "action": "delete",
                    "type": "schema",
                    "payload": delete_payload,
                    "timestamp": self.timestamp
                })
                self.timestamp += 1

        return create_operations, update_operations, delete_operations

def generate_test_network(schema_file_path: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Utility function to generate test network from schema file."""
    with open(schema_file_path, 'r') as f:
        schema = json.load(f)
    
    generator = NetworkGenerator(schema)
    return generator.create_random_network()

In [2]:
# Example schema is provided as a dictionary
import os
os.chdir("..")
print(os.getcwd())

schema = json.load(open("metadata/relations.json", "r"))

generator = NetworkGenerator(schema)
create_ops, update_ops, delete_ops = generator.create_random_network()

print(f"Generated {len(create_ops)} create operations")
print(f"Generated {len(update_ops)} update operations")
print(f"Generated {len(delete_ops)} delete operations")

/Users/dev/LAM/graph-server
Generated 102 create operations
Generated 7 update operations
Generated 2 delete operations


In [3]:
API_URL = "http://localhost:8000"

timestamp = 0
version = "v1"

In [4]:
import requests
import time

for op in create_ops:
    op["version"] = version
    requests.post(f"{API_URL}/schema/live/update", json=op)
    timestamp += 1
    time.sleep(0.4)

In [5]:
for op in update_ops:
    op["version"] = version
    requests.post(f"{API_URL}/schema/live/update", json=op)
    timestamp += 1
    time.sleep(0.4)
    
for op in delete_ops:
    op["version"] = version
    requests.post(f"{API_URL}/schema/live/update", json=op)
    timestamp += 1
    time.sleep(0.4)