# Load data into Neptune, then optimize and monitor the data loading process

In [None]:
import json
from datetime import datetime
import boto3
import requests
from langchain_community.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
import time
import json

# ====================
# Configuration Parameters
# ====================
# S3 details
# All values below are placeholders and must be replaced before running.
# S3_BUCKET / S3_KEY are used both as the upload target and as the
# "s3://bucket/key" source handed to the Neptune loader.
S3_BUCKET = "your-s3-bucket-name"             # e.g., "my-neptune-bulk-load-bucket"
S3_KEY = "neptune/neptune_bulk_data.json"      # S3 key (path) where the file will be stored
FILE_PATH = "neptune_bulk_data.json"           # Local file generated from our metadata

# Neptune bulk loader settings
# NEPTUNE_ENDPOINT is "host:port" without a scheme; it is interpolated into
# "https://{endpoint}/loader" and "https://{endpoint}/sparql".
NEPTUNE_ENDPOINT = "your-neptune-endpoint:8182" # e.g., "neptune-cluster.cluster-abcdefg.us-east-1.neptune.amazonaws.com:8182"
IAM_ROLE_ARN = "arn:aws:iam::your-account-id:role/your-neptune-s3-role"  # IAM role for Neptune to access S3
AWS_REGION = "your-region"                       # e.g., "us-east-1"

# ====================
# Step 1: Metadata Generation and JSON Creation
# ====================

# Hand-written sample metadata, one dict per source system. Each dict has the
# shape consumed by generate_neptune_bulk_data:
#   {"database": str, "schema": str,
#    "tables": [{"table_name": str,
#                "columns": [{"name", "data_type", "indexed"}],
#                "partitioning": str,
#                "indexes": [{"name", "columns": [str, ...]}]}]}
# None of the samples defines a "source" key, so the Database vertex built
# from them gets source="unknown".

# Fake metadata from Oracle
oracle_metadata = {
    "database": "OracleDB",
    "schema": "HR",
    "tables": [
        {
            "table_name": "EMPLOYEES",
            "columns": [
                {"name": "EMP_ID", "data_type": "NUMBER", "indexed": True},
                {"name": "FIRST_NAME", "data_type": "VARCHAR2", "indexed": False},
                {"name": "LAST_NAME", "data_type": "VARCHAR2", "indexed": False},
                {"name": "HIRE_DATE", "data_type": "DATE", "indexed": False}
            ],
            "partitioning": "RANGE (HIRE_DATE)",
            "indexes": [
                {"name": "IDX_EMP_ID", "columns": ["EMP_ID"]}
            ]
        }
    ]
}

# Fake metadata from Snowflake
snowflake_metadata = {
    "database": "SnowflakeDB",
    "schema": "PUBLIC",
    "tables": [
        {
            "table_name": "CUSTOMERS",
            "columns": [
                {"name": "CUSTOMER_ID", "data_type": "NUMBER", "indexed": True},
                {"name": "NAME", "data_type": "STRING", "indexed": False},
                {"name": "EMAIL", "data_type": "STRING", "indexed": True}
            ],
            "partitioning": "CLUSTER BY (CUSTOMER_ID)",
            "indexes": [
                {"name": "IDX_CUSTOMER_ID", "columns": ["CUSTOMER_ID"]},
                {"name": "IDX_EMAIL", "columns": ["EMAIL"]}
            ]
        }
    ]
}

# Fake metadata from DuckDB
duckdb_metadata = {
    "database": "DuckDB",
    "schema": "main",
    "tables": [
        {
            "table_name": "SALES",
            "columns": [
                {"name": "SALE_ID", "data_type": "INTEGER", "indexed": True},
                {"name": "AMOUNT", "data_type": "DOUBLE", "indexed": False},
                {"name": "SALE_DATE", "data_type": "DATE", "indexed": False}
            ],
            # "NONE" is the marker used here for an unpartitioned table.
            "partitioning": "NONE",
            "indexes": [
                {"name": "IDX_SALE_ID", "columns": ["SALE_ID"]}
            ]
        }
    ]
}

def _make_vertex(element_id, label, properties):
    """Return a vertex record with the ``id``/``label``/``properties`` keys."""
    return {"id": element_id, "label": label, "properties": properties}


def _make_edge(element_id, label, from_id, to_id, timestamp):
    """Return an edge record from ``from_id`` to ``to_id`` stamped with ``timestamp``."""
    return {
        "id": element_id,
        "label": label,
        "from": from_id,
        "to": to_id,
        "properties": {"created": timestamp},
    }


