In [None]:
import boto3
import sagemaker
from sagemaker import image_uris
import boto3
import os
import time
import json
import urllib.request
from pathlib import Path
from sagemaker.utils import name_from_base

# Execution role for the endpoint.
role = sagemaker.get_execution_role()
# SageMaker session used to interact with the different AWS APIs.
sess = sagemaker.session.Session()
# Bucket that houses artifacts; later cells stage the RDF files here.
default_bucket = sess.default_bucket()
# Read the region through the public property instead of the private
# `_region_name` attribute, which is an implementation detail of Session.
region = sess.boto_region_name

default_bucket


## 初始化数据库参数

In [None]:
# Neptune cluster endpoint and port; later cells build the loader URL from them.
neptune_host = "proteindc-neptune-db-cluster.cluster-cnfpwwtkftli.us-east-1.neptune.amazonaws.com"
neptune_port = "8182"
# IAM role ARN sent to the Neptune loader in the request's `iamRoleArn` field.
iamRoleArn = "arn:aws:iam::629244530291:role/zk-NeptuneLoadFromS3"

## 设置Notebook环境参数

In [None]:

url = f'https://{neptune_host}:{neptune_port}/loader'
graph_notebook_host = f'https://{neptune_host}'

%graph_notebook_host {neptune_host}

## 设置需要加载的文件

`files_to_load` 列表为需要加载的 RDF 文件列表。
更多文件信息可以参考 UniProt 公开数据集：`s3://aws-open-data-uniprot-rdf`。
如果当前执行的区域不是 eu-west-3，会先将数据复制到 SageMaker 默认的 S3 存储桶中，然后再加载到 Neptune 数据库。

In [None]:


# Bucket that holds the source RDF files.
source_bucket_name = 'aws-open-data-uniprot-rdf'

# In eu-west-3 the source bucket is used directly; in every other region the
# SageMaker default bucket is used instead.
bucket_name = source_bucket_name if region == 'eu-west-3' else default_bucket
print(bucket_name)

# Object keys of the RDF files to load.
files_to_load = [
    "2023-03/supporting/taxonomy.rdf.gz",
    "2023-03/supporting/go.rdf.gz",
    "2023-03/uniprot/uniprotkb_reviewed_eukaryota_opisthokonta_metazoa_33208_0.rdf.gz",
]

In [None]:
s3 = boto3.resource('s3')

# Outside eu-west-3, copy every selected file from the source bucket into the
# SageMaker default bucket under the same object key.
if region != 'eu-west-3':
    for file in files_to_load:
        copy_source = dict(Bucket=source_bucket_name, Key=file)
        s3.meta.client.copy(copy_source, default_bucket, file)


## 加载数据到Neptune数据库

In [None]:
# Load ids returned by the loader, in submission order.
loadids = []
print(url)


def loadfile(filelocation):
    """Submit one S3 object to the Neptune loader endpoint.

    Builds the request from the module-level ``bucket_name``, ``iamRoleArn``,
    ``region`` and ``url`` settings, POSTs it as JSON, and records the load id
    found in the response.

    Args:
        filelocation: Object key of the file inside ``bucket_name``.

    Returns:
        The module-level ``loadids`` list, after the new load id has been
        appended to it.
    """
    source_uri = f"s3://{bucket_name}/{filelocation}"
    print(source_uri)
    print(iamRoleArn)
    print(region)

    request_body = json.dumps({
        "source": source_uri,
        "format": "rdfxml",
        "iamRoleArn": iamRoleArn,
        "region": region,
        "failOnError": "FALSE",
        "parallelism": "OVERSUBSCRIBE",
        "queueRequest": "TRUE",
    })
    print(request_body)

    request = urllib.request.Request(
        url=url,
        data=request_body.encode("utf-8"),
        method="POST",
    )
    request.add_header("Content-type", "application/json; charset=UTF-8")

    with urllib.request.urlopen(request) as resp:
        reply = json.loads(resp.read().decode("utf-8"))
        new_load_id = reply['payload']['loadId']

    print("load id: {}".format(new_load_id))
    loadids.append(new_load_id)
    return loadids


# Submit every file; loadfile returns the same shared list each time.
for file in files_to_load:
    loadids = loadfile(file)

## 监控数据加载进度

In [None]:
import time
# Loader states that can still change. Every other state (success or failure)
# is treated as terminal, so a failed or cancelled load cannot poll forever.
pending_statuses = {'LOAD_NOT_STARTED', 'LOAD_IN_QUEUE', 'LOAD_IN_PROGRESS'}
# load id -> final status, for every load that ended in a state other than
# LOAD_COMPLETED.
unsuccessful_loads = {}

for load_id in loadids:
    # The status of a single load is served at <loader url>/<load id>.
    req = urllib.request.Request(url = "/".join([url, load_id]), method = "GET")
    req.add_header("Content-type", "application/json; charset=UTF-8")

    while True:
        with urllib.request.urlopen(req) as resp:
            response_data = json.loads(resp.read().decode("utf-8"))['payload']

        status = response_data['overallStatus']['status']
        totalTimeSpent = response_data['overallStatus']['totalTimeSpent']
        totalRecords = response_data['overallStatus']['totalRecords']
        finished = status not in pending_statuses
        # While the load is pending, '\r' makes the next poll overwrite this
        # line; the final status keeps its own line.
        end = '\n' if finished else '\r'
        print(f"{load_id}  status: {status} \tload time: {totalTimeSpent}s\trecords: {totalRecords}", end=end)

        if finished:
            if status != 'LOAD_COMPLETED':
                unsuccessful_loads[load_id] = status
            break
        time.sleep(1)

# Fail loudly so later cells are not run against a partially loaded database.
if unsuccessful_loads:
    raise RuntimeError(f"Neptune bulk load did not complete successfully: {unsuccessful_loads}")

## 测试
执行下面的 SPARQL 查询，验证数据是否已成功加载。

In [None]:
%%sparql
SELECT ?s ?p ?o
WHERE {
    ?s ?p ?o
}
LIMIT 10