# Amazon SageMaker Anomaly Detection using Amazon Security Lake data

## 1. Introduction

In this sample notebook, you will build, train, and deploy a model using the SageMaker IP Insights algorithm and Amazon Security Lake data. You will query the AWS Lake Formation table managed by Amazon Security Lake, transform the results from the VPC Flow Log and Route 53 tables, train an IP Insights model on that data, deploy the model to a SageMaker endpoint, and finally test it.

In [1]:
# 1. install 
%conda install openjdk -y
%pip install pyspark 
%pip install sagemaker_pyspark
%pip install awswrangler

Channels:
 - defaults
 - conda-forge
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done


    current version: 23.11.0
    latest version: 24.3.0

Please update conda by running

    $ conda update -n base -c conda-forge conda



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2024.3.11  |       h06a4308_0         127 KB
    certifi-2024.2.2           |  py310h06a4308_0         159 KB
    conda-24.3.0               |  py310h06a4308_0         957 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       342.2 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjd

## 2. Set up your environment

In [1]:
# 2. setup, config .. imports

import boto3
import botocore
import os
import sagemaker
import pandas as pd
import awswrangler as wr

from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

dt_today = datetime.now()
str_today = dt_today.strftime("%m_%d_%Y_%H_%M_%S")

bucket = sagemaker.Session().default_bucket()
prefix = "sagemaker/ipinsights-vpcflowlogs"
execution_role = sagemaker.get_execution_role()
region = boto3.Session().region_name
seclakeregion = region.replace("-","_")

# check if the bucket exists
try:
    boto3.Session().client("s3").head_bucket(Bucket=bucket)
except botocore.exceptions.ParamValidationError as e:
    print(
        "You either forgot to specify your S3 bucket or you gave your bucket an invalid name!"
    )
except botocore.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == "403":
        print(f"Hey! You don't have permission to access the bucket, {bucket}.")
    elif e.response["Error"]["Code"] == "404":
        print(f"Hey! Your bucket, {bucket}, doesn't exist!")
    else:
        raise
else:
    print(f"Training input/output will be stored in: s3://{bucket}/{prefix}")
print(f"Session timestamp: {str_today}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
Training input/output will be stored in: s3://sagemaker-us-east-1-216332718170/sagemaker/ipinsights-vpcflowlogs
Session timestamp: 03_29_2024_15_57_29


## 3. Query and transform VPC Flow Log data

In [71]:
# 3. query VPC flow logs from the VPC Flow Logs Athena integration table
# Alternative (commented out): query the Amazon Security Lake VPC flow table instead
# ocsf_df = wr.athena.read_sql_query(
#     "SELECT src_endpoint.instance_uid AS instance_id, src_endpoint.ip AS sourceip "
#     f"FROM amazon_security_lake_table_{seclakeregion}_vpc_flow_1_0 "
#     "WHERE src_endpoint.ip IS NOT NULL AND src_endpoint.instance_uid IS NOT NULL "
#     "AND src_endpoint.instance_uid != '-' AND src_endpoint.ip != '-'",
#     database=f"amazon_security_lake_glue_db_{seclakeregion}",
#     ctas_approach=False, unload_approach=True,
#     s3_output=f"s3://{bucket}/unload/parquet/updated/{str_today}")
wr.s3.delete_objects(f"s3://{bucket}/unload/parquet/updated/{str_today}")
ocsf_df = wr.athena.read_sql_query("""
SELECT interface_id, srcaddr FROM "vpcflowlogsathenadatabasefl06744210aaf126b7a"."fl06744210aaf126b7adaily2024032920240329" where starts_with(srcaddr, '10.') and starts_with(dstaddr, '10.') ;
""", database="vpcflowlogsathenadatabasefl06744210aaf126b7a", ctas_approach=False, unload_approach=False, s3_output=f"s3://{bucket}/unload/parquet/updated/{str_today}")
ocsf_df.head()

Unnamed: 0,interface_id,srcaddr
0,eni-045c7ef98233c196e,10.0.5.58
1,eni-0c87cc6e8dc7762e5,10.0.4.38
2,eni-0c87cc6e8dc7762e5,10.0.5.14
3,eni-0c87cc6e8dc7762e5,10.0.2.28
4,eni-0c87cc6e8dc7762e5,10.0.5.94
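
IP Insights trains on (entity, IPv4 address) pairs, so it can help to drop rows with placeholder or non-IPv4 values before writing the training file. The following is a minimal sketch using only the standard library; `clean_pairs` is a hypothetical helper, not part of the original notebook, and the sample rows reuse values from the output above plus a few made-up bad rows for illustration. Duplicate pairs are kept on purpose, on the assumption that how often a pair occurs is useful to the model.

```python
import ipaddress


def clean_pairs(rows):
    """Keep (entity, ip) rows with a non-empty entity and a parseable IPv4 address.

    Hypothetical helper for illustration; duplicates are kept.
    """
    cleaned = []
    for entity, ip in rows:
        entity = (entity or "").strip()
        # Flow logs use "-" as a placeholder for missing fields.
        if not entity or entity == "-":
            continue
        try:
            ip = ip.strip()
            ipaddress.IPv4Address(ip)
        except (ipaddress.AddressValueError, AttributeError):
            # Not an IPv4 address (placeholder, IPv6, or None).
            continue
        cleaned.append((entity, ip))
    return cleaned


rows = [
    ("eni-045c7ef98233c196e", "10.0.5.58"),
    ("eni-0c87cc6e8dc7762e5", "10.0.4.38"),
    ("-", "10.0.2.28"),                        # made-up: missing entity
    ("eni-0c87cc6e8dc7762e5", "-"),            # made-up: missing address
    ("eni-0c87cc6e8dc7762e5", "2001:db8::1"),  # made-up: IPv6 address
]
print(clean_pairs(rows))
```

With the DataFrame from the query, the same helper could be fed from `ocsf_df.itertuples(index=False, name=None)`, which yields plain `(interface_id, srcaddr)` tuples.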


## 6. Download image and train IP Insights model

In [72]:
# 6 setup training data channel and IPInsights algorithm Docker image
training_path = f"s3://{bucket}/{prefix}/training/training_input.csv"

wr.s3.to_csv(ocsf_df, training_path, header=False, index=False)

{'paths': ['s3://sagemaker-us-east-1-216332718170/sagemaker/ipinsights-vpcflowlogs/training/training_input.csv'],
 'partitions_values': {}}

In [73]:
from sagemaker import image_uris

# look up the IP Insights training image for the current region
image = image_uris.get_training_image_uri(region, "ipinsights")

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [80]:
# change instance type depending on size of input training
ip_insights = sagemaker.estimator.Estimator(
    image,
    execution_role,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    output_path=f"s3://{bucket}/{prefix}/output",
    sagemaker_session=sagemaker.Session(),
)

# change hyperparameters depending on size of input training and desired training constraints
ip_insights.set_hyperparameters(
    num_entity_vectors="20000",
    random_negative_sampling_rate="5",
    vector_dim="128",
    mini_batch_size="1000",
    epochs="5",
    learning_rate="0.01",
)
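
One way to pick `num_entity_vectors` is to derive it from the training data rather than hard-coding it. The sketch below assumes the commonly cited guidance for IP Insights of roughly twice the number of unique entity identifiers, which leaves headroom for hash collisions; treat the multiplier as an assumption to verify against the current AWS documentation. `suggest_num_entity_vectors` is a hypothetical helper name.

```python
def suggest_num_entity_vectors(entity_ids, multiplier=2):
    """Return multiplier * (number of unique entities), with a floor of 1.

    Hypothetical helper; the default multiplier of 2 is an assumption.
    """
    unique_entities = len(set(entity_ids))
    return max(1, multiplier * unique_entities)


# Entity column from a few rows of the query output shown earlier.
entities = [
    "eni-045c7ef98233c196e",
    "eni-0c87cc6e8dc7762e5",
    "eni-0c87cc6e8dc7762e5",
]
print(suggest_num_entity_vectors(entities))  # 2 unique entities -> 4
```

In the notebook, the entity column would be `ocsf_df["interface_id"]`, and the result would be passed to `set_hyperparameters` as a string, matching the other values.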

In [81]:
from sagemaker.inputs import TrainingInput

# s3_input was renamed to TrainingInput in SageMaker Python SDK v2
input_data = {
    "train": TrainingInput(training_path, content_type="text/csv")
}


In [82]:
# train and fit IPInsights model based on training data

ip_insights.fit(input_data)

INFO:sagemaker:Creating training-job with name: ipinsights-2024-03-31-03-32-54-013


2024-03-31 03:32:54 Starting - Starting the training job...
2024-03-31 03:33:09 Starting - Preparing the instances for training...
2024-03-31 03:33:43 Downloading - Downloading input data...
2024-03-31 03:34:03 Downloading - Downloading the training image...............
2024-03-31 03:36:54 Training - Training image download completed. Training in progress.....
Docker entrypoint called with argument(s): train
Running default environment configuration script
  if num_device is 1 and 'dist' not in kvstore:
  if cons['type'] is 'ineq':
  if len(self.X_min) is not 0:
[03/31/2024 03:37:23 INFO 140269191436096] Reading default configuration from /opt/amazon/lib/python3.8/site-packages/algorithm/resources/default-input.json: {'batch_metrics_publish_interval': '1000', 'epochs': '10', 'learning_rate': '0.001', 'mini_batch_size': '5000', 'num_entity_vectors': '100000', 'num_ip_encoder_layers': '1', 'random_negative_sampling_rate': '1', 'shuffled_negative_sampling

## 7. Deploy SageMaker Endpoint

In [83]:
# deploy trained IPInsights model to SageMaker endpoint.  Again, change instance_type and autoscaling based on your scenario
predictor = ip_insights.deploy(initial_instance_count=1, instance_type="ml.m5.large")
print(f"Endpoint name: {predictor.endpoint_name}")

INFO:sagemaker:Creating model with name: ipinsights-2024-03-31-03-52-50-569
INFO:sagemaker:Creating endpoint-config with name ipinsights-2024-03-31-03-52-50-569
INFO:sagemaker:Creating endpoint with name ipinsights-2024-03-31-03-52-50-569


-----------!


Endpoint name: ipinsights-2024-03-31-03-52-50-569


## 8. Submit network data for inference to the endpoint

This section assumes you have test data saved in a local folder or an S3 bucket.

The test data is a headerless CSV file in which the first column contains entity identifiers (instance IDs, or network interface IDs as in the query above) and the second column contains IP addresses.

Test both valid and invalid pairs to see how the model scores each.
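
If you do not have hand-written test rows, one option is to synthesize unfamiliar pairs by attaching random addresses outside the training range to known entities. The sketch below is an assumption-laden illustration, not the notebook's method: `make_unseen_pairs` is a hypothetical helper, and it simply avoids the `10.0.0.0/8` range that the training query selected.

```python
import random


def make_unseen_pairs(entities, count, seed=0):
    """Pair known entities with random IPv4 addresses outside 10.0.0.0/8.

    Hypothetical helper for building test rows; seeded for repeatability.
    """
    rng = random.Random(seed)
    # Candidate first octets: skip 10 (training range), 127 (loopback),
    # and 224 and above (multicast and reserved).
    first_octets = [o for o in range(1, 224) if o not in (10, 127)]
    pairs = []
    for _ in range(count):
        entity = rng.choice(entities)
        octets = (
            rng.choice(first_octets),
            rng.randrange(256),
            rng.randrange(256),
            rng.randrange(1, 255),
        )
        pairs.append((entity, ".".join(str(o) for o in octets)))
    return pairs


known_entities = ["eni-045c7ef98233c196e", "eni-0c87cc6e8dc7762e5"]
for entity, ip in make_unseen_pairs(known_entities, count=3):
    print(f"{entity},{ip}")
```

Rows produced this way can be written to a CSV file and mixed with sampled training rows, so that the response contains both familiar and unfamiliar pairs to compare.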

In [129]:
# read test data
# from S3: sample five rows from the training file as known-good pairs
inference_df = wr.s3.read_csv(training_path, header=None).sample(5)

# from a local file: append hand-written test pairs
anomaly_df = pd.read_csv('../data/testdata-ipinsights.csv', header=None)
inference_df = pd.concat([inference_df, anomaly_df])

In [130]:
# prepare bulk request from data frame
import io

# to_csv writes into the buffer and returns None, so read the text back from the buffer
csv_file = io.StringIO()
inference_df.to_csv(csv_file, sep=",", header=False, index=False)
inference_request_payload = csv_file.getvalue()
print(inference_request_payload)

eni-08319ebb226002bc3,10.0.2.89
eni-04385e65ff724f39e,10.0.4.191
eni-095bb4db87156aa49,10.0.3.215
eni-08319ebb226002bc3,10.0.2.188
eni-0c87cc6e8dc7762e5,10.0.0.11
alialem,10.0.4.162
mydevice,10.0.4.178
eni-07d77ca24f4e21cde,50.23.54.127



In [131]:
# invoke deployed SageMaker model using inference request payload
inference_response = predictor.predict(
    inference_request_payload,
    initial_args={"ContentType":'text/csv'})

# log response
print(inference_response)

b'{"predictions": [{"dot_product": 1.4939231872558594}, {"dot_product": -0.4500129520893097}, {"dot_product": 1.333173155784607}, {"dot_product": 1.112644910812378}, {"dot_product": 1.6778275966644287}, {"dot_product": -0.010866310447454453}, {"dot_product": -0.007302725221961737}, {"dot_product": -19.2442684173584}]}'
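
The response holds one `dot_product` score per request row, in request order. In the output above, the sampled training pairs mostly score above zero while the unfamiliar public address scores far below, which suggests reading lower scores as less expected pairings. The sketch below pairs each row with its score and flags rows under a cutoff; `flag_anomalies` is a hypothetical helper, and the threshold of `0.0` is an assumption that should be tuned on your own traffic.

```python
import json


def flag_anomalies(payload, response_body, threshold=0.0):
    """Return (entity, ip, score) for rows whose dot_product is below threshold.

    Hypothetical helper; assumes a two-column CSV payload without quoting.
    """
    rows = [line.split(",") for line in payload.strip().splitlines()]
    predictions = json.loads(response_body)["predictions"]
    scores = [p["dot_product"] for p in predictions]
    if len(rows) != len(scores):
        raise ValueError("payload rows and predictions differ in length")
    return [
        (entity, ip, score)
        for (entity, ip), score in zip(rows, scores)
        if score < threshold
    ]


# Payload and response copied from the cells above.
payload = """eni-08319ebb226002bc3,10.0.2.89
eni-04385e65ff724f39e,10.0.4.191
eni-095bb4db87156aa49,10.0.3.215
eni-08319ebb226002bc3,10.0.2.188
eni-0c87cc6e8dc7762e5,10.0.0.11
alialem,10.0.4.162
mydevice,10.0.4.178
eni-07d77ca24f4e21cde,50.23.54.127
"""
response_body = b'{"predictions": [{"dot_product": 1.4939231872558594}, {"dot_product": -0.4500129520893097}, {"dot_product": 1.333173155784607}, {"dot_product": 1.112644910812378}, {"dot_product": 1.6778275966644287}, {"dot_product": -0.010866310447454453}, {"dot_product": -0.007302725221961737}, {"dot_product": -19.2442684173584}]}'

flagged = flag_anomalies(payload, response_body)
for entity, ip, score in flagged:
    print(f"{entity},{ip} score={score:.3f}")
```

With this cutoff, four rows are flagged: one sampled training pair with a mildly negative score, the two unknown entities `alialem` and `mydevice` with scores near zero, and the public address `50.23.54.127` with a strongly negative score. The near-zero scores for unseen entities show why a single fixed threshold is only a starting point.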


## 9. Cleanup

In [51]:
# delete endpoint if necessary to minimize costs
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: ipinsights-2024-03-29-17-43-25-784
INFO:sagemaker:Deleting endpoint with name: ipinsights-2024-03-29-17-43-25-784