def generate_neptune_bulk_data(metadatas):
    """
    Converts metadata from multiple database sources into a flat list of
    vertex and edge records.

    Vertices are emitted for Database, Schema, Table, Column and Index
    entities. Edges link Database -> Schema and Schema -> Table ("contains"),
    Table -> Column ("has_column") and Table -> Index ("has_index").

    Args:
        metadatas: Iterable of source dicts. Each needs "database" and
            "schema"; "tables" is optional. Each table needs "table_name";
            "columns", "indexes" and "partitioning" are optional (a missing
            "partitioning" is recorded as "NONE"). Each column needs "name"
            and "data_type"; "indexed" defaults to False. Each index needs
            "name"; "columns" defaults to an empty list.

    Returns:
        list[dict]: Records in emission order. Every id appears at most once;
        when the same id would be produced again (e.g. two sources sharing a
        database), the first record wins.

    Raises:
        KeyError: If one of the required keys listed above is missing.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. tzinfo is stripped so
    # the string keeps the same naive-UTC ISO format as before.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    bulk_data = []
    seen_ids = set()

    def emit(element):
        # Ids must stay unique across the whole output.
        if element["id"] not in seen_ids:
            seen_ids.add(element["id"])
            bulk_data.append(element)

    for source in metadatas:
        db_name = source["database"]
        schema_name = source["schema"]
        db_id = f"db_{db_name}"
        schema_id = f"schema_{db_name}_{schema_name}"

        # Vertex: Database
        emit(_make_vertex(db_id, "Database", {
            "name": db_name,
            "source": source.get("source", "unknown"),
            "created": timestamp,
        }))

        # Vertex: Schema
        emit(_make_vertex(schema_id, "Schema", {
            "name": schema_name,
            "database": db_name,
            "created": timestamp,
        }))

        # Edge: Database contains Schema. Without it the Database vertex is
        # not reachable from the rest of the graph.
        emit(_make_edge(
            f"edge_{schema_id}_in_database", "contains", db_id, schema_id, timestamp
        ))

        for table in source.get("tables", []):
            table_name = table["table_name"]
            table_id = f"table_{db_name}_{schema_name}_{table_name}"

            # Vertex: Table
            emit(_make_vertex(table_id, "Table", {
                "name": table_name,
                "schema": schema_name,
                "database": db_name,
                "partitioning": table.get("partitioning", "NONE"),
                "created": timestamp,
            }))

            # Edge: Schema contains Table
            emit(_make_edge(
                f"edge_{table_id}_in_schema", "contains", schema_id, table_id, timestamp
            ))

            for col in table.get("columns", []):
                col_name = col["name"]
                col_id = f"column_{db_name}_{schema_name}_{table_name}_{col_name}"

                # Vertex: Column ("indexed" is stored as the string "True"/"False")
                emit(_make_vertex(col_id, "Column", {
                    "name": col_name,
                    "data_type": col["data_type"],
                    "indexed": str(col.get("indexed", False)),
                    "table": table_name,
                    "created": timestamp,
                }))

                # Edge: Table has Column
                emit(_make_edge(
                    f"edge_{table_id}_{col_id}_has_column",
                    "has_column", table_id, col_id, timestamp,
                ))

            for idx in table.get("indexes", []):
                idx_name = idx["name"]
                idx_id = f"index_{db_name}_{schema_name}_{table_name}_{idx_name}"

                # Vertex: Index (column list is stored as a JSON string)
                emit(_make_vertex(idx_id, "Index", {
                    "name": idx_name,
                    "table": table_name,
                    "columns": json.dumps(idx.get("columns", [])),
                    "created": timestamp,
                }))

                # Edge: Table has Index
                emit(_make_edge(
                    f"edge_{table_id}_{idx_id}_has_index",
                    "has_index", table_id, idx_id, timestamp,
                ))
    return bulk_data

def save_bulk_data_to_file(bulk_data, file_path):
    """
    Write ``bulk_data`` to ``file_path`` as JSON indented by two spaces,
    then print a confirmation naming the file.
    """
    with open(file_path, "w") as out_file:
        out_file.write(json.dumps(bulk_data, indent=2))
    print(f"Neptune bulk loader data generated and saved to '{file_path}'.")

# ====================
# Step 2: S3 Upload Functionality
# ====================
def upload_to_s3(file_path, bucket, key):
    """
    Upload the local file at ``file_path`` to ``s3://bucket/key``.

    Prints a confirmation on success. On failure the error is printed and
    the same exception is raised again for the caller to handle.
    """
    client = boto3.client("s3")
    try:
        client.upload_file(file_path, bucket, key)
    except Exception as upload_error:
        print("Error uploading file to S3:", upload_error)
        raise upload_error  # Reraise exception for further handling
    else:
        print(f"File successfully uploaded to: s3://{bucket}/{key}")

# ====================
# Step 3: Trigger Neptune Bulk Loader
# ====================
def trigger_neptune_loader(neptune_endpoint, bucket, key, iam_role_arn, aws_region,
                           *, verify=True, timeout=30):
    """
    Triggers the Neptune bulk loader by posting a payload to its HTTP endpoint.

    Args:
        neptune_endpoint: "host:port" of the Neptune cluster (no scheme).
        bucket: S3 bucket holding the file to load.
        key: S3 key of the file to load.
        iam_role_arn: IAM role Neptune assumes to read from S3.
        aws_region: AWS region of the bucket.
        verify: Passed to ``requests`` as ``verify``. Defaults to True so the
            server certificate is checked; pass ``verify=False`` only for
            testing (e.g. through a tunnel where the hostname does not match).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The parsed JSON body of the loader response, so callers can read the
        load id from it and poll the load status later.

    Raises:
        Exception: Whatever ``requests`` raises for connection errors,
            timeouts, HTTP error statuses or an unparsable response body.
            The error is printed before being re-raised.
    """
    loader_url = f"https://{neptune_endpoint}/loader"
    loader_payload = {
        "source": f"s3://{bucket}/{key}",
        # NOTE(review): the Neptune loader reference lists csv, opencypher,
        # ntriples, nquads, rdfxml and turtle as accepted formats; "json" does
        # not appear there. Left unchanged because the generated file is JSON,
        # but confirm the loader accepts it.
        "format": "json",
        "iamRoleArn": iam_role_arn,
        "region": aws_region,
        "failOnError": "FALSE",
        "parallelism": "LOW"        # Adjust as necessary: LOW, MEDIUM, or HIGH
    }
    print("Initiating Neptune bulk load...")
    try:
        # A timeout keeps an unreachable endpoint from blocking forever.
        response = requests.post(
            loader_url, json=loader_payload, verify=verify, timeout=timeout
        )
        response.raise_for_status()  # Raise exception on HTTP error status
        result = response.json()
    except Exception as e:
        print("Error initiating Neptune bulk load:", e)
        raise
    print("Neptune bulk load initiated successfully. Response:")
    print(json.dumps(result, indent=2))
    return result


# ====================
# New Step 4: LLM Integration Using Neptune as Backend
# ====================

def query_neptune_data(sparql_query, *, verify=True, timeout=30):
    """
    Queries the Neptune SPARQL endpoint with the provided query.

    The request is a GET to ``https://{NEPTUNE_ENDPOINT}/sparql`` with the
    query passed as the ``query`` URL parameter.

    Args:
        sparql_query: SPARQL query text.
        verify: Passed to ``requests`` as ``verify``. Defaults to True so the
            server certificate is checked; pass ``verify=False`` only for
            testing.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: Whatever ``requests`` raises for connection errors,
            timeouts, HTTP error statuses or an unparsable response body.
            The error is printed before being re-raised.
    """
    query_url = f"https://{NEPTUNE_ENDPOINT}/sparql"
    headers = {"Accept": "application/sparql-results+json"}
    params = {"query": sparql_query}
    try:
        # A timeout keeps an unreachable endpoint from blocking forever.
        response = requests.get(
            query_url, headers=headers, params=params, verify=verify, timeout=timeout
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print("Error during Neptune SPARQL query:", e)
        raise

def llm_suggest_optimizations(metadata):
    """
    Ask a local Ollama model, through a LangChain LLMChain, for indexing and
    query optimization ideas based on the given metadata.

    The metadata is serialized with ``json.dumps(..., indent=2)`` and inserted
    into the prompt. The wall-clock time of the chain call is printed.

    Args:
        metadata: JSON-serializable database metadata (``main`` passes a list
            with one dict per source).

    Returns:
        The value returned by the chain's ``run`` call.
    """
    template_text = (
        "Given the database metadata:\n"
        "{metadata}\n\n"
        "Suggest possible indexing or query optimizations to improve database performance."
    )

    # Build model, prompt and chain in one go.
    optimization_chain = LLMChain(
        llm=Ollama(model="llama3:latest"),
        prompt=PromptTemplate(input_variables=["metadata"], template=template_text),
    )

    # Measure how long the model takes to answer.
    started = time.perf_counter()
    suggestion = optimization_chain.run(metadata=json.dumps(metadata, indent=2))
    latency = time.perf_counter() - started
    print(f"LLM Suggestion Latency: {latency:.3f}s")

    return suggestion



# ====================
# Main Routine to Execute All Steps
# ====================
def main():
    """
    Run every step in order: build the bulk data from the three sample
    sources, save it locally, upload it to S3, start the Neptune load, and
    finally print the LLM's optimization suggestions.
    """
    sources = [oracle_metadata, snowflake_metadata, duckdb_metadata]

    # Steps 1-3: generate, persist, upload and load.
    save_bulk_data_to_file(generate_neptune_bulk_data(sources), FILE_PATH)
    upload_to_s3(FILE_PATH, S3_BUCKET, S3_KEY)
    trigger_neptune_loader(NEPTUNE_ENDPOINT, S3_BUCKET, S3_KEY, IAM_ROLE_ARN, AWS_REGION)

    # Step 4: the LLM sees the raw source metadata, not the generated records.
    suggestions = llm_suggest_optimizations(sources)
    print("\nLLM Optimizations:\n", suggestions)


if __name__ == "__main__":
    main()


# load llm back

In [None]:
import ollama
            
def generate_python_function(description):
    """
    Send a single user message asking the 'deepseek-r1:latest' model to write
    a Python function for ``description`` and return the reply's content.
    """
    request = {'role': 'user', 'content': f'Generate a Python function for: {description}'}
    reply = ollama.chat(model='deepseek-r1:latest', messages=[request])
    return reply['message']['content']
            
# Ask the model for a Fibonacci sequence generator. `code` holds the text
# content of the model's reply exactly as returned; it is not executed here.
code = generate_python_function('fibonacci sequence generator')


# Show the reply.
print(code)
              

# Make the LLM available in a UI

Run these commands:
docker build -t chatbot-ollama .
docker run -p 3000:3000 chatbot-ollama


# Plug the LLM into a sandbox for CodeAct

In [1]:
import builtins
import contextlib
import io
from typing import Any

# ---------------------------
# New Function: sandbox_run
# ---------------------------
def sandbox_run(code: str, env: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """
    Execute code with the given environment and capture its stdout output.

    NOTE(review): this is NOT a security sandbox. The code runs in-process
    with the full set of builtins (including ``open`` and ``__import__``).
    Flagged rather than changed: do not feed it untrusted or unreviewed
    LLM-generated code outside an isolated process or container.

    Args:
        code (str): The Python code to execute.
        env (dict): Names (functions, variables) made available to the code.
            The dict itself is never modified.

    Returns:
        tuple: ``(output, new_vars)``.
            ``output`` is the stripped stdout text, or ``"<no output>"`` if
            nothing was printed. If the code raised, ``output`` is any text
            printed before the failure followed by
            ``"Error during execution: <ExceptionType>: <message>"``.
            ``new_vars`` maps every name the code created that was not
            already in ``env`` to its value (names created before a failure
            are included).
    """
    # One dict serves as both globals and locals. With separate dicts,
    # functions and generator expressions inside ``code`` resolve free names
    # against globals only and cannot see ``env`` or names defined earlier in
    # the same snippet.
    namespace: dict[str, Any] = dict(env)
    original_keys = set(namespace)
    namespace["__builtins__"] = builtins.__dict__

    buffer = io.StringIO()
    error = None
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, namespace)
    except (Exception, SystemExit) as e:
        # SystemExit is caught so exit()/sys.exit() in the executed code does
        # not terminate the host process. The type name is included because
        # str(e) alone is often empty or ambiguous (e.g. KeyError).
        error = f"Error during execution: {type(e).__name__}: {e}"

    printed = buffer.getvalue().strip()
    if error is None:
        result_output = printed or "<no output>"
    elif printed:
        # Keep what was printed before the failure; it helps diagnose it.
        result_output = f"{printed}\n{error}"
    else:
        result_output = error

    # Capture any new variables created (the injected builtins entry is not one).
    new_vars = {
        name: value
        for name, value in namespace.items()
        if name not in original_keys and name != "__builtins__"
    }
    return result_output, new_vars

# ---------------------------
# New Function: format_graph_result
# ---------------------------
def format_graph_result(result: Any) -> str:
    """
    Render a Neptune query result as text.

    This is a placeholder: it currently returns ``str(result)`` unchanged.
    Extend it to turn the raw result into node/edge patterns when a richer
    presentation is needed.

    Args:
        result (Any): The raw result from a Neptune query.

    Returns:
        str: The string form of ``result``.
    """
    rendered = str(result)
    return rendered

# ---------------------------
# New Function: create_neptune_optimizer_agent
# ---------------------------
def create_neptune_optimizer_agent():
    """
    Creates and compiles a CodeAct agent that integrates with Amazon Neptune.

    The agent uses the following tools:
      - query_neptune_data: To run SPARQL queries.
      - llm_suggest_optimizations: To provide optimization suggestions.
      - generate_neptune_bulk_data: (Optional) To generate bulk data.
      - format_graph_result: To turn raw query results into text.

    The tool functions and ``sandbox_run`` must already be defined in the
    running session; they are looked up as globals when this function runs.

    The agent is built using the LangGraph CodeAct framework with an in-memory
    checkpointer, so conversation state is kept per ``thread_id`` for the life
    of the process only.

    Returns:
        agent: The compiled CodeAct agent.
    """
    # Import necessary modules from LangGraph (ensure these libraries are installed).
    # Imported lazily so the rest of the file works without them.
    from langgraph_codeact import create_codeact
    from langgraph.checkpoint.memory import MemorySaver
    # Import the chat model. We assume you have access to ChatOllama; otherwise, you may use your existing Ollama import.
    from langchain_community.chat_models import ChatOllama

    # Initialize the chat model for Llama3 (adjust the model tag as needed)
    chat_model = ChatOllama(model="llama3:70b")

    # List of available tools for the agent.
    tools = [
        query_neptune_data,         # Executes Neptune queries.
        llm_suggest_optimizations,  # Provides optimization suggestions.
        generate_neptune_bulk_data,   # Generates bulk load data (if needed by the agent).
        format_graph_result         # Formats query results.
    ]

    # Create the CodeAct graph agent with our sandbox_run for code execution.
    # The previous ``prompt_template=None`` keyword was dropped.
    # NOTE(review): create_codeact does not appear to define that keyword (it
    # names its optional prompt argument ``prompt``) - confirm against the
    # installed version. Omitting it selects the default prompt either way.
    code_act_graph = create_codeact(chat_model, tools, sandbox_run)
    # Compile the agent using an in-memory checkpointer for state persistence.
    agent = code_act_graph.compile(checkpointer=MemorySaver())
    return agent

# ---------------------------
# (Optional) Example Function: neptune_agent_interface
# ---------------------------
def _extract_agent_answer(state):
    """Return the final answer from an agent result state, or None if absent."""
    # Honor an explicit "output" entry if the state has one.
    answer = state.get("output")
    if answer is not None:
        return answer
    # Otherwise the answer is the content of the last message in the state.
    messages = state.get("messages") or []
    if not messages:
        return None
    last = messages[-1]
    if isinstance(last, dict):
        return last.get("content")
    return getattr(last, "content", None)


def neptune_agent_interface():
    """
    An example interface to interact with the Neptune Optimizer Agent.

    Sends two example requests (a data query and an optimization question)
    to the same agent and prints each answer. Both calls share one
    ``thread_id`` so the agent's checkpointer preserves context between them.
    """
    # Create the agent.
    agent = create_neptune_optimizer_agent()

    # Example conversation configuration (using thread_id to preserve session state).
    thread_config = {"configurable": {"thread_id": 1}}

    examples = [
        # Example 1: Data Query Request
        ("Agent Answer (Data Query):",
         "Find all authors who collaborated with 'Alice' within the last year."),
        # Example 2: Optimization Request
        ("Agent Answer (Optimization):",
         "How can I optimize queries that look up collaborations on the Sales table?"),
    ]
    for label, user_query in examples:
        messages = [{"role": "user", "content": user_query}]
        result_state = agent.invoke({"messages": messages}, config=thread_config)
        # The result state has no guaranteed "output" key; reading only that
        # key printed None. Fall back to the last message's content.
        print(label, _extract_agent_answer(result_state))
