# Fraud Detection Knowledge Graph Pipeline

## Overview

This notebook demonstrates a pipeline for building a fraud detection knowledge graph on AWS. It preprocesses credit card transaction data, uses a large language model on Amazon Bedrock to extract a knowledge graph from each transaction, loads the graph into Amazon Neptune, and uses Neptune ML to train a Graph Neural Network that predicts fraudulent transactions.

The pipeline consists of the following main steps:
1. Data Preparation: Preprocessing of credit card transaction data
2. Knowledge Graph Generation: Using Amazon Bedrock with Claude 3 Sonnet
3. Graph Database Population: Loading data into Amazon Neptune
4. Machine Learning: Utilizing Neptune ML with Graph Neural Networks (GNNs)
5. Fraud Detection: Performing inferences on the graph to predict fraudulent transactions

## AWS Services Used

- **Amazon Bedrock**: Used for generating the knowledge graph structure from transaction data
- **Amazon Neptune**: Graph database for storing and querying the knowledge graph
- **Neptune ML**: Machine learning capabilities integrated with Neptune for training GNNs and performing fraud detection

## Prerequisites

Before running this notebook, ensure you have:

- An AWS account with access to:
  - Amazon Bedrock
  - Amazon Neptune
  - Neptune ML
- Python 3.x installed
- Appropriate IAM roles and permissions for accessing AWS services:
  - IAM role with permissions to invoke Bedrock models
  - Neptune ML IAM role with access to S3, SageMaker, and Neptune
  - [Neptune IAM Authentication](https://docs.aws.amazon.com/neptune/latest/userguide/iam-auth.html) configured
- Familiarity with graph databases and machine learning concepts
- Amazon Neptune cluster set up with ML capabilities enabled:
  - Use the [Neptune ML Quick Start Guide](https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning-quick-start.html) for cluster setup
- Model access on Amazon Bedrock for the Claude 3 Sonnet model

For detailed setup instructions, refer to the [Amazon Neptune ML documentation](https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning.html).

## Setup and Installation

1. Set up AWS credentials and permissions:
   - Ensure your AWS CLI is configured with the appropriate credentials
   - Verify that your IAM user or role has the necessary permissions to access Bedrock, Neptune, and related services

2. Install required Python libraries:
   ```
   pip install boto3 gremlinpython tqdm langchain langchain_experimental nest_asyncio
   ```

3. Set up Amazon Neptune cluster:
   - Use the CloudFormation template provided in the [Neptune ML Quick Start Guide](https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning-quick-start.html)
   - This will create a Neptune cluster with ML capabilities enabled
   - Copy the `neptune_ml_utils.py` file from the path `Neptune/03-Neptune-ML/neptune_ml_utils.py` to the location of this notebook in the Neptune Notebook instance. 

4. Configure Bedrock access:
   - Ensure you have model access on Bedrock for the Claude 3 Sonnet model
   - Set up the necessary IAM permissions for Bedrock API calls

Now that the setup is complete, we can proceed with the pipeline execution.

## Setup and Dependencies

First, let's install the necessary dependencies. We're using a variety of libraries to interact with AWS services and process our data efficiently.

- `boto3`: The AWS SDK for Python, used for interacting with various AWS services
- `langchain` and `langchain_experimental`: For working with large language models and creating chains of operations
- `gremlinpython`: To interact with Neptune using the Gremlin graph traversal language
- `numpy` and `pandas`: For efficient data manipulation and analysis
- `nest_asyncio`: To allow for asynchronous operations in Jupyter notebooks

Installing these dependencies ensures we have all the tools needed for our fraud detection pipeline.

In [None]:
%pip install --upgrade --quiet boto3==1.34.162 langchain langchain-community langchain-experimental langchain_aws nest_asyncio json-repair awscli numpy pandas gremlinpython

## Input Data

Before we begin data preparation, let's understand the structure of our input data:

- The input data is a CSV file containing credit card transaction information.
- Each row represents a single transaction with fields such as transaction amount, date, merchant information, and whether the transaction was fraudulent.
- The data may contain sensitive information, so ensure proper data handling and privacy measures are in place.

### Data Structure Assumptions:
- Transaction IDs are unique
- Timestamps are in a consistent format
- Monetary amounts are in a single currency

### Potential Data Quality Issues:
- Missing values in non-essential fields
- Outliers in transaction amounts
- Imbalanced classes (fraudulent vs. non-fraudulent transactions)

We'll address these issues in our data preparation steps to ensure high-quality input for our knowledge graph generation.
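
The assumptions and issues listed above can be checked before any preprocessing. The following is a minimal sketch in plain Python so that it runs without the dataset; `summarize_quality` and the sample rows are hypothetical, and the column names `Transaction_ID` and `Is Fraud?` are assumed to match the prepared data.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_quality(rows: List[Dict[str, Any]], id_key: str, label_key: str) -> Dict[str, Any]:
    """Report duplicate IDs, missing values per column, and the positive-class rate."""
    id_counts = Counter(row[id_key] for row in rows)
    duplicate_ids = sorted(i for i, n in id_counts.items() if n > 1)

    # Count empty or None values column by column.
    missing: Counter = Counter()
    for row in rows:
        for column, value in row.items():
            if value is None or value == "":
                missing[column] += 1

    positives = sum(1 for row in rows if row[label_key] == "Yes")
    return {
        "duplicate_ids": duplicate_ids,
        "missing_per_column": dict(missing),
        "fraud_rate": positives / len(rows) if rows else 0.0,
    }


# Hypothetical sample rows, for illustration only.
sample_rows = [
    {"Transaction_ID": 1, "Amount": "$12.50", "Errors?": "", "Is Fraud?": "No"},
    {"Transaction_ID": 2, "Amount": "$99.00", "Errors?": "Bad PIN", "Is Fraud?": "Yes"},
    {"Transaction_ID": 2, "Amount": "$5.00", "Errors?": "", "Is Fraud?": "No"},
    {"Transaction_ID": 3, "Amount": "$40.00", "Errors?": "", "Is Fraud?": "No"},
]

report = summarize_quality(sample_rows, id_key="Transaction_ID", label_key="Is Fraud?")
print(report)
```

A very low `fraud_rate` on the real data would confirm the class imbalance noted above and is worth keeping in mind when interpreting model metrics.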

## 1. Data Preparation

In this section, we'll load and preprocess the credit card transaction data. Data preparation is a crucial step in any machine learning pipeline, especially for fraud detection. We need to ensure our data is clean, properly formatted, and ready for knowledge graph generation.

We'll perform the following steps:
1. Load the raw transaction data from a CSV file
2. Clean and preprocess the data (handling missing values, encoding categorical variables, etc.)
3. Create features that might be relevant for fraud detection
4. Prepare the data for input into our knowledge graph generation process

This preparation will help us create a more accurate and informative knowledge graph, which is essential for effective fraud detection.

In [None]:
import pandas as pd
df = pd.read_csv("card_transaction.v1.csv").sample(n=100000, random_state=42)
df

In [None]:
df[df['Is Fraud?'] == 'Yes'].shape

In [None]:
import matplotlib.pyplot as plt 

# Percentage of missing values per column, largest first
percent_missing = (df.isnull().sum() * 100 / df.shape[0]).sort_values(ascending=False)
plt.title("Missing Value Analysis")
plt.xlabel("Features")
plt.ylabel("% of missing values")
plt.bar(percent_missing.index, percent_missing, color=(0.1, 0.1, 0.1, 0.1), edgecolor='blue')
plt.xticks(rotation=90)
plt.show()

### Data Cleaning and Feature Engineering

Now that we've loaded our data, let's clean it up and create some useful features for our fraud detection model. This process involves:

1. Handling missing values
2. Converting data types (e.g., string to datetime)
3. Creating new features that might be indicative of fraudulent activity
4. Encoding categorical variables

These steps will help us prepare a rich dataset for our knowledge graph and subsequent machine learning model.
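
To make the transformations concrete, here is a minimal sketch of the same cleaning applied to a single record in plain Python. `clean_record` is a hypothetical helper and the raw values are made up; it assumes `Amount` is a string such as `$96.00` and `Time` is formatted as `HH:MM`.

```python
from typing import Any, Dict


def clean_record(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Apply the cleaning steps to one raw transaction (illustrative only)."""
    hour, minute = raw["Time"].split(":")[:2]
    return {
        # Combine the user and card index into one card identifier.
        "card_id": f"{raw['User']}_{raw['Card']}",
        # Strip the literal dollar sign before converting to float.
        "Amount": float(raw["Amount"].replace("$", "")),
        "Hour": hour,
        "Minute": minute,
        # Treat a missing error field as an explicit "No error" category.
        "Errors?": raw.get("Errors?") or "No error",
        # Encode the label as 1 (fraud) or 0 (not fraud).
        "Is Fraud?": 1 if raw["Is Fraud?"] == "Yes" else 0,
    }


raw = {"User": 1378, "Card": 2, "Time": "09:08", "Amount": "$96.00",
       "Errors?": None, "Is Fraud?": "No"}
cleaned = clean_record(raw)
print(cleaned)
```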

In [None]:
df["card_id"] = df["User"].astype(str) + "_" + df["Card"].astype(str)
df.Amount.head(5)

In [None]:
# "$" is a regex anchor, so match it literally when stripping the currency symbol
df["Amount"] = df["Amount"].str.replace("$", "", regex=False).astype(float)
df["Hour"] = df["Time"].str[0:2]
df["Minute"] = df["Time"].str[3:5]

In [None]:
df.Hour.head(5)

In [None]:
df.Minute.head(5)

In [None]:
df = df.drop(["Time","User","Card"],axis=1)
df.info()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Treat a missing error code as its own category
df["Errors?"] = df["Errors?"].fillna("No error")
df = df.drop(columns=["Merchant State", "Zip"])
# Encode the label as 1 (fraud) or 0 (not fraud)
df["Is Fraud?"] = df["Is Fraud?"].apply(lambda x: 1 if x == 'Yes' else 0)
# Label-encode the categorical columns
df["Merchant City"] = LabelEncoder().fit_transform(df["Merchant City"])
df["Use Chip"] = LabelEncoder().fit_transform(df["Use Chip"])
df["Errors?"] = LabelEncoder().fit_transform(df["Errors?"])
df["Errors?"].unique()

In [None]:
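# Expose the encoded 'Use Chip' column as 'Transaction Type' (assumes 'Use Chip' holds the transaction type)
df = df.rename(columns={'Use Chip': 'Transaction Type'})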
df.info()

In [None]:
df_reset = df.reset_index()
df_reset.rename(columns={'index': 'Transaction_ID'}, inplace=True)
df_reset.to_csv('purchases.csv', index=False)

In [None]:
df_reset

## 2. Knowledge Graph Generation with Amazon Bedrock and Claude 3 Sonnet

In this section, we use Amazon Bedrock with the Claude 3 Sonnet model to generate our knowledge graph. Here's how it works:

1. Data Processing: We send batches of preprocessed transaction data to Bedrock.
2. Natural Language Understanding: Claude 3 Sonnet analyzes the data, understanding the relationships between entities.
3. Graph Structure Generation: The model outputs a structured representation of nodes and relationships.

The resulting knowledge graph structure typically includes:
- Nodes: Representing entities like transactions, cards, and merchants
- Relationships: Connecting nodes (e.g., "transaction purchased by card", "transaction purchased at merchant")
- Properties: Additional attributes on nodes and relationships

Limitations and Considerations:
- The quality of the graph depends on the input data quality and the model's understanding.
- Large datasets may require significant processing time.
- The model may occasionally generate inconsistent or irrelevant relationships, requiring post-processing.

We'll use batch processing to handle large volumes of data efficiently.
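
For illustration, the structure described above can be written out by hand for one transaction. This is a sketch of the shape that the loading code later in this notebook expects (`id`, `type`, and a list of `key`/`value` properties on nodes; `source_node_id`, `target_node_id`, and `type` on relationships). The helper `transaction_to_graph` and the ID scheme are assumptions, and real model output may differ.

```python
from typing import Any, Dict, List, Tuple


def transaction_to_graph(record: Dict[str, Any]) -> Tuple[List[dict], List[dict]]:
    """Build nodes and relationships for one transaction (hypothetical ID scheme)."""
    tx_id = f"transaction_{record['Transaction_ID']}"
    card_id = f"card_{record['card_id']}"
    merchant_id = f"merchant_{record['Merchant Name']}"

    nodes = [
        {"id": tx_id, "type": "Transaction", "properties": [
            {"key": "Amount", "value": record["Amount"]},
            {"key": "Fraud_Status", "value": record["Is Fraud?"]},
        ]},
        {"id": card_id, "type": "Card", "properties": []},
        {"id": merchant_id, "type": "Merchant", "properties": [
            {"key": "Merchant_Category", "value": record["MCC"]},
        ]},
    ]
    relationships = [
        {"source_node_id": tx_id, "target_node_id": merchant_id, "type": "Purchased_At"},
        {"source_node_id": tx_id, "target_node_id": card_id, "type": "Purchased_By"},
    ]
    return nodes, relationships


record = {"Transaction_ID": 16966740, "Amount": 96.0, "Merchant Name": 1799189980464955940,
          "MCC": 5499, "Is Fraud?": 0, "card_id": "1378_2"}
nodes, relationships = transaction_to_graph(record)
print(nodes[0]["id"], [rel["type"] for rel in relationships])
```

Because card and merchant IDs are derived from the data, two transactions on the same card resolve to the same `Card` node, which is what connects otherwise independent transactions in the graph.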

In [None]:
import boto3
import nest_asyncio
from botocore.config import Config
from langchain_aws import ChatBedrock
from langchain_experimental.graph_transformers import LLMGraphTransformer

# Allow nested event loops inside the notebook
nest_asyncio.apply()

# This model is only used to build the LLMGraphTransformer chain;
# the Bedrock calls later in the notebook pass their own model ID.
# MODEL_ID = "meta.llama3-1-8b-instruct-v1:0"
MODEL_ID = "anthropic.claude-3-5-sonnet-20240620-v1:0"
# MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"

# Create a configuration object with custom retry settings
config = Config(
    region_name='us-east-1',
    retries={
        'max_attempts': 30  # Set your desired retry count here
    }
)

# Initialize the Bedrock runtime client with the custom config
bedrock_client = boto3.client("bedrock-runtime", config=config)
llm = ChatBedrock(model_id=MODEL_ID, client=bedrock_client)

In [None]:
llm_transformer = LLMGraphTransformer(
    llm=llm,
    allowed_nodes=[
        "Transaction", 
        "Merchant",
        "Card", 
    ],
    allowed_relationships=[
        "Purchased_At", 
        "Purchased_By",  
    ],
    node_properties=[
        "Transaction_Type",
        "Transaction_Date", 
        "Transaction_Time",
        "Amount",
        "Error_Count",
        "Fraud_Status",
        "Merchant_City", 
        "Merchant_Category",
    ]
)

### Process Data and Generate Knowledge Graph

Now that we have our Bedrock client and graph transformer set up, we'll process our data to generate the knowledge graph. This involves:

1. Loading our processed transaction data
2. Splitting the data into manageable batches
3. Using Bedrock's batch inference capabilities to process these batches in parallel
4. Collecting and combining the results to form our complete knowledge graph

This approach allows us to efficiently process large amounts of data and create a comprehensive knowledge graph representing our transaction network.
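
The batching and parallelism described above can be sketched independently of AWS. The following minimal example splits records into fixed-size shards and keeps at most a fixed number of jobs in flight. `submit` and `get_status` are hypothetical callables standing in for the Bedrock job APIs, and the set of terminal statuses is an assumption based on the job status values listed later in this notebook.

```python
import time
from collections import deque
from typing import Callable, Dict, List, Sequence

# Statuses after which a job is assumed not to change again.
TERMINAL_STATUSES = {"Completed", "PartiallyCompleted", "Failed", "Stopped", "Expired"}


def make_shards(records: Sequence, shard_size: int) -> List[Sequence]:
    """Split records into consecutive shards of at most shard_size items."""
    return [records[i:i + shard_size] for i in range(0, len(records), shard_size)]


def run_shards(
    shards: List[Sequence],
    submit: Callable[[Sequence, int], str],
    get_status: Callable[[str], str],
    max_active: int = 3,
    poll_seconds: float = 60.0,
) -> Dict[int, str]:
    """Submit shards with at most max_active jobs running; return the final status per shard."""
    queue = deque(enumerate(shards))
    active: Dict[int, str] = {}  # shard index -> job identifier
    final: Dict[int, str] = {}   # shard index -> terminal status

    while queue or active:
        # Top up the active set while capacity and work remain.
        while queue and len(active) < max_active:
            index, shard = queue.popleft()
            active[index] = submit(shard, index)

        # Move finished jobs out of the active set.
        for index, job_id in list(active.items()):
            status = get_status(job_id)
            if status in TERMINAL_STATUSES:
                final[index] = status
                del active[index]

        if active:
            time.sleep(poll_seconds)
    return final


# Usage with stand-in callables that complete every job immediately.
shards = make_shards(list(range(25)), shard_size=10)
result = run_shards(
    shards,
    submit=lambda shard, index: f"job-{index}",
    get_status=lambda job_id: "Completed",
    poll_seconds=0,
)
print(len(shards), result)
```

Treating every terminal status as "done" matters here: a loop that only recognizes some of them would wait forever on a job that expired or partially completed.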

In [None]:
from langchain_community.document_loaders.csv_loader import CSVLoader

file_path = ("purchases.csv")

loader = CSVLoader(file_path=file_path)
documents = loader.load()

print(len(documents))

In [None]:
import inspect
from typing import Any, Dict, List
import json
import uuid

def extract_prompts(steps: List[Any]) -> List[Dict[str, Any]]:
    def extract_prompt_template(item: Any) -> Any:
        if hasattr(item, 'template'):
            return item.template
        elif hasattr(item, 'prompt'):
            return extract_prompt_template(item.prompt)
        elif hasattr(item, 'messages'):
            return [extract_prompt_template(msg) for msg in item.messages]
        elif isinstance(item, list):
            return [extract_prompt_template(subitem) for subitem in item]
        else:
            return str(item)

    def analyze_runnable(runnable: Any, depth: int = 0) -> Dict[str, Any]:
        if depth > 5:  # Prevent infinite recursion
            return {"type": type(runnable).__name__, "note": "Max depth reached"}
        
        info = {"type": type(runnable).__name__}
        
        if hasattr(runnable, 'func'):
            info["func"] = str(runnable.func)
            if callable(runnable.func):
                try:
                    info["func_source"] = inspect.getsource(runnable.func)
                except Exception:
                    info["func_source"] = "Source code not available"

        if hasattr(runnable, 'bound'):
            info["bound"] = analyze_runnable(runnable.bound, depth + 1)
        
        if hasattr(runnable, 'kwargs'):
            info["kwargs"] = analyze_kwargs(runnable.kwargs)
        
        if hasattr(runnable, 'steps'):
            info["steps"] = {k: analyze_runnable(v, depth + 1) for k, v in runnable.steps.items()}
        
        if hasattr(runnable, 'mapper'):
            info["mapper"] = analyze_runnable(runnable.mapper, depth + 1)
        
        return info

    def analyze_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
        analyzed_kwargs = {}
        for k, v in kwargs.items():
            if isinstance(v, (str, int, float, bool)):
                analyzed_kwargs[k] = v
            elif isinstance(v, dict):
                analyzed_kwargs[k] = analyze_kwargs(v)
            elif isinstance(v, list):
                analyzed_kwargs[k] = [analyze_kwargs(item) if isinstance(item, dict) else str(item) for item in v]
            else:
                analyzed_kwargs[k] = analyze_runnable(v)
        return analyzed_kwargs

    results = []
    for step in steps:
        step_info = {"name": step.__class__.__name__}
        
        if hasattr(step, 'steps__'):
            step_info["type"] = "Complex Step"
            step_info["substeps"] = {k: analyze_runnable(v) for k, v in step.steps__.items()}
        elif hasattr(step, 'get_prompts'):
            step_info["type"] = "Prompt Step"
            step_prompts = step.get_prompts()
            step_info["prompts"] = extract_prompt_template(step_prompts)
        else:
            step_info["type"] = "Unknown Step"
        
        if hasattr(step, '__dict__'):
            for key, value in step.__dict__.items():
                if key not in ['steps__', 'prompt']:
                    if isinstance(value, (str, int, float, bool)):
                        step_info[key] = value
                    elif hasattr(value, '__dict__'):
                        step_info[key] = analyze_runnable(value)
                    else:
                        step_info[key] = str(value)
        
        results.append(step_info)
    
    return results

In [None]:
def prepare_prompt(extracted_info: List[Dict[str, Any]], input_text: str) -> tuple:
    """Return (system_content, messages) for a single Bedrock request."""
    system_content = ""
    human_content = ""

    for step in extracted_info:
        if step["type"] == "Prompt Step" and "prompts" in step:
            for prompt in step["prompts"]:
                if isinstance(prompt, list):
                    system_content += prompt[0] + "\n\n"
                    human_content += prompt[1] + "\n\n"
                elif isinstance(prompt, str):
                    system_content += prompt + "\n\n"
        elif step["type"] == "Complex Step" and "substeps" in step:
            for substep in step["substeps"].values():
                if substep["type"] == "RunnableBinding" and "kwargs" in substep:
                    tools = substep["kwargs"].get("tools", [])
                    for tool in tools:
                        system_content += f"Use the following tool: {json.dumps(tool)}\n\n"

    # Prepare messages as a list of dictionaries (JSON array)
    messages = [
        {"role": "user", "content": human_content.strip().format(input=input_text)},
        {"role": "assistant", "content": "Certainly, I'll extract the information from the provided input and create a knowledge graph according to the instructions. I'll use the specified tools to structure the output. Here's the extracted information:"}
    ]

    return system_content.strip(), messages

def process_langchain_to_prompt(llm_transformer_filtered, input_text):
    extracted_info = extract_prompts(llm_transformer_filtered.chain.steps)
    prompt_messages = prepare_prompt(extracted_info, input_text)
    return prompt_messages

def call_bedrock_claude(system_message, prompt_messages):
    # Initialize the Bedrock client
    bedrock = boto3.client('bedrock-runtime')

    # Prepare the request body
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 2000,
        "system":system_message,
        "messages": prompt_messages,  # Use the list of message dictionaries directly
        "temperature": 0,
        "top_p": 0.9
    }

    # Make the API call
    try:
        response = bedrock.invoke_model(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',  # Use the appropriate model ID
            contentType='application/json',
            accept='application/json',
            body=json.dumps(body)
        )
        
        # Parse the response
        response_body = json.loads(response['body'].read())
        return response_body['content'][0]['text']
    
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None


# Example of how to use the functions together on a single record
input_text = "transaction_id: 16966740\nYear: 2006\nMonth: 5\nDay: 13\nAmount: 96.0\nTransaction Type: 2\nMerchant Name: 1799189980464955940\nMerchant City: 4954\nMCC: 5499\nErrors?: 10\nIs Fraud?: 0\ncard_id: 1378_2\ntime: 09:08:00"

system_message, prompt_messages = process_langchain_to_prompt(llm_transformer, input_text)
print(f"Generated JSON:\n{system_message, prompt_messages}")
result = call_bedrock_claude(system_message, prompt_messages)

if result:
    print("Generated Knowledge Graph:")
    print(result)
else:
    print("Failed to generate knowledge graph.")

In [None]:
def prepare_prompt_jsonl(extracted_info: List[Dict[str, Any]], input_text: str) -> Dict[str, Any]:
    messages = []
    tools = []
    system_content = ""

    for step in extracted_info:
        if step["type"] == "Prompt Step" and "prompts" in step:
            for prompt in step["prompts"]:
                if isinstance(prompt, list):
                    system_content += prompt[0] + "\n\n"
                    messages.append({
                        "role": "user",
                        "content": [{"type": "text", "text": prompt[1].format(input=input_text)}]
                    })
                elif isinstance(prompt, str):
                    system_content += prompt + "\n\n"
        elif step["type"] == "Complex Step" and "substeps" in step:
            for substep in step["substeps"].values():
                if substep["type"] == "RunnableBinding" and "kwargs" in substep:
                    tools.extend(substep["kwargs"].get("tools", []))

    # Create the output in the specified format
    output = {
        "recordId": str(uuid.uuid4()).replace('-','')[:11],  # Generate a unique 11-character ID
        "modelInput": {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4000,
            "system": system_content.strip(),
            "messages": messages,
            "temperature": 0,
            "top_p": 0.9,
            "top_k": 50,
        }
    }

    # Only add tools if they exist
    if tools:
        output['modelInput']["tools"] = [
            {
                "name": tool.get("name", ""),
                "description": tool.get("description", ""),
                "input_schema": tool.get("input_schema", {})  
            }
            for tool in tools
        ]
        # Adding a default tool_choice
        output['modelInput']["tool_choice"] = {"type": "auto"}

    return output

def process_langchain_documents_to_prompt_jsonl(llm_transformer_filtered, documents, output_file):
    extracted_info = extract_prompts(llm_transformer_filtered.chain.steps)
    with open(output_file, 'w') as f:
        for doc in documents:
            prompt_message = prepare_prompt_jsonl(extracted_info, doc.page_content)
            
            # Write the JSON to the file, followed by a newline
            json.dump(prompt_message, f)
            f.write('\n')

    print(f"Prompts have been saved to {output_file}")

# process_langchain_documents_to_prompt_jsonl(llm_transformer, documents[:1000], 'batch-1k.jsonl')
# process_langchain_documents_to_prompt_jsonl(llm_transformer, documents[:30000], 'batch-30k.jsonl')
# process_langchain_documents_to_prompt_jsonl(llm_transformer, documents[:45000], 'batch-45k.jsonl')
process_langchain_documents_to_prompt_jsonl(llm_transformer, documents[:50000], 'batch-50k.jsonl')


In [None]:
%pip install --upgrade boto3

In [None]:
import os
# Restart the kernel so the upgraded boto3 is loaded, then continue with the cells below
os._exit(0)

## 3. Graph Database Population

With the prompts prepared, the next step is to run them through Bedrock batch inference and load the resulting graph into Amazon Neptune, our graph database. This process involves:

1. Submitting the prompt shards as Bedrock batch inference jobs and collecting the generated nodes and relationships
2. Configuring the Neptune connection
3. Upserting the nodes and relationships into Neptune with Gremlin queries
4. Verifying the data load and performing basic queries

Loading our knowledge graph into Neptune allows us to leverage its powerful query capabilities and sets the stage for our machine learning tasks.
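
Because each transaction is sent to the model as a separate record, the same card or merchant can appear many times in the combined output, and some relationships may point at nodes that were never emitted. Below is a minimal sketch of a preparation step, assuming the node and relationship dictionaries use the `id`, `source_node_id`, and `target_node_id` keys that the loading code in this section relies on. `prepare_graph` is a hypothetical helper, not part of the original pipeline.

```python
from typing import Dict, List, Tuple


def prepare_graph(nodes: List[dict], relationships: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Deduplicate nodes by id and drop duplicate or dangling relationships."""
    unique: Dict[str, dict] = {}
    for node in nodes:
        node_id = node.get("id")
        if node_id is None:
            continue  # Skip malformed nodes without an id.
        # Keep the first occurrence of each id.
        unique.setdefault(node_id, node)

    seen = set()
    kept = []
    for rel in relationships:
        key = (rel.get("source_node_id"), rel.get("type"), rel.get("target_node_id"))
        # Keep a relationship only once, and only if both endpoints exist.
        if key[0] in unique and key[2] in unique and key not in seen:
            seen.add(key)
            kept.append(rel)
    return list(unique.values()), kept


# Made-up sample data for illustration.
raw_nodes = [
    {"id": "card_1", "type": "Card"},
    {"id": "card_1", "type": "Card"},
    {"id": "tx_1", "type": "Transaction"},
]
raw_relationships = [
    {"source_node_id": "tx_1", "target_node_id": "card_1", "type": "Purchased_By"},
    {"source_node_id": "tx_1", "target_node_id": "card_1", "type": "Purchased_By"},
    {"source_node_id": "tx_1", "target_node_id": "merchant_9", "type": "Purchased_At"},
]

clean_nodes, clean_relationships = prepare_graph(raw_nodes, raw_relationships)
print(len(clean_nodes), len(clean_relationships))
```

Upsert queries tolerate duplicates, so a step like this is optional; its main benefit would be fewer round trips to the database and an early view of how many relationships are dangling.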

In [None]:
import boto3
import json
import time
import os
import uuid
from collections import deque

# Initialize clients
s3 = boto3.client('s3')
bedrock = boto3.client('bedrock')

# S3 bucket names (replace with your own buckets)
input_bucket = 'bedrock-batch-inference-fraud-detection'
output_bucket = 'bedrock-batch-inference-fraud-detection'

# IAM role ARN that Bedrock assumes for the batch jobs (replace with your own role)
role_arn = 'arn:aws:iam::590183881541:role/Bedrock-S3-FullAccess'

def read_jsonl_file(filename):
    with open(filename, 'r') as f:
        return [json.loads(line) for line in f]

def create_jsonl_shard(data, filename):
    with open(filename, 'w') as f:
        for item in data:
            json.dump(item, f)
            f.write('\n')

def upload_to_s3(filename, bucket, s3_key):
    s3.upload_file(filename, bucket, s3_key)

def download_from_s3(bucket, s3_key, filename):
    s3.download_file(bucket, s3_key, filename)

def check_job_status(job_identifier):
    response = bedrock.get_model_invocation_job(jobIdentifier=job_identifier)
    return response['status'] # 'Submitted'|'InProgress'|'Completed'|'Failed'|'Stopping'|'Stopped'|'PartiallyCompleted'|'Expired'|'Validating'|'Scheduled'

def submit_job(shard_data, shard_index):
    unique_job_name = f"batch-job-{uuid.uuid4()}-shard-{shard_index}"
    shard_filename = f'input_shard_{shard_index}.jsonl'
    
    create_jsonl_shard(shard_data, shard_filename)
    s3_input_key = f'input/{shard_filename}'
    upload_to_s3(shard_filename, input_bucket, s3_input_key)
    
    input_data_config = {
        "s3InputDataConfig": {
            "s3Uri": f"s3://{input_bucket}/{s3_input_key}"
        }
    }
    output_data_config = {
        "s3OutputDataConfig": {
            "s3Uri": f"s3://{output_bucket}/"
        }
    }
    
    response = bedrock.create_model_invocation_job(
        roleArn=role_arn,
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        jobName=unique_job_name,
        inputDataConfig=input_data_config,
        outputDataConfig=output_data_config
    )
    job_identifier = response['jobArn']
    
    print(f"Job submitted for shard {shard_index}. Name: {unique_job_name}, Identifier: {job_identifier}")
    
    os.remove(shard_filename)
    
    return job_identifier, unique_job_name

In [None]:
# Shard configuration
shard_size = 10000  # Number of items per shard

# Input JSONL file
input_jsonl_file = 'batch-50k.jsonl'  # Replace with your input file name

# Read the input JSONL file
data = read_jsonl_file(input_jsonl_file)
print(f"Read {len(data)} items from {input_jsonl_file}")

# Shard the data
shards = [data[i:i + shard_size] for i in range(0, len(data), shard_size)]
print(f"Created {len(shards)} shards")

# Queue to hold all shards
shard_queue = deque(enumerate(shards))

# List to hold active jobs
active_jobs = []

# List to hold completed jobs
completed_jobs = []

# Main loop
while shard_queue or active_jobs:
    # Submit new jobs if there are fewer than 3 active jobs and shards are available
    while len(active_jobs) < 3 and shard_queue:
        shard_index, shard = shard_queue.popleft()
        job_identifier, job_name = submit_job(shard, shard_index)
        active_jobs.append((shard_index, job_identifier, job_name))

    # Check status of active jobs
    for job in active_jobs[:]:  # Iterate over a copy of the list
        shard_index, job_identifier, job_name = job
        status = check_job_status(job_identifier)
        if status in ['Completed', 'PartiallyCompleted', 'Failed', 'Stopped', 'Expired']:
            print(f"Job for shard {shard_index} (Name: {job_name}) finished with status: {status}")
            completed_jobs.append((shard_index, status, job_identifier, job_name))
            active_jobs.remove(job)
        else:
            print(f"Job for shard {shard_index} (Name: {job_name}) with status: {status}")

    # Wait before checking again
    if active_jobs:
        print(f"Waiting for {len(active_jobs)} jobs to complete...")
        time.sleep(60)  # Wait for 60 seconds before checking again

# Save completed jobs to a JSON file
with open('completed_jobs.json', 'w+') as f:
    json.dump(completed_jobs, f)

In [None]:
import json
import boto3
import time
import os
import uuid
from collections import deque

# Initialize clients
s3 = boto3.client('s3')
bedrock = boto3.client('bedrock')
completed_jobs = []


output_bucket = 'bedrock-batch-inference-fraud-detection'

def download_from_s3(bucket, s3_key, filename):
    s3.download_file(bucket, s3_key, filename)

# Load completed jobs from a JSON file
try:
    with open('completed_jobs.json', 'r') as f:
        completed_jobs = json.load(f)
    print(f"Loaded {len(completed_jobs)} completed jobs from 'completed_jobs.json'")
except FileNotFoundError:
    print("No 'completed_jobs.json' file found, starting with an empty list of completed jobs.")


In [None]:
count = 0
nodes = []
relationships = []
for shard_index, status, job_identifier, job_name in completed_jobs:
    if status == 'Completed':
        output_file = f'input_shard_{shard_index}.jsonl.out'
        s3_output_key = f'{job_identifier.split("/")[-1]}/{output_file}'
        download_from_s3(output_bucket, s3_output_key, output_file)
        print(f"Results for shard {shard_index} (Job: {job_name}) downloaded to {output_file}")
        with open(output_file, 'r') as f:
            for line in f:
                try:
                    jsonl = json.loads(line)
                    modelOutput = jsonl['modelOutput']['content']
                    for e in modelOutput:
                        # Text blocks have no 'input'; skip them instead of discarding the whole line
                        tool_input = e.get('input')
                        if isinstance(tool_input, dict) and 'nodes' in tool_input and 'relationships' in tool_input:
                            nodes += tool_input['nodes']
                            relationships += tool_input['relationships']
                    count+=1
                except (ValueError, KeyError):
                    # Ignore lines not in the expected format
                    pass
        os.remove(output_file)
    else:
        print(f"Job for shard {shard_index} (Name: {job_name}, Identifier: {job_identifier}) did not complete successfully. Check the AWS console for more details.")

print(f"{count} records transformed")
print(f"Total Nodes: {len(nodes)}", f"Node Example: {nodes[0] if nodes else None}")
print(f"Total Relationships: {len(relationships)}", f"Relationship Example: {relationships[0] if relationships else None}")

In [None]:
import nest_asyncio

nest_asyncio.apply()

In [None]:
%pip install --quiet gremlinpython tqdm

In [None]:
import boto3
from gremlin_python.driver import client, serializer
# Your Neptune endpoint and port
neptune_endpoint = 'neptunedbcluster-dhcc2z0nwwe2.cluster-cl6ea8ky45tr.us-east-1.neptune.amazonaws.com'
neptune_port = '8182'

# Create a Neptune client
neptune_client = client.Client(
    f'wss://{neptune_endpoint}:{neptune_port}/gremlin',
    'g',
    message_serializer=serializer.GraphSONSerializersV2d0()
)

# WARNING: the next two queries delete every edge and vertex in the graph
print(neptune_client.submit("g.E().drop()").all().result())
print(neptune_client.submit("g.V().drop()").all().result())

In [None]:
import boto3
from gremlin_python.driver import client, serializer
import logging
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Create a Neptune client
neptune_client = client.Client(
    f'wss://{neptune_endpoint}:{neptune_port}/gremlin',
    'g',
    message_serializer=serializer.GraphSONSerializersV2d0()
)

def add_to_neptune(nodes, relationships):
    try:
        # Upsert nodes
        for node in tqdm(nodes):
            query = (
                f"g.V('{node['id']}').fold().coalesce("
                f"unfold(), "
                f"addV('{node['type']}').property(id, '{node['id']}')"
                f")"
            )
            
            for prop in node.get('properties', []):
                query += f".property(single, '{prop['key']}', {repr(prop['value'])})"
            
            # logger.info(f"Upserting node: {node['id']}")
            result = neptune_client.submit(query).all().result()
            logger.debug(f"Node upsert result: {result}")

        # Upsert relationships
        for rel in tqdm(relationships):
            query = (
                f"g.V('{rel['source_node_id']}').as('source')"
                f".V('{rel['target_node_id']}').as('target')"
                f".coalesce("
                f"__.inE('{rel['type']}').where(outV().as('source')),"
                f"addE('{rel['type']}').from('source').to('target')"
                f")"
            )
            for prop in rel.get('properties', []):
                query += f".property('{prop['key']}', {repr(prop['value'])})"

            # logger.info(f"Upserting relationship: {rel['type']} from {rel['source_node_id']} to {rel['target_node_id']}")
            result = neptune_client.submit(query).all().result()
            logger.debug(f"Relationship upsert result: {result}")

        logger.info("Graph data upserted successfully.")
    except Exception as e:
        logger.error(f"An error occurred while upserting graph data: {str(e)}", exc_info=True)

In [None]:
add_to_neptune(nodes, relationships)

In [None]:
%%gremlin
g.V().groupCount().by(label).unfold().order().by(keys)

In [None]:
%%gremlin
g.E().groupCount().by(label).unfold().order().by(keys)

## Graph Neural Networks (GNNs) and Fraud Detection

Before we set up Neptune ML, let's understand why Graph Neural Networks are particularly suitable for fraud detection:

### What are GNNs?
Graph Neural Networks are a class of deep learning models designed to work directly on graph-structured data. They can capture complex patterns and relationships within interconnected data.

### Why GNNs for Fraud Detection?
1. Relational Information: GNNs can leverage the relationships between entities (e.g., users, transactions, merchants) to identify suspicious patterns.
2. Feature Propagation: They can aggregate information from neighboring nodes, capturing broader context beyond individual transactions.
3. Structural Patterns: GNNs can learn and identify subgraph patterns that may indicate fraudulent activity.

### Neptune ML and SageMaker Integration
Neptune ML integrates with Amazon SageMaker to train and deploy GNN models:
1. Neptune exports graph data to a format suitable for ML training.
2. SageMaker runs the GNN training process using the exported data.
3. Trained models are deployed as SageMaker endpoints for inference.

### Our Specific ML Problem
We're tackling a node classification problem:
- Goal: Predict whether a transaction node is fraudulent or not.
- Input: Transaction node features and its local graph structure.
- Output: Probability of the transaction being fraudulent.

This approach allows us to leverage both the transaction details and the broader context of the user's and merchant's transaction history for more accurate fraud detection.
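
Feature propagation can be illustrated with a single round of mean aggregation over neighbors. This is a deliberately simplified sketch in plain Python, not the model that Neptune ML trains; the graph, the feature values, and the `aggregate_once` helper are made up for illustration.

```python
from typing import Dict, List


def aggregate_once(features: Dict[str, List[float]], neighbors: Dict[str, List[str]]) -> Dict[str, List[float]]:
    """Replace each node's features with the mean over itself and its neighbors."""
    updated = {}
    for node, own in features.items():
        group = [own] + [features[n] for n in neighbors.get(node, [])]
        updated[node] = [sum(values) / len(group) for values in zip(*group)]
    return updated


# Toy graph: two transactions share one card; features are [amount, risk_signal].
features = {
    "tx_1": [100.0, 1.0],
    "tx_2": [20.0, 0.0],
    "card_a": [0.0, 0.0],
}
neighbors = {
    "tx_1": ["card_a"],
    "tx_2": ["card_a"],
    "card_a": ["tx_1", "tx_2"],
}

round_1 = aggregate_once(features, neighbors)
round_2 = aggregate_once(round_1, neighbors)
print(round_1["card_a"], round_2["tx_2"])
```

After the first round the card node carries information from both transactions; after the second, `tx_2` has a non-zero risk signal that originated at `tx_1`, even though the two transactions are not directly connected. A real GNN replaces the plain mean with learned, typed transformations, but the flow of information is the same idea.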

## 4. Neptune ML Setup

With our data loaded into Neptune, we can now prepare it for Neptune ML. In this section, we'll:

1. Configure Neptune ML settings
2. Prepare our data for machine learning tasks
3. Define our ML problem (node classification for fraud detection)
4. Set up the necessary IAM roles and permissions

This setup lays the groundwork for training our fraud detection model using the structure and features of our knowledge graph.

NOTE: Copy `neptune_ml_utils.py` from `Neptune/03-Neptune-ML/neptune_ml_utils.py` into the same directory as this notebook on the Neptune notebook instance.

In [None]:
%%gremlin
g.V().hasLabel('Transaction').has('fraud_status', "0").limit(100).property('actual_fraud_status', "0").iterate()

In [None]:
%%gremlin
g.V().hasLabel('Transaction').has('fraud_status', "1").limit(100).property('actual_fraud_status', "1").iterate()

In [None]:
%%gremlin
g.V().hasLabel('Transaction').has('actual_fraud_status').properties('fraud_status').drop()

In [None]:
%%gremlin
g.V().hasLabel('Transaction').hasNot('fraud_status').count()
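
The four Gremlin cells above hide the label on a sample of transactions so that the model's predictions can later be checked against them: up to 100 transactions per class get their label copied to `actual_fraud_status`, their `fraud_status` property is then dropped, and the final query counts the unlabeled transactions. The same logic is sketched below in plain Python over a hypothetical in-memory list of property dictionaries. The helper name `hold_out_labels` and the sample records are illustrative and not part of the notebook, and the sketch picks records in list order whereas Gremlin's `limit(100)` makes no ordering guarantee.

```python
def hold_out_labels(transactions, per_class=100):
    """Move `fraud_status` to `actual_fraud_status` for up to `per_class` records per label."""
    moved = {"0": 0, "1": 0}
    for txn in transactions:
        label = txn.get("fraud_status")
        if label in moved and moved[label] < per_class:
            # Keep the ground truth under a new key, then hide the training label.
            txn["actual_fraud_status"] = txn.pop("fraud_status")
            moved[label] += 1
    return moved

# Hypothetical sample: two legitimate and two fraudulent transactions.
sample = [
    {"id": "t1", "fraud_status": "0"},
    {"id": "t2", "fraud_status": "0"},
    {"id": "t3", "fraud_status": "1"},
    {"id": "t4", "fraud_status": "1"},
]
moved = hold_out_labels(sample, per_class=1)
held_out = [t["id"] for t in sample if "fraud_status" not in t]
```

With `per_class=100`, as in the Gremlin cells, the final count should be at most 200 if every transaction started with a `fraud_status` value.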

In [None]:
s3_bucket_uri = "s3://neptune-data-fraud-detection"
# Remove any trailing slashes so paths can be appended with a single '/'
s3_bucket_uri = s3_bucket_uri.rstrip('/')

In [None]:
import neptune_ml_utils as neptune_ml
neptune_ml.check_ml_enabled()

In [None]:
export_params={ 
"command": "export-pg", 
"params": { "endpoint": neptune_ml.get_host(),
            "profile": "neptune_ml",
            "useIamAuth": neptune_ml.get_iam(),
            "cloneCluster": True,
            "cloneClusterInstanceType": "r5.12xlarge",
            "nodeLabels": ["Card","Transaction", "Merchant"],
            "edgeLabels": ["PURCHASED_BY", "PURCHASED_AT"] 
            }, 
"outputS3Path": f'{s3_bucket_uri}/neptune-export',
"additionalParams": {
        "neptune_ml": {
          "version": "v2.0",
          "targets": [
            {
              "node": "Transaction",
              "property": "fraud_status",
              "type": "classification"
            }
          ]
        }
      },
"jobSize": "medium"}

In [None]:

%%neptune_ml export start --export-url {neptune_ml.get_export_service_host()} --export-iam --wait --store-to export_results
${export_params}
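
A mismatch between the `targets` and `nodeLabels` entries of `export_params` is cheap to catch before an export job is started. As a hedged sketch, the hypothetical helper below (`check_export_targets` is not part of `neptune_ml_utils`) checks that every target node label in a dictionary shaped like `export_params` also appears in `nodeLabels`. The example dictionary is a trimmed copy of the structure used above, not the full configuration.

```python
def check_export_targets(params):
    """Return target node labels that are missing from the exported node labels."""
    exported = set(params["params"].get("nodeLabels", []))
    targets = params["additionalParams"]["neptune_ml"].get("targets", [])
    return [t["node"] for t in targets if "node" in t and t["node"] not in exported]

# Trimmed, illustrative copy of the export configuration structure.
example_params = {
    "command": "export-pg",
    "params": {"nodeLabels": ["Card", "Transaction", "Merchant"]},
    "additionalParams": {
        "neptune_ml": {
            "version": "v2.0",
            "targets": [
                {"node": "Transaction", "property": "fraud_status", "type": "classification"}
            ],
        }
    },
}

missing = check_export_targets(example_params)
```

An empty `missing` list means every target label is included in the export. Calling `check_export_targets(export_params)` on the real dictionary would apply the same check.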

In [None]:

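# One job name is reused as the ID for the data processing, training, and endpoint steps below.
training_job_name = neptune_ml.get_training_job_name('node-classification')+'-3'
# S3 location of the exported graph data, as reported by the export job above.
outputS3Uri = export_results['outputS3Uri']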

In [None]:
processing_params = f"""
--config-file-name training-data-configuration.json
--job-id {training_job_name} 
--s3-input-uri {outputS3Uri} 
--s3-processed-uri {str(s3_bucket_uri)}/preloading """

In [None]:

%neptune_ml dataprocessing start --wait --store-to processing_results {processing_params}

## 5. Model Training and Deployment

In this section, we'll train our Graph Neural Network model for fraud detection and deploy it for inference. The process includes:

1. Initiating the Neptune ML training job
2. Monitoring the training process and evaluating results
3. Deploying the trained model as an endpoint
4. Testing the deployed model with sample queries

This step transforms our knowledge graph into an actionable fraud detection system, capable of identifying potentially fraudulent transactions based on complex patterns and relationships in the data.

In [None]:
training_params=f"""
--job-id {training_job_name} 
--data-processing-id {training_job_name}
--instance-type ml.m5.large
--s3-output-uri {str(s3_bucket_uri)}/training 
--max-hpo-number 2
--max-hpo-parallel 2 """

In [None]:

%neptune_ml training start --wait --store-to training_results {training_params}

In [None]:
endpoint_params=f"""
--id {training_job_name}
--model-training-job-id {training_job_name}"""

In [None]:
%neptune_ml endpoint create --wait --store-to endpoint_results {endpoint_params}

In [None]:
endpoint=endpoint_results['endpoint']['name']

## 6. Fraud Detection Inference

Now that our model is trained and deployed, we can use it to detect potentially fraudulent transactions. In this section, we'll:

1. Pick a held-out transaction (one whose `fraud_status` label was removed earlier)
2. Query the deployed model endpoint for that transaction's predicted `fraud_status`
3. Compare the prediction with the held-out `actual_fraud_status` label
4. Run the same comparison across all held-out transactions

Comparing predictions against the held-out labels gives a first view of the model's performance and a baseline for iterating on the approach.

In [None]:
%%gremlin --store-to transductive_id

g.V().hasLabel('Transaction').hasNot('fraud_status').limit(1).id()

In [None]:
%%gremlin 

g.V(${transductive_id}).properties("fraud_status")

In [None]:
%%gremlin

g.with("Neptune#ml.endpoint", "${endpoint}")
    .V(${transductive_id}).properties('fraud_status', 'Neptune#ml.score')
    .with("Neptune#ml.classification")
    .value()

In [None]:
%%gremlin

g.with("Neptune#ml.endpoint", "${endpoint}")
    .V(${transductive_id}).project('actual_fraud', 'predictedFraud')
    .by(values('actual_fraud_status').fold())
    .by(properties('fraud_status')
    .with("Neptune#ml.classification").value().fold())

In [None]:
%%gremlin --store-to predictions

g.with("Neptune#ml.endpoint", "${endpoint}")
    .V().hasLabel('Transaction').hasNot('fraud_status').project('actual_fraud', 'predictedFraud')
    .by(values('actual_fraud_status').fold())
    .by(properties('fraud_status').with("Neptune#ml.classification").value().fold())
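
To quantify how well the model recovers the held-out labels, the stored results can be reduced to a confusion matrix. The sketch below assumes `predictions` is a list of dictionaries whose `actual_fraud` and `predictedFraud` values are single-element lists of `"0"`/`"1"` strings, which is what the `fold()` steps in the query suggest. The stored shape is not shown in this notebook, so verify it before reusing the code. The sample rows and the helper names are made up for illustration.

```python
def confusion_counts(rows, positive="1"):
    """Count TP/FP/FN/TN from rows shaped like the projected Gremlin results."""
    counts = {"tp": 0, "fp": 0, "fn": 0, "tn": 0}
    for row in rows:
        actual, predicted = row.get("actual_fraud"), row.get("predictedFraud")
        if not actual or not predicted:
            continue  # Skip rows with a missing label or prediction.
        is_fraud = str(actual[0]) == positive
        flagged = str(predicted[0]) == positive
        if flagged:
            counts["tp" if is_fraud else "fp"] += 1
        else:
            counts["fn" if is_fraud else "tn"] += 1
    return counts

def precision_recall(counts):
    """Return (precision, recall), using 0.0 when a denominator is zero."""
    flagged = counts["tp"] + counts["fp"]
    fraud = counts["tp"] + counts["fn"]
    precision = counts["tp"] / flagged if flagged else 0.0
    recall = counts["tp"] / fraud if fraud else 0.0
    return precision, recall

# Made-up rows standing in for the stored `predictions` variable.
sample_rows = [
    {"actual_fraud": ["1"], "predictedFraud": ["1"]},
    {"actual_fraud": ["1"], "predictedFraud": ["0"]},
    {"actual_fraud": ["0"], "predictedFraud": ["0"]},
    {"actual_fraud": ["0"], "predictedFraud": ["1"]},
    {"actual_fraud": [], "predictedFraud": ["0"]},
]
counts = confusion_counts(sample_rows)
precision, recall = precision_recall(counts)
```

Because only up to 100 transactions per class were held out, these numbers describe a small, class-balanced sample rather than the fraud rate of the full dataset.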

In [None]:
# Uncomment to delete the inference endpoint when you are finished, to avoid ongoing charges.
# neptune_ml.delete_endpoint(training_job_name)

## Conclusion and Next Steps

In this notebook, we've walked through the entire process of building a fraud detection system using knowledge graphs and graph neural networks. We've covered:

1. Data preparation and feature engineering
2. Knowledge graph generation using Amazon Bedrock
3. Loading data into Amazon Neptune
4. Setting up and using Neptune ML for fraud detection
5. Model training, deployment, and inference

To further improve this system, consider:

- Incorporating additional data sources to enrich the knowledge graph
- Experimenting with different GNN architectures and hyperparameters
- Implementing a real-time data ingestion and fraud detection pipeline
- Developing a user interface for fraud analysts to interact with the system

Remember to monitor your AWS resource usage and clean up any unnecessary resources to avoid unexpected costs.