In [1]:
!pip install requests-auth-aws-sigv4



In [2]:
import time 
import requests 
from requests_auth_aws_sigv4 import AWSSigV4
import json
import boto3
import os
from typing import List, Optional
VERIFY_TLS = True

In [35]:
neptune_endpoint = "db-neptune-dev.cluster-criq8uemaejw.us-west-2.neptune.amazonaws.com"
port=8182
region = 'us-west-2'
service = 'neptune-db'
url = f"https://{neptune_endpoint}:{port}/"
load = "loader/"
query = "openCypher/"

##### Convert GR to CSV

In [29]:
data_path = "data/datasets/roads/"
in_gr_path  = data_path+"USA-road-d.USA.gr"
out_csv_path = data_path+"edges.csv"

with open("../"+in_gr_path,'r') as f:
    with open("../"+out_csv_path,'w') as g:
        # ~id,~from,~to,~label,distance:Double
        # Empty string ("") is a valid id, and the edge is created with an empty string as the id.
        # Labels are case sensitive and cannot be empty. A value of "" will result in an error.
        g.write(":START_ID,:END_ID,:TYPE,weight:Long\n")
        j = 1
        for line in f:
            if line.startswith('a '):
                if j % 2 == 0:
                    line = line.replace('a ','').replace(' ',',')
                    g.write(line)
                j+=1
print(j)

58333345


In [20]:
data_path = "data/datasets/roads/"
in_gr_path  = data_path+"USA-road-d.USA.co"
out_csv_path = data_path+"nodes.csv"

with open("../"+in_gr_path,'r') as f:
    with open("../"+out_csv_path,'w') as g:
        g.write(":ID,longitude:Long,latitude:Long\n")
        j=1
        for line in f:
            if line.startswith('v '):
                line = line.replace('v ', '').replace(' ', ',')
                g.write(line)
                j+=1
print(j)

23947348


In [30]:
def upload_csvs_to_s3(bucket: str, prefix: str, 
                      local_files: List[str],
                      region: str) -> None:
    s3 = boto3.client("s3", region_name=region)
    for local_path in local_files:
        key = f"{prefix.rstrip('/')}/{os.path.basename(local_path)}"
        print(f"Uploading {local_path} -> s3://{bucket}/{key}")
        s3.upload_file(local_path, bucket, key)


def poll_bulk_load(neptune_endpoint: str, port: int, load_id: str, 
                   poll_seconds: int = 10) -> dict:
    url = f"https://{neptune_endpoint}:{port}/loader/{load_id}"
    terminal = {"LOAD_COMPLETED", "LOAD_FAILED", "LOAD_CANCELLED"}
    while True:
        aws_auth = AWSSigV4(service, region=region)
        resp = requests.get(url, timeout=30, verify=VERIFY_TLS, auth=aws_auth)
        resp.raise_for_status()
        data = resp.json()
        overall = data.get("payload", {}).get("overallStatus", {})
        status = overall.get("status") or overall.get("overallStatus")  
        # some versions use 'status'
        progress = overall.get("totalRecords") or overall.get("totalTimeSpent")
        print(f"Status: {status} | Progress: {progress}")
        if status in terminal:
            return data
        time.sleep(poll_seconds)

In [31]:
S3_BUCKET='datasets-in-out'
S3_PREFIX='input/road-usa/'
LOCAL_EDGE_FILE = "../data/datasets/roads/edges.csv"
LOCAL_NODE_FILE = "../data/datasets/roads/nodes.csv"
AWS_REGION = 'us-west-2'

upload_csvs_to_s3(
                  bucket=S3_BUCKET,
                  prefix=S3_PREFIX,
                  local_files=("../data/datasets/roads/edges.csv",
                                ),
                  region=AWS_REGION,
                 )

Uploading ../data/datasets/roads/edges.csv -> s3://datasets-in-out/input/road-usa/edges.csv


In [37]:
payload = {
    "source": "s3://datasets-in-out/input/road-usa/",
    "format": "opencypher",  # Neptune CSV (nodes/edges)
    "iamRoleArn": "arn:aws:iam::063299843915:role/service-role/AWSNeptuneNotebookRole-NeptuneNbUser",
    "region": "us-west-2",
    "failOnError": "FALSE",
    "parallelism": "LOW",
    "queueRequest": "TRUE",
    # Set to True if your edge CSV has ~id; otherwise False to auto-generate
    "userProvidedEdgeIds": "FALSE",
    "edgeOnlyLoad": "TRUE",
    # Optional parser tweaks:
    # "parserConfiguration": {"ignoreEmptyStrings": True, "allowNull": True}
}
print(payload)

url = f"https://{neptune_endpoint}:{port}/loader"
aws_auth = AWSSigV4(service, region=region)
resp = requests.post(url, 
                 json=payload, 
                 timeout=180, 
                 verify=VERIFY_TLS,
                 auth=aws_auth
                 # headers=headers
                )                        
#resp.raise_for_status()
data = resp.json()
load_id = data.get("payload", {}).get("loadId")
if not load_id:
    raise RuntimeError(f"Bulk load start did not return loadId: {json.dumps(data, indent=2)}")
print(f"Bulk load started. loadId={load_id}")                                        

{'source': 's3://datasets-in-out/input/road-usa/', 'format': 'opencypher', 'iamRoleArn': 'arn:aws:iam::063299843915:role/service-role/AWSNeptuneNotebookRole-NeptuneNbUser', 'region': 'us-west-2', 'failOnError': 'FALSE', 'parallelism': 'LOW', 'queueRequest': 'TRUE', 'userProvidedEdgeIds': 'FALSE', 'edgeOnlyLoad': 'TRUE'}
Bulk load started. loadId=c653d91f-cfe8-41fa-a0b9-f9c67f6a9dc8


