# Import Data into Neptune DB

After completing the data generation, you are now ready to import the CSV graph data into Neptune Database.

This notebook demonstrates how to:
1. Upload the data to S3
2. Import the data into Neptune DB using the Neptune Loader API

## Prerequisites

Before running this notebook, ensure you have:
1. A Neptune DB instance
2. An S3 bucket in the same region as your Neptune DB
3. An IAM role for Neptune to access S3
4. An S3 VPC endpoint configured

All of this infrastructure is created by the CDK project that accompanies this example, and the resulting resource identifiers are saved in the file `cdk_outputs.json`, so you can simply run through this notebook.

For infrastructure setup instructions, see the README under `neptune-database-graphstorm-online-inference/neptune-db-cdk/`.
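
Before running the remaining cells, it can help to confirm that `cdk_outputs.json` holds the values this notebook reads. The following is a minimal sketch, assuming the three keys used in the configuration cell (`ACCOUNT_ID`, `AWS_REGION`, `NDB_STACK_S3_BUCKET`) are the only ones needed here. `missing_cdk_outputs` is a hypothetical helper and is not part of the CDK project, and the example values are placeholders.

```python
import json

# Keys this notebook reads from cdk_outputs.json (taken from the configuration cell)
REQUIRED_KEYS = ("ACCOUNT_ID", "AWS_REGION", "NDB_STACK_S3_BUCKET")


def missing_cdk_outputs(outputs, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in `outputs`."""
    return [key for key in required if not outputs.get(key)]


# Placeholder document for illustration; in practice use json.load() on the real file
example_outputs = json.loads(
    '{"ACCOUNT_ID": "123456789012", "AWS_REGION": "us-east-1"}'
)
print(missing_cdk_outputs(example_outputs))
```

An empty list means every expected key is present; anything else names the outputs to look for in the CDK deployment.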

In [None]:
import logging
import os
import json
import time

import boto3
import requests
import urllib3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

logging.basicConfig(level=logging.INFO, force=True)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("s3transfer").setLevel(logging.WARNING)

# Disable SSL warnings since we're using localhost
urllib3.disable_warnings()
config_obj = %graph_notebook_config

In [None]:
# Retrieve resources created by CDK
with open(
    f"{os.environ['HOME']}/SageMaker/cdk_outputs.json", "r", encoding="utf-8"
) as f:
    cdk_outputs = json.load(f)

ACCOUNT_ID = cdk_outputs["ACCOUNT_ID"]
AWS_REGION = cdk_outputs["AWS_REGION"]
S3_BUCKET = cdk_outputs["NDB_STACK_S3_BUCKET"]
S3_GRAPH_PREFIX = f"s3://{S3_BUCKET}/neptune-input/ieee-cis-with-text-embeddings"

GRAPH_NAME = "ieee-cis-ndb"

# Neptune configuration
NEPTUNE_ENDPOINT = config_obj.host
NEPTUNE_PORT = 8182
NEPTUNE_HOST = config_obj.host
IAM_ROLE_ARN = config_obj.load_from_s3_arn

## Upload processed data to S3 to start Neptune import

To import the graph into Neptune, you first need to upload it to an S3 location that is accessible by the Neptune instance.

During stack creation we created a new S3 bucket and set up the necessary permissions to allow this notebook instance and the Neptune instance
to read and write from it. So to start the import process you will first upload the processed graph data to this S3 bucket.
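
If you prefer to stay in Python, the upload can be approximated with boto3 instead of the AWS CLI. This is a sketch under stated assumptions: edge files match `Edge*.csv`, vertex files match `Vertex*.csv`, and both sit at the top level of the local directory. `plan_uploads` and `upload_plan` are hypothetical helpers, and unlike `aws s3 sync` this version re-uploads every file rather than only changed ones.

```python
import fnmatch
from pathlib import Path


def plan_uploads(local_dir, s3_prefix):
    """Map top-level Edge*/Vertex* CSV files to their destination S3 URIs."""
    plan = []
    for path in sorted(Path(local_dir).iterdir()):
        if not path.is_file():
            continue
        if fnmatch.fnmatchcase(path.name, "Edge*.csv"):
            plan.append((path, f"{s3_prefix}/edges/{path.name}"))
        elif fnmatch.fnmatchcase(path.name, "Vertex*.csv"):
            plan.append((path, f"{s3_prefix}/vertices/{path.name}"))
    return plan


def upload_plan(plan):
    """Upload each planned file with boto3 (needs AWS credentials; not called here)."""
    import boto3  # imported lazily so planning works without AWS access

    s3 = boto3.client("s3")
    for path, uri in plan:
        # Split "s3://bucket/key/parts" into bucket and key
        bucket, _, key = uri[len("s3://"):].partition("/")
        s3.upload_file(str(path), bucket, key)
```

Calling `plan_uploads("./ieee-cis-fraud-detection", S3_GRAPH_PREFIX)` and printing the result first gives you a chance to review the destinations before passing the plan to `upload_plan`.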

In [None]:
!aws s3 sync ./ieee-cis-fraud-detection/ {S3_GRAPH_PREFIX}/edges/ --exclude "*" --include "Edge*.csv"
!aws s3 sync ./ieee-cis-fraud-detection/ {S3_GRAPH_PREFIX}/vertices/ --exclude "*" --include "Vertex*.csv"

## Import Data into Neptune DB

Now that we have our data in S3 in the correct format, let's use the Neptune Loader API to import it.

To speed up the process, you will import the graph in two separate steps: first importing all the vertices, then performing an optimized load of the edges only.

As a rule of thumb to speed up data imports you should:

* Minimize import size. If your data has columns you don't plan to use in your Neptune graph, it's better to remove them.
* Launch separate import jobs for vertices and edges, running the vertex job first, then pass the `'edgeOnlyLoad': 'TRUE'` parameter in the edge load request.
* Use `'parallelism': 'OVERSUBSCRIBE'` to allocate the entire cluster to loading, provided no other work is running on the cluster during the import.
* Use a single file per vertex/edge type (instead of `num_chunks=16`). Neptune Database (NDB) parallelizes within a file, so many small files slow the load down, while Neptune Analytics (NA) behaves the opposite way.
* Use larger writer instances.

See the [Neptune documentation](https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-optimize.html) for more advice on optimizing imports.
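
The vertex-first, edge-only pattern from the list above comes down to two request bodies that differ only in `source` and `edgeOnlyLoad`. The following is a minimal sketch of those bodies using the same loader parameters this notebook sends. `build_loader_payload` is a hypothetical helper, and the bucket name and role ARN are placeholders.

```python
def build_loader_payload(source, iam_role_arn, region, edge_only=False):
    """Build a Neptune bulk-loader request body for a vertex or edge-only load."""
    return {
        "source": source,
        "format": "csv",
        "iamRoleArn": iam_role_arn,
        "region": region,
        "failOnError": "TRUE",
        # Let the loader use all available resources on the cluster
        "parallelism": "OVERSUBSCRIBE",
        # Queue the request instead of rejecting it while another load runs
        "queueRequest": "TRUE",
        # The loader expects the strings "TRUE"/"FALSE", not JSON booleans
        "edgeOnlyLoad": "TRUE" if edge_only else "FALSE",
    }


# Placeholder values for illustration only
prefix = "s3://example-bucket/neptune-input/example-graph"
role = "arn:aws:iam::123456789012:role/ExampleNeptuneLoadRole"

vertex_payload = build_loader_payload(f"{prefix}/vertices/", role, "us-east-1")
edge_payload = build_loader_payload(
    f"{prefix}/edges/", role, "us-east-1", edge_only=True
)
print(vertex_payload["edgeOnlyLoad"], edge_payload["edgeOnlyLoad"])
```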
 

In the next cell we define functions to load data, with separate handling for edge-only loads, and functions to monitor the load process.

The data loading step should take around 4 minutes to complete on a `r8g.xlarge` instance and is asynchronous so you can proceed to the model training
notebook and come back later to verify the job has finished.

In [None]:
def create_session():
    """Create a requests session with retry configuration and AWS auth."""
    retry_strategy = Retry(
        total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]
    )
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.verify = False  # Disable SSL verification for localhost

    return session


def load_data_to_neptune(
    source_prefix,
    format="csv",
    is_edge_only=False,
):
    """Load data into Neptune DB using the Neptune Loader API.

    Args:
        source_prefix: S3 prefix containing the data files
        format: Data format (csv for Gremlin)
        is_edge_only: Whether the load operation is for edges only
    """
    session = create_session()
    boto3_session = boto3.Session()
    credentials = boto3_session.get_credentials()
    region = boto3_session.region_name

    # Prepare the loader request
    loader_endpoint = f"https://{NEPTUNE_ENDPOINT}:{NEPTUNE_PORT}/loader"

    payload = {
        "source": source_prefix,
        "format": format,
        "iamRoleArn": IAM_ROLE_ARN,
        "region": AWS_REGION,
        "failOnError": "TRUE",
        "parallelism": "OVERSUBSCRIBE",
        "queueRequest": "TRUE",
        "updateSingleCardinalityProperties": "FALSE",
        "edgeOnlyLoad": str(is_edge_only).upper(),
    }

    try:
        # Serialize once so the signed body and the sent body are identical
        body = json.dumps(payload)

        # Create request for signing
        request = AWSRequest(
            method="POST",
            url=loader_endpoint,
            data=body,
            headers={"Content-Type": "application/json", "Host": NEPTUNE_HOST},
        )
        SigV4Auth(credentials, "neptune-db", region).add_auth(request)
        headers = dict(request.headers)

        # Start the load job
        response = session.post(
            loader_endpoint, headers=headers, data=body, timeout=30
        )

        if response.status_code != 200:
            raise Exception(f"Failed to start load job: {response.text}")

        load_id = response.json()["payload"]["loadId"]
        print(f"Started load job with ID: {load_id}")
        return load_id
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to Neptune: {str(e)}")
        raise


def check_load_status(load_id, include_errors=False, page=1, errors_per_page=10):
    """Check the status of a Neptune load job.

    Args:
        load_id: The ID of the load job to check
        include_errors: Whether to include the list of errors (default: False)
        page: The error page number when include_errors is True (default: 1)
        errors_per_page: Number of errors per page when include_errors is True (default: 10)

    Returns:
        dict: The load status response from Neptune
    """
    session = create_session()
    boto3_session = boto3.Session()
    credentials = boto3_session.get_credentials()
    region = boto3_session.region_name

    # Construct base URL with load_id
    status_endpoint = f"https://{NEPTUNE_ENDPOINT}:{NEPTUNE_PORT}/loader/{load_id}"

    # Add query parameters
    params = []
    if include_errors:
        params.append("errors=TRUE")
        params.append(f"page={page}")
        params.append(f"errorsPerPage={errors_per_page}")

    # Append parameters to URL if any exist
    if params:
        status_endpoint += "?" + "&".join(params)

    try:
        # Create request for signing
        request = AWSRequest(
            method="GET", url=status_endpoint, headers={"Host": NEPTUNE_HOST}
        )
        SigV4Auth(credentials, "neptune-db", region).add_auth(request)
        headers = dict(request.headers)

        response = session.get(status_endpoint, headers=headers, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Failed to get load status: {response.text}")

        status = response.json()
        print(f"Load Status: {json.dumps(status, indent=2)}")
        return status
    except requests.exceptions.RequestException as e:
        print(f"Error checking load status: {str(e)}")
        raise


def wait_for_load_completion(load_id, max_attempts=60, sleep_time=30):
    """Wait for a Neptune load job to complete.

    Args:
        load_id: The ID of the load job to wait for
        max_attempts: Maximum number of status checks (default: 60)
        sleep_time: Time to sleep between checks in seconds (default: 30)

    Returns:
        dict: The final status of the load job

    Raises:
        Exception: If the load job fails or times out
    """
    print(f"Waiting for load job {load_id} to complete...")
    attempts = 0

    while attempts < max_attempts:
        attempts += 1
        try:
            status = check_load_status(load_id)
        except Exception as e:
            # A failed status check may be transient, so retry after a pause
            print(f"Error during status check: {str(e)}")
            time.sleep(sleep_time)
            continue

        overall_status = (
            status.get("payload", {}).get("overallStatus", {}).get("status")
        )

        if overall_status == "LOAD_COMPLETED":
            print("Load job completed successfully!")
            return status
        if overall_status in ["LOAD_FAILED", "LOAD_CANCELLED"]:
            # Raised outside the try block so a failed load is reported, not retried
            error_logs = status.get("payload", {}).get("errors", [])
            raise Exception(
                f"Load job failed with status {overall_status}. Errors: {error_logs}"
            )

        print(
            f"Current status: {overall_status}. Waiting {sleep_time} seconds before next check..."
        )
        time.sleep(sleep_time)

    raise Exception(f"Load job timed out after {max_attempts} attempts")

First, start the vertex loading process.

In [None]:
vertex_load_id = load_data_to_neptune(f"{S3_GRAPH_PREFIX}/vertices/")

Next, schedule the edge loading job. Because `load_data_to_neptune` sets `"queueRequest": "TRUE"` in its request, this job is queued behind the vertex job.
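
Queueing controls the order in which jobs start. If the edge load should also be skipped when the vertex load fails, the Neptune Loader API accepts a `dependencies` list of load IDs on queued requests. The following is a minimal sketch, assuming that behavior as described in the Neptune Loader documentation. `with_dependencies` is a hypothetical helper, the load ID is a placeholder, and `load_data_to_neptune` above does not send this parameter.

```python
def with_dependencies(payload, load_ids):
    """Return a copy of `payload` that should run only after `load_ids` succeed."""
    dependent = dict(payload)  # copy so the caller's payload is left untouched
    dependent["queueRequest"] = "TRUE"  # dependencies apply to queued requests
    dependent["dependencies"] = list(load_ids)
    return dependent


# Placeholder payload and load ID for illustration only
base_payload = {
    "source": "s3://example-bucket/edges/",
    "format": "csv",
    "edgeOnlyLoad": "TRUE",
}
edge_payload_with_deps = with_dependencies(base_payload, ["example-vertex-load-id"])
print(edge_payload_with_deps["dependencies"])
```

To use this in the notebook you would need to extend `load_data_to_neptune` to accept the vertex load ID and add it to its payload.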

In [None]:
edges_load_id = load_data_to_neptune(f"{S3_GRAPH_PREFIX}/edges/", is_edge_only=True)

Optionally, you can wait for the load process to finish and get the output, which should take around 4 minutes on a `db.r8g.4xlarge`, or less if you used a larger instance.
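
The status response returned by the wait functions is verbose, so a small summary can make the result easier to read. This is a sketch under stated assumptions: only `overallStatus.status` is used by this notebook's code, and the other field names (`totalRecords`, `totalTimeSpent`, `parsingErrors`, `datatypeMismatchErrors`, `insertErrors`) are taken from the Neptune Loader Get-Status documentation. `summarize_load_status` is a hypothetical helper, and the example response is illustrative, not real output.

```python
ERROR_FIELDS = ("parsingErrors", "datatypeMismatchErrors", "insertErrors")


def summarize_load_status(status):
    """Extract the headline numbers from a loader status response."""
    overall = status.get("payload", {}).get("overallStatus", {})
    return {
        "status": overall.get("status"),
        "totalRecords": overall.get("totalRecords"),
        "totalTimeSpent": overall.get("totalTimeSpent"),
        # Missing counters are treated as zero
        "errorCount": sum(overall.get(field, 0) for field in ERROR_FIELDS),
    }


# Illustrative response shape only; the values are made up
example_status = {
    "status": "200 OK",
    "payload": {
        "overallStatus": {
            "status": "LOAD_COMPLETED",
            "totalRecords": 1000,
            "totalTimeSpent": 12,
            "parsingErrors": 0,
            "datatypeMismatchErrors": 0,
            "insertErrors": 0,
        }
    },
}
print(summarize_load_status(example_status))
```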

In [None]:
vertex_status = wait_for_load_completion(vertex_load_id)

In [None]:
edges_status = wait_for_load_completion(edges_load_id)

## Next Steps

While Neptune is loading the data, you can proceed to the next notebook, `2-Model-Training.ipynb`, to run model training with GraphStorm locally. In that notebook you will train a GraphStorm model and produce the necessary files to deploy a GraphStorm SageMaker endpoint.