# Amazon SageMaker Anomaly Detection using Amazon Security Lake data

## 1. Introduction

In this sample notebook, you will build, train, and deploy a model using the IP Insights algorithm and Amazon Security Lake data. You will query the Amazon Security Lake tables managed by AWS Lake Formation, transform the results from the VPC Flow Log and Route 53 tables, train an IP Insights model on this data, deploy the model to a SageMaker endpoint, and finally test it.

In [None]:
# 1. install 
%conda install openjdk -y
%pip install pyspark 
%pip install sagemaker_pyspark
%pip install awswrangler

## 2. Setup your environment

In [23]:
# 2. setup, config .. imports

import boto3
import botocore
import os
import sagemaker
import pandas as pd
import awswrangler as wr

from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

dt_today = datetime.now()
str_today = dt_today.strftime("%m_%d_%Y_%H_%M_%S")

bucket = sagemaker.Session().default_bucket()
prefix = "sagemaker/ipinsights-vpcflowlogs"
execution_role = sagemaker.get_execution_role()
region = boto3.Session().region_name
seclakeregion = region.replace("-","_")

# check if the bucket exists
try:
    boto3.Session().client("s3").head_bucket(Bucket=bucket)
except botocore.exceptions.ParamValidationError as e:
    print(
        "You either forgot to specify your S3 bucket or you gave your bucket an invalid name!"
    )
except botocore.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == "403":
        print(f"Hey! You don't have permission to access the bucket, {bucket}.")
    elif e.response["Error"]["Code"] == "404":
        print(f"Hey! Your bucket, {bucket}, doesn't exist!")
    else:
        raise
else:
    print(f"Training input/output will be stored in: s3://{bucket}/{prefix}")
print(f"Session timestamp: {str_today}")

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Training input/output will be stored in: s3://sagemaker-us-east-1-028667762794/sagemaker/ipinsights-vpcflowlogs
Session timestamp: 09_22_2023_14_07_21
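The queries in the next two sections build Security Lake Glue names from the Region. A minimal sketch of deriving both the database and the table name in one place, assuming the `amazon_security_lake_glue_db_<region>` and `amazon_security_lake_table_<region>_<source>_1_0` pattern that this notebook's tables follow. `security_lake_names` is a hypothetical helper, and other Security Lake source versions may use a different version suffix.

```python
from typing import Tuple


def security_lake_names(region: str, source: str, version: str = "1_0") -> Tuple[str, str]:
    """Return the (database, table) Glue names for a Security Lake source.

    Hypothetical helper: assumes the naming pattern used in this notebook,
    amazon_security_lake_glue_db_<region> and
    amazon_security_lake_table_<region>_<source>_<version>,
    with dashes in the Region replaced by underscores.
    """
    region_suffix = region.replace("-", "_")  # e.g. "us-east-1" -> "us_east_1"
    database = f"amazon_security_lake_glue_db_{region_suffix}"
    table = f"amazon_security_lake_table_{region_suffix}_{source}_{version}"
    return database, table


database, vpc_table = security_lake_names("us-east-1", "vpc_flow")
print(database)   # amazon_security_lake_glue_db_us_east_1
print(vpc_table)  # amazon_security_lake_table_us_east_1_vpc_flow_1_0
```

In the notebook, `security_lake_names(region, "route53")` would give the Route 53 names, keeping the database and table in the same Region.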


## 3. Query and transform VPC Flow Log data

In [24]:
# 3. query VPC flow logs from security lake glue table
ocsf_df = wr.athena.read_sql_query(
    f"SELECT src_endpoint.instance_uid AS instance_id, src_endpoint.ip AS sourceip FROM amazon_security_lake_table_{seclakeregion}_vpc_flow_1_0 "
    "WHERE src_endpoint.ip IS NOT NULL AND src_endpoint.instance_uid IS NOT NULL AND src_endpoint.instance_uid != '-' AND src_endpoint.ip != '-'",
    database=f"amazon_security_lake_glue_db_{seclakeregion}",  # same Region as the table, not hard-coded to us-east-1
    ctas_approach=False, unload_approach=True,
    s3_output=f"s3://{bucket}/unload/parquet/updated/{str_today}",
)
ocsf_df.head()

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole

|   | instance_id | sourceip |
|---|---|---|
| 0 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
| 1 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
| 2 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
| 3 | i-0caa75b1c88bf0f16 | 10.0.103.39 |
| 4 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
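The `WHERE` clause above drops rows where the instance ID or source IP is missing or holds the `-` placeholder. A minimal pandas sketch of the same filter, useful if you load Security Lake data some other way (for example from an exported file) and want to clean it locally before training. `filter_endpoint_pairs` is a hypothetical helper, and it assumes the `instance_id` and `sourceip` column names produced by the query.

```python
import pandas as pd


def filter_endpoint_pairs(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows with a usable instance ID and source IP.

    Hypothetical helper mirroring the Athena WHERE clause: rejects nulls and
    the "-" placeholder in either column.
    """
    mask = (
        df["instance_id"].notna()
        & df["sourceip"].notna()
        & (df["instance_id"] != "-")
        & (df["sourceip"] != "-")
    )
    return df.loc[mask, ["instance_id", "sourceip"]].reset_index(drop=True)


sample = pd.DataFrame(
    {
        "instance_id": ["i-0caa75b1c88bf0f16", "-", None, "i-0caa75b1c88bf0f16"],
        "sourceip": ["10.0.103.247", "10.0.1.130", "10.0.1.131", "-"],
    }
)
print(filter_endpoint_pairs(sample))  # only the first row survives
```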


## 4. Query and transform Route 53 log data

In [25]:
# 4. query Route53 logs from security lake glue table
ocsf_rt_53_df = wr.athena.read_sql_query(
    f"SELECT src_endpoint.instance_uid AS instance_id, src_endpoint.ip AS sourceip FROM amazon_security_lake_table_{seclakeregion}_route53_1_0 "
    "WHERE src_endpoint.ip IS NOT NULL AND src_endpoint.instance_uid IS NOT NULL AND src_endpoint.instance_uid != '-' AND src_endpoint.ip != '-'",
    database=f"amazon_security_lake_glue_db_{seclakeregion}",  # same Region as the table, not hard-coded to us-east-1
    ctas_approach=False, unload_approach=True,
    s3_output=f"s3://{bucket}/unload/rt53parquet/{str_today}",
)
ocsf_rt_53_df.head()


INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


|   | instance_id | sourceip |
|---|---|---|
| 0 | i-0b4ef7e8aac82d605 | 10.0.1.130 |
| 1 | i-0b4ef7e8aac82d605 | 10.0.1.130 |
| 2 | i-0b4ef7e8aac82d605 | 10.0.1.130 |
| 3 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
| 4 | i-0caa75b1c88bf0f16 | 10.0.103.247 |
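The VPC Flow Log and Route 53 queries differ only in the source table name. A minimal sketch of generating that SQL once so both sources stay in sync; `build_endpoint_query` is a hypothetical helper. It interpolates the table name directly into the SQL string, so it should only be given trusted table names such as the ones constructed in this notebook.

```python
def build_endpoint_query(table: str) -> str:
    """Return the instance-ID/source-IP query used above for one Security Lake table.

    Hypothetical helper: the table name is inserted verbatim, so pass only
    trusted, notebook-generated names.
    """
    return (
        "SELECT src_endpoint.instance_uid AS instance_id, "
        "src_endpoint.ip AS sourceip "
        f"FROM {table} "
        "WHERE src_endpoint.ip IS NOT NULL "
        "AND src_endpoint.instance_uid IS NOT NULL "
        "AND src_endpoint.instance_uid != '-' "
        "AND src_endpoint.ip != '-'"
    )


query = build_endpoint_query("amazon_security_lake_table_us_east_1_route53_1_0")
print(query)

# In the notebook the string could then be passed to awswrangler as before, e.g.
# wr.athena.read_sql_query(query, database=..., ctas_approach=False,
#                          unload_approach=True, s3_output=...)
```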


## 5. Combine VPC Flow Log and Route 53 data

In [12]:
ocsf_complete = pd.concat([ocsf_df, ocsf_rt_53_df], ignore_index=True)

## 6. Upload training data, retrieve the algorithm image, and train the IP Insights model

In [13]:
# 6. write the combined data to S3 as a headerless two-column CSV (instance_id, sourceip) for the training channel
training_path = f"s3://{bucket}/{prefix}/training/training_input.csv"
wr.s3.to_csv(ocsf_complete, training_path, header=False, index=False)

{'paths': ['s3://sagemaker-us-east-1-028667762794/sagemaker/ipinsights-vpcflowlogs/training/training_input.csv'],
 'partitions_values': {}}
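Before starting a training job, it can help to confirm that every row is a two-column `entity,IP` pair. A minimal sketch of such a pre-flight check, assuming (as we understand the IP Insights input format) that the second column must be an IPv4 address in dotted-decimal notation; check the algorithm's input documentation before relying on that. `find_bad_training_rows` is a hypothetical helper, not part of the SageMaker SDK.

```python
import csv
import io
import ipaddress
from typing import List, Tuple


def find_bad_training_rows(csv_text: str) -> List[Tuple[int, List[str]]]:
    """Return (line number, row) for rows that are not <entity>,<IPv4 address>.

    Hypothetical pre-flight check for a headerless two-column CSV.
    """
    bad_rows = []
    for line_no, row in enumerate(csv.reader(io.StringIO(csv_text)), start=1):
        if len(row) != 2 or not row[0]:
            bad_rows.append((line_no, row))
            continue
        try:
            ipaddress.IPv4Address(row[1])  # raises a ValueError subclass on a bad address
        except ValueError:
            bad_rows.append((line_no, row))
    return bad_rows


sample_csv = "i-0caa75b1c88bf0f16,10.0.103.247\ni-0b4ef7e8aac82d605,not-an-ip\n"
print(find_bad_training_rows(sample_csv))  # [(2, ['i-0b4ef7e8aac82d605', 'not-an-ip'])]
```

In the notebook, the same check could run on `ocsf_complete.to_csv(header=False, index=False)` before calling `wr.s3.to_csv`.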

In [14]:
from sagemaker import image_uris

# retrieve the IP Insights built-in algorithm container image for this Region
image = image_uris.retrieve("ipinsights", region)

In [15]:
# change instance type depending on the size of the training data
ip_insights = sagemaker.estimator.Estimator(
    image,
    execution_role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"s3://{bucket}/{prefix}/output",
    sagemaker_session=sagemaker.Session(),
)

# change hyperparameters depending on the size of the training data and your training constraints
ip_insights.set_hyperparameters(
    num_entity_vectors="20000",
    random_negative_sampling_rate="5",
    vector_dim="128",
    mini_batch_size="1000",
    epochs="5",
    learning_rate="0.01",
)
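The fixed `num_entity_vectors="20000"` above may be too small or too large for your account. As we understand the IP Insights hyperparameter guidance, this value should be roughly twice the number of unique entity identifiers, because entities are hashed into these vectors and extra room reduces collisions; confirm against the current hyperparameter reference. A minimal sizing sketch, where `suggested_num_entity_vectors` is a hypothetical helper.

```python
import pandas as pd


def suggested_num_entity_vectors(df: pd.DataFrame, entity_col: str = "instance_id") -> int:
    """Return about 2x the number of unique entities (hypothetical sizing rule)."""
    unique_entities = df[entity_col].nunique()
    return max(1, 2 * unique_entities)


example = pd.DataFrame(
    {
        "instance_id": ["i-0caa75b1c88bf0f16", "i-0caa75b1c88bf0f16", "i-0b4ef7e8aac82d605"],
        "sourceip": ["10.0.103.247", "10.0.103.39", "10.0.1.130"],
    }
)
print(suggested_num_entity_vectors(example))  # 4
```

In the notebook this could be applied as `ip_insights.set_hyperparameters(num_entity_vectors=str(suggested_num_entity_vectors(ocsf_complete)), ...)`, keeping the other hyperparameters as they are.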

In [16]:
from sagemaker.inputs import TrainingInput

input_data = {
    "train": TrainingInput(training_path, content_type="text/csv")
}


In [18]:
# train and fit IPInsights model based on training data

ip_insights.fit(input_data)

INFO:sagemaker:Creating training-job with name: ipinsights-2023-09-21-16-21-20-896


2023-09-21 16:21:21 Starting - Starting the training job...
2023-09-21 16:21:35 Starting - Preparing the instances for training......
2023-09-21 16:22:47 Downloading - Downloading input data...
2023-09-21 16:23:22 Training - Downloading the training image........................
2023-09-21 16:26:58 Training - Training image download completed. Training in progress.........................................................................
2023-09-21 16:39:12 Uploading - Uploading generated training model
2023-09-21 16:39:12 Completed - Training job completed
Training seconds: 986
Billable seconds: 986


## 7. Deploy SageMaker Endpoint

In [19]:
# deploy the trained IP Insights model to a SageMaker endpoint; adjust instance_type and autoscaling for your scenario
predictor = ip_insights.deploy(initial_instance_count=1, instance_type="ml.m5.large")
print(f"Endpoint name: {predictor.endpoint_name}")

INFO:sagemaker:Creating model with name: ipinsights-2023-09-21-16-40-26-825
INFO:sagemaker:Creating endpoint-config with name ipinsights-2023-09-21-16-40-26-825
INFO:sagemaker:Creating endpoint with name ipinsights-2023-09-21-16-40-26-825


-----------!


Endpoint name: ipinsights-2023-09-21-16-40-26-825


## 8. Submit network data for inference to the endpoint

This portion of the code assumes you have test data saved in a local folder or an S3 bucket.

The test data is a headerless CSV file in which the first column contains instance IDs and the second column contains IP addresses.

It is recommended to test both valid and invalid instance/IP pairs to compare how the model scores them.
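If you do not have a test file yet, one way to build one is to sample "valid" pairs that appeared in training and create "invalid" pairs by re-pairing each sampled instance with an IP it was never seen with. A minimal sketch of that approach; `make_test_pairs` is a hypothetical helper, and treating unseen pairs as anomalous is an assumption made for testing, not a property of the data.

```python
import random
from typing import List, Tuple

Pair = Tuple[str, str]


def make_test_pairs(train_pairs: List[Pair], n: int, seed: int = 0) -> Tuple[List[Pair], List[Pair]]:
    """Return (valid, invalid) test pairs built from training pairs.

    Hypothetical helper: valid pairs are sampled from distinct training pairs;
    each invalid pair reuses a sampled entity with an IP never observed for it.
    """
    rng = random.Random(seed)
    seen = set(train_pairs)
    all_ips = sorted({ip for _, ip in train_pairs})
    unique_pairs = sorted(seen)
    valid = rng.sample(unique_pairs, min(n, len(unique_pairs)))
    invalid = []
    for entity, _ in valid:
        candidates = [ip for ip in all_ips if (entity, ip) not in seen]
        if candidates:  # skip entities already seen with every known IP
            invalid.append((entity, rng.choice(candidates)))
    return valid, invalid


train = [
    ("i-0caa75b1c88bf0f16", "10.0.103.247"),
    ("i-0caa75b1c88bf0f16", "10.0.103.39"),
    ("i-0b4ef7e8aac82d605", "10.0.1.130"),
]
valid, invalid = make_test_pairs(train, n=2)
print(valid)
print(invalid)
```

With the notebook's data, `train_pairs = list(ocsf_complete.itertuples(index=False, name=None))` would supply the input, and `pd.DataFrame(valid + invalid).to_csv(path, header=False, index=False)` would write a file in the format described above.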

In [28]:
# read the test file (headerless CSV)
# file @ S3 approach
# inference_df = wr.s3.read_csv(f"s3://{bucket}/{prefix}/inference/testdata.csv", header=None)

# file @ local approach
inference_df = pd.read_csv("../data/testdata-ipinsights.csv", header=None)
inference_df

|   | 0 | 1 |
|---|---|---|
| 0 | i-0dee580a031e28c14 | 10.0.2.125 |
| 1 | i-05891769c3b7b2879 | 10.0.3.238 |
| 2 | i-0dee580a031e28c14 | 10.0.2.145 |
| 3 | i-05891769c3b7b2879 | 10.0.10.11 |


In [30]:
# prepare a bulk request: headerless CSV with one instance_id,ip pair per line
# (to_csv with no target returns the CSV text as a string)
inference_request_payload = inference_df.to_csv(sep=",", header=False, index=False)
print(inference_request_payload)

i-0dee580a031e28c14,10.0.2.125
i-05891769c3b7b2879,10.0.3.238
i-0dee580a031e28c14,10.0.2.145
i-05891769c3b7b2879,10.0.10.11
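A single request works for a handful of rows, but SageMaker real-time endpoints cap the size of one invocation payload (6 MB at the time of writing; check the current SageMaker quotas). A minimal sketch of splitting a large test set into several CSV bodies that each stay under a byte budget; `chunk_csv_payloads` is a hypothetical helper and its 5 MB default is an assumed safety margin.

```python
from typing import Iterator, List


def chunk_csv_payloads(lines: List[str], max_bytes: int = 5 * 1024 * 1024) -> Iterator[str]:
    """Yield newline-terminated CSV bodies of at most max_bytes each.

    Hypothetical helper. A single row larger than max_bytes is still
    emitted on its own, since a row cannot be split.
    """
    batch: List[str] = []
    size = 0
    for line in lines:
        row = line.rstrip("\n") + "\n"
        row_size = len(row.encode("utf-8"))
        if batch and size + row_size > max_bytes:
            yield "".join(batch)
            batch, size = [], 0
        batch.append(row)
        size += row_size
    if batch:
        yield "".join(batch)


rows = [
    "i-0dee580a031e28c14,10.0.2.125",
    "i-05891769c3b7b2879,10.0.3.238",
    "i-0dee580a031e28c14,10.0.2.145",
]
payloads = list(chunk_csv_payloads(rows, max_bytes=64))
print(len(payloads))  # 2
```

In the notebook, each body from `chunk_csv_payloads(inference_request_payload.splitlines())` could be sent with `predictor.predict(body, initial_args={"ContentType": "text/csv"})`.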



In [31]:
# invoke deployed SageMaker model using inference request payload
inference_response = predictor.predict(
    inference_request_payload,
    initial_args={"ContentType":'text/csv'})

# log response
print(inference_response)

b'{"predictions": [{"dot_product": -0.0025198650546371937}, {"dot_product": -0.012088463641703129}, {"dot_product": -0.005051907151937485}, {"dot_product": -0.016000129282474518}]}'
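The endpoint returns one `dot_product` score per input row, in the same order as the request. A minimal sketch of decoding that response and pairing each score with its input, where a lower score suggests a less expected instance/IP pairing. `score_pairs` is a hypothetical helper, and the `-0.01` threshold is an illustrative assumption; a real threshold should be chosen from the score distribution of your own valid and invalid test pairs.

```python
import json
from typing import List, Tuple


def score_pairs(
    response: bytes, pairs: List[Tuple[str, str]], threshold: float = -0.01
) -> List[Tuple[str, str, float, bool]]:
    """Return (instance_id, ip, score, flagged) for each request row.

    Hypothetical helper: flagged is True when the score falls below threshold.
    """
    predictions = json.loads(response)["predictions"]
    if len(predictions) != len(pairs):
        raise ValueError("response and request row counts differ")
    return [
        (entity, ip, p["dot_product"], p["dot_product"] < threshold)
        for (entity, ip), p in zip(pairs, predictions)
    ]


response = b'{"predictions": [{"dot_product": -0.0025198650546371937}, {"dot_product": -0.012088463641703129}, {"dot_product": -0.005051907151937485}, {"dot_product": -0.016000129282474518}]}'
pairs = [
    ("i-0dee580a031e28c14", "10.0.2.125"),
    ("i-05891769c3b7b2879", "10.0.3.238"),
    ("i-0dee580a031e28c14", "10.0.2.145"),
    ("i-05891769c3b7b2879", "10.0.10.11"),
]
for row in score_pairs(response, pairs):
    print(row)
```

In the notebook, `pairs` could come from `list(inference_df.itertuples(index=False, name=None))` and `response` from `inference_response`.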


## 9. Cleanup

In [None]:
# delete the endpoint and its model when you are done to avoid ongoing charges
predictor.delete_endpoint()
predictor.delete_model()