# Neptune Analytics Instance Management With S3 Table Projections

This notebook uses the `SessionManager` to create a projection from an S3 Tables data lake and load it into Neptune Analytics through S3. We then use the Louvain algorithm to find potentially fraudulent nodes, and export the mutated graph back to S3 for the data lake.

This notebook demonstrates how to:
1. Create a projection from an S3 Tables bucket.
2. Import the projection into Neptune Analytics.
3. Run the Louvain algorithm on the provisioned instance to create communities.
4. Export the graph back into the S3 Tables bucket.

## Setup

Import the necessary libraries and set up logging.

In [1]:
import logging
import sys
import os
from pprint import pprint
from time import sleep

import kagglehub
import boto3
from pathlib import Path

import dotenv

dotenv.load_dotenv()

from nx_neptune.session_manager import SessionManager

In [2]:
# Configure logging to see detailed information about the instance creation process
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout  # Explicitly set output to stdout
)
# Set INFO-level logging for the nx_neptune modules (change to logging.DEBUG for more detail)
for logger_name in [
    'nx_neptune.instance_management',
    'nx_neptune.session_manager',
]:
    logging.getLogger(logger_name).setLevel(logging.INFO)
logger = logging.getLogger(__name__)

## Configuration

Check for environment variables necessary for the notebook.

In [4]:
def check_env_vars(var_names):
    values = {}
    for var_name in var_names:
        value = os.getenv(var_name)
        if not value:
            print(f"Warning: Environment Variable {var_name} is not defined")
            print(f"You can set it using: %env {var_name}=your-value")
        else:
            print(f"Using {var_name}: {value}")
        values[var_name] = value
    return values
    
# Check for the environment variables this notebook requires
env_vars = check_env_vars([
    'NETWORKX_S3_IMPORT_BUCKET_PATH',
    'NETWORKX_S3_EXPORT_BUCKET_PATH',
    'NETWORKX_S3_TABLES_CATALOG',
    'NETWORKX_S3_TABLES_DATABASE',
    'NETWORKX_S3_TABLES_TABLENAME',
])

# Get environment variables
s3_location_import = os.getenv('NETWORKX_S3_IMPORT_BUCKET_PATH')
s3_location_export = os.getenv('NETWORKX_S3_EXPORT_BUCKET_PATH')
s3_tables_catalog = os.getenv('NETWORKX_S3_TABLES_CATALOG')
s3_tables_database = os.getenv('NETWORKX_S3_TABLES_DATABASE')
s3_tables_tablename = os.getenv('NETWORKX_S3_TABLES_TABLENAME')
session_name = "nx-bit-quill-test"

Using NETWORKX_S3_IMPORT_BUCKET_PATH: s3://nx-cit-patents/csv-import/
Using NETWORKX_S3_EXPORT_BUCKET_PATH: s3://nx-cit-patents/csv-export/
Using NETWORKX_S3_TABLES_CATALOG: s3tablescatalog/nx-fraud-detection-data
Using NETWORKX_S3_TABLES_DATABASE: bank_fraud_full
Using NETWORKX_S3_TABLES_TABLENAME: transactions
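
Because `check_env_vars` only prints a warning, a missing variable tends to surface later as a less obvious failure in the import or export step. A minimal sketch of a stricter variant that raises instead; `require_env_vars` is a hypothetical helper name and is not part of `nx_neptune`.

```python
import os


def require_env_vars(var_names, environ=None):
    """Return {name: value}, or raise if any variable is unset or empty.

    `environ` defaults to os.environ; passing a plain dict makes the
    helper easy to exercise without touching the real environment.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in var_names if not environ.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: environ[name] for name in var_names}
```

Calling `require_env_vars([...])` with the same five variable names at the top of the notebook would stop execution before any AWS call is made.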


## Data Setup

PaySim data is available from [Kaggle](https://www.kaggle.com/code/kartik2112/fraud-detection-on-paysim-dataset/input?select=PS_20174392719_1491204439457_log.csv).

The data should be uploaded to an S3 bucket, and an Athena table created over that bucket location.

PaySim is a simulated mobile money dataset of transactions between client actors and banks. Each transaction carries an `isFraud` label, so we can use the dataset to detect fraudulent activity in the simulated data.
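
Before uploading, it can help to check the file locally. The sketch below counts transactions and fraud-labelled rows per transaction type, using the PaySim column names. The sample rows are made up for illustration and are not taken from the dataset; `summarize_paysim` is a hypothetical helper.

```python
import csv
import io
from collections import Counter


def summarize_paysim(rows):
    """Return {type: (total_rows, fraud_rows)} for PaySim-style dict rows."""
    totals, frauds = Counter(), Counter()
    for row in rows:
        totals[row["type"]] += 1
        frauds[row["type"]] += int(row["isFraud"])
    return {tx_type: (totals[tx_type], frauds[tx_type]) for tx_type in totals}


# Tiny made-up sample in the PaySim column layout (illustrative values only)
SAMPLE = """step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
1,PAYMENT,100.0,C1,500.0,400.0,M1,0.0,0.0,0,0
1,TRANSFER,181.0,C2,181.0,0.0,C3,0.0,0.0,1,0
1,CASH_OUT,181.0,C4,181.0,0.0,C5,21182.0,0.0,1,0
2,TRANSFER,50.0,C6,90.0,40.0,C7,0.0,50.0,0,0
"""

summary = summarize_paysim(csv.DictReader(io.StringIO(SAMPLE)))
for tx_type, (total, fraud) in sorted(summary.items()):
    print(f"{tx_type}: {total} transactions, {fraud} labelled as fraud")
```

To run it on the downloaded file, pass `csv.DictReader(open(path, newline=""))` instead of the in-memory sample.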

In [None]:
paysim_s3_bucket = 'nx-fraud-detection'
paysim_s3_bucket_path = 'data/'

# Download the latest version of paysim data
paysim_path = Path(kagglehub.dataset_download("ealaxi/paysim1"))

print("Path to paysim dataset files:", paysim_path)

# upload CSV to an S3 bucket if necessary
s3_client = boto3.client('s3')

for file_path in paysim_path.iterdir():
    if file_path.is_file():
        # check if the file already exists
        object_list = s3_client.list_objects_v2(
            Bucket=paysim_s3_bucket,
            Prefix=f"{paysim_s3_bucket_path}{file_path.name}"
        )
        found_keys = object_list["KeyCount"]
        print(f"found {found_keys} matching keys for {paysim_s3_bucket_path}{file_path.name}")

        if found_keys == 0:
            print(f"uploading: {file_path.name} to {paysim_s3_bucket_path}{file_path.name}")
            s3_client.upload_file(
                str(file_path),
                paysim_s3_bucket,
                f"{paysim_s3_bucket_path}{file_path.name}"
            )

In [None]:
def _execute_create_table(stmt, catalog, database, s3_logs_location):
    athena_client = boto3.client('athena')

    # run athena query and wait for it to complete
    response = athena_client.start_query_execution(
        QueryString=stmt,
        QueryExecutionContext={
            'Database': database,
            'Catalog': catalog
        },
        ResultConfiguration={
            'OutputLocation': s3_logs_location,
        }
    )
    query_execution_id = response["QueryExecutionId"]
    print(f"Start CREATE TABLE with execution id: {query_execution_id}")

    response_status = 'QUEUED'
    while response_status == 'QUEUED' or response_status == 'RUNNING':
        sleep(1)
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        response_status = response["QueryExecution"]["Status"]["State"]
        print(f"status: {response_status}")
    if response_status == 'SUCCEEDED':
        print("CREATE TABLE completed")
    else:
        print("CREATE TABLE failed - please check logs")

# create CSV table from the uploaded data

paysim_csv_tablename = 'transactions'
paysim_csv_catalog = 'AwsDataCatalog'
paysim_csv_database = 'bank_fraud'
s3_bucket_location = f"s3://{paysim_s3_bucket}/{paysim_s3_bucket_path}"

create_csv_table_stmt = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {paysim_csv_tablename} (
    `step` int,
    `type` string,
    `amount` float,
    `nameOrig` string,
    `oldbalanceOrg` float,
    `newbalanceOrig` float,
    `nameDest` string,
    `oldbalanceDest` float,
    `newbalanceDest` float,
    `isFraud` int,
    `isFlaggedFraud` int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{s3_bucket_location}'
TBLPROPERTIES ('classification' = 'csv', 'skip.header.line.count'='1');
"""

_execute_create_table(create_csv_table_stmt, paysim_csv_catalog, paysim_csv_database, f"s3://{paysim_s3_bucket}")

# create ICEBERG S3 table from the CSV table

create_s3_table_stmt = f"""
CREATE TABLE {s3_tables_tablename}
WITH (
  table_type = 'ICEBERG',
  is_external = false
)
AS SELECT * FROM "{paysim_csv_catalog}"."{paysim_csv_database}"."{paysim_csv_tablename}"
"""

_execute_create_table(create_s3_table_stmt, s3_tables_catalog, s3_tables_database, f"s3://{paysim_s3_bucket}")
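
The polling loop in `_execute_create_table` waits indefinitely and only prints on failure, so a failed `CREATE TABLE` does not stop the cells that follow. A sketch of a stricter wait, assuming the same `get_query_execution` call; `wait_for_query` and its timeout defaults are hypothetical choices, not part of the notebook's library.

```python
import time

# Athena query states after which no further change is expected
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(athena_client, query_execution_id,
                   timeout_s=300.0, poll_s=1.0, sleep=time.sleep):
    """Poll an Athena query until it finishes; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        response = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        status = response["QueryExecution"]["Status"]
        state = status["State"]
        if state in TERMINAL_STATES:
            if state != "SUCCEEDED":
                # StateChangeReason is only present when Athena reports one
                reason = status.get("StateChangeReason", "no reason reported")
                raise RuntimeError(
                    f"Athena query {query_execution_id} {state}: {reason}"
                )
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} still {state} "
                f"after {timeout_s} seconds"
            )
        sleep(poll_s)
```

Raising rather than printing means the Iceberg `CREATE TABLE ... AS SELECT` is not attempted when the CSV table creation has failed.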

## Create a New or Get an Existing Neptune Analytics Instance

Provision a new Neptune Analytics instance on demand, or retrieve an existing Neptune Analytics graph. Creating a new instance may take several minutes to complete.

In [5]:
session = SessionManager.session(session_name)
graph_list = session.list_graphs(with_details=False)
print("The following graphs are available:")
pprint(graph_list)

INFO - Found credentials in environment variables.
The following graphs are available:
[{'id': 'g-ljib66o0b6',
  'name': 'nx-bit-quill-test-5b31952c-82fd-447c-bcfa-c67d190282ac',
  'status': 'AVAILABLE'}]


In [6]:
session = SessionManager.session(session_name)
graph = await session.get_or_create_graph(config={"provisionedMemory": 32})
print("Retrieved graph:")
pprint(graph)

Retrieved graph:
{'id': 'g-ljib66o0b6',
 'name': 'nx-bit-quill-test-5b31952c-82fd-447c-bcfa-c67d190282ac',
 'status': 'AVAILABLE'}


## Import Data from S3

Import data from S3 into the Neptune Analytics graph and wait for the operation to complete. <br>
IAM permissions required for import: <br>
 - s3:GetObject, kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey
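
As a rough illustration, the permissions listed above could be expressed as an IAM policy document like the one built below. This is a sketch only: it covers just the four actions named here, the bucket and prefix are taken from the example import path printed earlier, and the KMS key ARN (including the account ID) is a placeholder. `build_import_policy` is a hypothetical helper.

```python
import json


def build_import_policy(bucket_name, prefix, kms_key_arn):
    """Build an IAM policy document granting the import permissions listed above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadImportObjects",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/{prefix}*",
            },
            {
                "Sid": "UseImportKey",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:GenerateDataKey",
                    "kms:DescribeKey",
                ],
                "Resource": kms_key_arn,
            },
        ],
    }


policy = build_import_policy(
    bucket_name="nx-cit-patents",   # from the example import bucket path
    prefix="csv-import/",
    kms_key_arn="arn:aws:kms:us-east-2:111122223333:key/EXAMPLE-KEY-ID",  # placeholder
)
print(json.dumps(policy, indent=2))
```

Scoping the S3 resource to the import prefix keeps the grant narrower than bucket-wide read access.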

In [11]:
SOURCE_AND_DESTINATION_BANK_CUSTOMERS = f"""
SELECT DISTINCT "~id", 'customer' AS "~label"
FROM (
     SELECT "nameOrig" as "~id"
     FROM {s3_tables_tablename}
     WHERE "nameOrig" IS NOT NULL
     UNION ALL
     SELECT "nameDest" as "~id"
     FROM {s3_tables_tablename}
     WHERE "nameDest" IS NOT NULL
);"""

BANK_TRANSACTIONS = f"""
SELECT
    "nameOrig" as "~from",
    "nameDest" as "~to",
    "type" AS "~label",
    "step" AS "step:Int",
    "amount" AS "amount:Float",
    "oldbalanceOrg" AS "oldbalanceOrg:Float",
    "newbalanceOrig" AS "newbalanceOrig:Float",
    "oldbalanceDest" AS "oldbalanceDest:Float",
    "newbalanceDest" AS "newbalanceDest:Float",
    "isFraud" AS "isFraud:Int",
    "isFlaggedFraud" AS "isFlaggedFraud:Int"
FROM {s3_tables_tablename}
WHERE "nameOrig" IS NOT NULL AND "nameDest" IS NOT NULL AND "type" = ?"""

await session.import_from_table(
    graph["id"],
    s3_location_import,
    [SOURCE_AND_DESTINATION_BANK_CUSTOMERS, BANK_TRANSACTIONS],
    sql_parameters=[None, ["customer"]],
    catalog=s3_tables_catalog,
    database=s3_tables_database
)

INFO - Importing to graph g-ljib66o0b6
INFO - Creating table using statement:
SELECT DISTINCT "~id", 'customer' AS "~label"
FROM (
     SELECT "nameOrig" as "~id"
     FROM transactions
     WHERE "nameOrig" IS NOT NULL
     UNION ALL
     SELECT "nameDest" as "~id"
     FROM transactions
     WHERE "nameDest" IS NOT NULL
);
INFO - Executing query: 5892e5ee-a4fa-430c-b4ac-4e7d86035b75
INFO - Creating table using statement:
SELECT
    "nameOrig" as "~from",
    "nameDest" as "~to",
    "type" AS "~label",
    "step" AS "step:Int",
    "amount" AS "amount:Float",
    "oldbalanceOrg" AS "oldbalanceOrg:Float",
    "newbalanceOrig" AS "newbalanceOrig:Float",
    "oldbalanceDest" AS "oldbalanceDest:Float",
    "newbalanceDest" AS "newbalanceDest:Float",
    "isFraud" AS "isFraud:Int",
    "isFlaggedFraud" AS "isFlaggedFraud:Int"
FROM transactions
WHERE "nameOrig" IS NOT NULL AND "nameDest" IS NOT NULL AND "type" = ?
INFO - Executing query: 3c479c5f-d099-4ee2-a344-8ce95dc5a097
INFO - [2026-01

EndpointConnectionError: Could not connect to the endpoint URL: "https://neptune-graph.us-east-2.amazonaws.com/importtasks/t-7c1gk3dnk3"
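
The run above ended with an `EndpointConnectionError`, which usually points to a transient network or DNS problem rather than a problem with the import itself. A minimal sketch of retrying an awaited call with exponential backoff; `retry_async` is a hypothetical helper, and whether re-running the whole import is safe depends on how `import_from_table` handles partially completed work, which this notebook does not show.

```python
import asyncio


async def retry_async(make_call, retry_on, attempts=3, base_delay_s=1.0,
                      sleep=asyncio.sleep):
    """Await make_call(), retrying on the given exception type(s).

    make_call must be a zero-argument callable returning a fresh awaitable
    on every invocation, since a coroutine object can only be awaited once.
    The delay doubles after each failed attempt.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            await sleep(base_delay_s * 2 ** (attempt - 1))
```

In the notebook this could wrap the import as `await retry_async(lambda: session.import_from_table(...), retry_on=EndpointConnectionError)`, with `EndpointConnectionError` imported from `botocore.exceptions`.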

## Execute Louvain Algorithm

Create a NetworkX graph and initialize the connection to the Neptune Analytics instance.

We will run the Louvain community detection algorithm and mutate the graph, storing each vertex's community in the `community` property.

You can see the results in the console by removing the `write_property` argument.
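
Louvain works by greedily grouping vertices so as to increase modularity, a score that compares the number of edges inside each community with the number expected if edges were placed at random. The sketch below computes modularity for a toy undirected graph in plain Python; it illustrates the objective only and is not how Neptune Analytics implements the algorithm.

```python
from collections import defaultdict


def modularity(edges, community_of):
    """Modularity Q of a partition of an undirected, unweighted graph.

    edges: list of (u, v) pairs without self-loops or duplicates.
    community_of: dict mapping each node to its community label.
    """
    m = len(edges)
    internal = defaultdict(int)    # edges with both endpoints in the community
    degree_sum = defaultdict(int)  # summed degree of the community's nodes
    for u, v in edges:
        degree_sum[community_of[u]] += 1
        degree_sum[community_of[v]] += 1
        if community_of[u] == community_of[v]:
            internal[community_of[u]] += 1
    return sum(
        internal[c] / m - (degree_sum[c] / (2 * m)) ** 2 for c in degree_sum
    )


# Two triangles joined by a single bridge edge (2, 3)
edges = [(0, 1), (1, 2), (0, 2), (3, 4), (4, 5), (3, 5), (2, 3)]
two_communities = {0: "a", 1: "a", 2: "a", 3: "b", 4: "b", 5: "b"}
one_community = {node: "a" for node in range(6)}

print("two triangles:", modularity(edges, two_communities))
print("single group: ", modularity(edges, one_community))
```

Splitting the graph into its two triangles scores higher than lumping all six vertices together, which is the kind of split Louvain searches for.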

In [None]:
from nx_neptune import NeptuneGraph, set_config_graph_id

config = set_config_graph_id(graph["id"])
na_graph = NeptuneGraph.from_config(config)

# sanity check: print out 10 vertices and edges from the Neptune Analytics graph
ALL_NODES = "MATCH (n) RETURN n LIMIT 10"
all_nodes = na_graph.execute_call(ALL_NODES)
print(f"all nodes: {all_nodes}")

ALL_EDGES = "MATCH ()-[r]-() RETURN r LIMIT 10"
all_edges = na_graph.execute_call(ALL_EDGES)
print(f"all edges: {all_edges}")

In [None]:
import networkx as nx

nx.config.backends.neptune.graph_id = graph["id"]

# using Neptune Analytics, run the Louvain Community Detection Algorithm and mutate
# the graph storing the results of the vertex community in the "community" property
result = nx.community.louvain_communities(nx.Graph(), backend="neptune", write_property="community")
print(f"louvain result: \n{result}")
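
When `write_property` is removed and the communities are returned to the notebook, a quick size summary is often more readable than printing every member. The sketch below assumes the result is a list of sets of node identifiers, which is what NetworkX's `louvain_communities` returns; whether the Neptune backend returns the same shape is an assumption here. The sample communities are made up.

```python
from collections import Counter


def community_size_histogram(communities):
    """Map community size -> number of communities with that size."""
    return Counter(len(members) for members in communities)


def largest_communities(communities, top_n=3):
    """Return the top_n communities ordered by descending size."""
    return sorted(communities, key=len, reverse=True)[:top_n]


# Made-up example result: three communities of customer ids
sample_result = [{"C1", "C2", "C3"}, {"C4", "C5"}, {"C6"}]

print(community_size_histogram(sample_result))
for members in largest_communities(sample_result, top_n=2):
    print(len(members), sorted(members))
```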


## Export the Neptune Analytics data and add it to S3 Tables as an Iceberg table

Export the Neptune Analytics graph as CSV and convert it to Iceberg format, using Athena to add the result to the S3 Tables bucket.

In [None]:
# for the CSV table
csv_catalog = 'AwsDataCatalog'
csv_database = 'bank_fraud_full'
csv_table_name = 'transactions_csv'

# for the iceberg table
iceberg_vertices_table_name = 'customers_updated'
iceberg_edges_table_name = 'transactions_updated'
iceberg_catalog = 's3tablescatalog/nx-fraud-detection-data'
iceberg_database = 'bank_fraud_full'

await session.export_to_table(
    graph["id"],
    s3_location_export,
    csv_table_name,
    csv_catalog,
    csv_database,
    iceberg_vertices_table_name,
    iceberg_edges_table_name,
    iceberg_catalog,
    iceberg_database
)

In [None]:
# destroy the session graphs
session.destroy_all_graphs()

## Conclusion

This notebook demonstrated the complete lifecycle of running analytics on a data lake projection in a Neptune Analytics instance:

1. **Creation**: We created a new Neptune Analytics instance on demand, or retrieved an existing one
2. **Import**: We imported a projection of the data lake
3. **Usage**: We ran a graph algorithm (Louvain) on the instance and mutated the data
4. **Export**: We exported the updated data back into the data lake as Iceberg tables
5. **Deletion**: We destroyed the session's graphs

The `SessionManager` wraps these import, export, and instance lifecycle operations behind a single session object.