In [38]:
# load_id = "ad935740-bc76-402c-95ca-8bbef195adf6" #uncomment for testing/hardcoding
poll_bulk_load(neptune_endpoint, port, load_id, poll_seconds=15)

Status: LOAD_IN_PROGRESS | Progress: 4
Status: LOAD_IN_PROGRESS | Progress: 30000
Status: LOAD_IN_PROGRESS | Progress: 90000
Status: LOAD_IN_PROGRESS | Progress: 150000
Status: LOAD_IN_PROGRESS | Progress: 210000
Status: LOAD_IN_PROGRESS | Progress: 270000
Status: LOAD_IN_PROGRESS | Progress: 360000
Status: LOAD_IN_PROGRESS | Progress: 450000
Status: LOAD_IN_PROGRESS | Progress: 540000
Status: LOAD_IN_PROGRESS | Progress: 690000
Status: LOAD_IN_PROGRESS | Progress: 810000
Status: LOAD_IN_PROGRESS | Progress: 930000
Status: LOAD_IN_PROGRESS | Progress: 1080000
Status: LOAD_IN_PROGRESS | Progress: 1230000
Status: LOAD_IN_PROGRESS | Progress: 1380000
Status: LOAD_IN_PROGRESS | Progress: 1530000
Status: LOAD_IN_PROGRESS | Progress: 1650000
Status: LOAD_IN_PROGRESS | Progress: 1800000
Status: LOAD_IN_PROGRESS | Progress: 1950000
Status: LOAD_IN_PROGRESS | Progress: 2100000
Status: LOAD_IN_PROGRESS | Progress: 2250000
Status: LOAD_IN_PROGRESS | Progress: 2400000
Status: LOAD_IN_PROGRESS | Pr

{'status': '200 OK',
 'payload': {'feedCount': [{'LOAD_COMPLETED': 1}],
  'overallStatus': {'fullUri': 's3://datasets-in-out/input/road-usa/',
   'runNumber': 7,
   'retryNumber': 1,
   'status': 'LOAD_COMPLETED',
   'totalTimeSpent': 3115,
   'startTime': 1761518371,
   'totalRecords': 29166672,
   'totalDuplicates': 0,
   'parsingErrors': 0,
   'datatypeMismatchErrors': 0,
   'insertErrors': 0}}}

In [39]:
aws_auth = AWSSigV4(service, region=region)
params = {
           "details":True,
           "errors":True,
           "page":1,
           "errorsPerPage":5
          }
resp = requests.get(url + load + load_id, 
                 #json=payload, 
                 params=params,
                 timeout=180, 
                 verify=VERIFY_TLS,
                 auth=aws_auth
                )                        
#resp.raise_for_status()
print(resp.status_code)
data = resp.text
print(data)

400
{"code":"BadRequestException","requestId":"80cd1175-4fb5-bc6d-8fe2-acd61c1fdca0","detailedMessage":"Bad route: /loaderloader/c653d91f-cfe8-41fa-a0b9-f9c67f6a9dc8","message":"Bad route: /loaderloader/c653d91f-cfe8-41fa-a0b9-f9c67f6a9dc8"}


In [None]:
#q2o5680

In [23]:
!curl "https://"$neptune_endpoint":8182/openCypher/status" \
  --data-urlencode "includeWaiting=true"

{"code":"AccessDeniedException","requestId":"6ccd0f53-295c-57d3-81db-dddd13c681b3","detailedMessage":"Missing Authentication Token","message":"Missing Authentication Token"}

In [13]:
payload = {
    "query": "MATCH (n) WHERE n.longitude=$lon RETURN n",
    "parameters": {
        "lon": -86436719
    }
}


aws_auth = AWSSigV4(service, region=region)
resp = requests.post(url + query, 
                 json=payload, 
                 params=params,
                 timeout=180, 
                 verify=VERIFY_TLS,
                 auth=aws_auth
                 # headers=headers
                )                        
#resp.raise_for_status()
print(resp.status_code)
data = resp.text
print(data)

200
{
  "results": [{
      "n": {
        "~id": "1",
        "~entityType": "node",
        "~labels": ["vertex"],
        "~properties": {
          "longitude": -86436719,
          "latitude": 32469271
        }
      }
    }]
}


In [40]:
#count the nodes and vertexes

payload = {
    "query": "MATCH (n) RETURN count(n) AS totalNodes",
}

url = f"https://{neptune_endpoint}:{port}/openCypher" 
aws_auth = AWSSigV4(service, region=region)
resp = requests.post(url, 
                 json=payload, 
                 params=params,
                 timeout=180, 
                 verify=VERIFY_TLS,
                 auth=aws_auth
                 # headers=headers
                )                        
#resp.raise_for_status()
print(resp.status_code)
data = resp.text
print(data)

200
{
  "results": [{
      "totalNodes": 23947347
    }]
}


In [41]:
payload = {
    "query": "MATCH ()-[r]->() RETURN count(r) AS totalEdges",
}
url = f"https://{neptune_endpoint}:{port}/openCypher" 
aws_auth = AWSSigV4(service, region=region)
resp = requests.post(url, 
                 json=payload, 
                 params=params,
                 timeout=180, 
                 verify=VERIFY_TLS,
                 auth=aws_auth
                 # headers=headers
                )                        
#resp.raise_for_status()
print(resp.status_code)
data = resp.text
print(data)

200
{
  "results": [{
      "totalEdges": 35619482
    }]
}
