# Introduction to the Amazon SageMaker IP Insights Algorithm
#### Unsupervised anomaly detection for identifying anomalous IP addresses
-------
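1. [Introduction](#Introduction)
2. [Prepare the Data](#Prepare-the-Data)
3. [Data Processing](#Data-Processing)
4. [Model Training](#Model-Training)
5. [Model Deployment and Inference](#Model-Deployment-and-Inference)
6. [Compute the Anomaly Threshold](#Compute-the-Anomaly-Threshold)
7. (Optional) [Automatic Hyperparameter Tuning](#Automatic-Hyperparameter-Tuning)
8. (Optional) [Batch Inference](#Batch-Inference)
9. [Delete the Endpoints](#Delete-the-Endpoints)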

## Introduction
-------

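The Amazon SageMaker IP Insights algorithm uses statistical modeling and neural networks to capture associations between online resources (such as account IDs or hostnames) and IPv4 addresses. Under the hood, it learns vector representations of resources and IP addresses. In essence, if the vectors representing an IP address and a resource are close together, that IP address is likely to access that resource, even if it has never done so before.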
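
To make the idea concrete, here is a minimal sketch with hand-picked 3-dimensional vectors. Everything in it is hypothetical: the names, the values, and the dimension are illustrative assumptions, whereas the real model learns much larger embeddings during training. It only shows that a larger dot product between a resource vector and an IP vector corresponds to a more plausible pairing.

```python
# Toy illustration only: hand-picked vectors, not output from a trained model.

def dot(u, v):
    """Dot product of two equal-length vectors."""
    return sum(a * b for a, b in zip(u, v))

# Hypothetical learned embeddings.
user_vec = [0.9, 0.1, -0.2]          # e.g. "user1"
home_ip_vec = [0.8, 0.2, -0.1]       # an IP the user plausibly uses
unrelated_ip_vec = [-0.7, 0.3, 0.6]  # an IP far from the user's usual networks

home_score = dot(user_vec, home_ip_vec)
unrelated_score = dot(user_vec, unrelated_ip_vec)

print(f"user1 vs. home IP:      {home_score:.2f}")
print(f"user1 vs. unrelated IP: {unrelated_score:.2f}")
```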

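In this notebook, we generate synthetic data of users accessing resources from IP addresses and train a model with the Amazon SageMaker IP Insights algorithm. We then run inference with the model and show how to detect anomalies. After running this notebook, you should be able to:

- obtain, transform, and store data for use in Amazon SageMaker,
- create an Amazon SageMaker training job that produces an IP Insights model,
- run inference with the model through an Amazon SageMaker endpoint.

To learn more, see the [SageMaker IP Insights Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/ip-insights.html).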


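### Set Up the S3 Bucket
We first need to specify where the training data and model artifacts are stored. ***This is the only cell in this notebook that you need to modify.***

- `bucket` - an S3 bucket you have access to
- `prefix` - the location within the bucket where this notebook's input and output data are stored (the default value is fine)
- `region` - the AWS Region in which this notebook runs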

In [None]:
bucket = 'hxh-ap-northeast-1'
prefix = 'ipinsightdemo'
region = 'ap-northeast-1'

## Prepare the Data
-------

Download the code used to generate the user request log data.

In [None]:
import boto3
from os import path

tools_bucket = f"jumpstart-cache-prod-{region}"  # Bucket containing the data generation module.
tools_prefix = "1p-algorithms-assets/ip-insights"  # Prefix for the data generation module
s3 = boto3.client("s3")
    
data_generation_file = "generate_data.py"  # Synthetic data generation module
script_parameters_file = "ip2asn-v4-u32.tsv.gz"

if not path.exists(data_generation_file):
    s3.download_file(tools_bucket, f"{tools_prefix}/{data_generation_file}", data_generation_file)

if not path.exists(script_parameters_file):
    s3.download_file(tools_bucket, f"{tools_prefix}/{script_parameters_file}", script_parameters_file)

### Generate the Data

The following generates HTTP access log data that simulates users requesting resources from IP addresses, in this format:

```
192.168.1.100 - user1 [15/Oct/2018:18:58:32 +0000] "GET /login_success?userId=1 HTTP/1.1" 200 476 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
192.168.1.102 - user2 [15/Oct/2018:18:58:35 +0000] "GET /login_success?userId=2 HTTP/1.1" 200 - "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
...
```

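Each "access event" is extracted from the log, and the corresponding user name and IP address are stored in a headerless two-column CSV file that serves as training data for SageMaker IP Insights.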

```
user1, 192.168.1.100
user2, 192.168.1.102
...
```
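
As a rough illustration of this extraction step, the sketch below pulls the user name and IP address out of each log line using only the standard library. The regular expression and the helper name `extract_user_ip` are assumptions made for this example; the notebook itself performs the extraction with pandas in the cells that follow.

```python
import csv
import io
import re

# Matches the start of an access log line: IP, identity field, user name.
LOG_PREFIX = re.compile(r'^(?P<ip>\S+) \S+ (?P<user>\S+) \[')

def extract_user_ip(log_lines):
    """Yield (user, ip) pairs from access log lines; skip lines that do not match."""
    for line in log_lines:
        match = LOG_PREFIX.match(line)
        if match:
            yield match.group("user"), match.group("ip")

# Shortened versions of the sample log lines shown above.
sample_log = [
    '192.168.1.100 - user1 [15/Oct/2018:18:58:32 +0000] "GET /login_success?userId=1 HTTP/1.1" 200 476 "-" "Mozilla/5.0"',
    '192.168.1.102 - user2 [15/Oct/2018:18:58:35 +0000] "GET /login_success?userId=2 HTTP/1.1" 200 - "-" "Mozilla/5.0"',
]

# Write the pairs as a headerless two-column CSV held in memory.
buffer = io.StringIO()
csv.writer(buffer, lineterminator="\n").writerows(extract_user_ip(sample_log))
headerless_csv = buffer.getvalue()
print(headerless_csv)
```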

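The [generate_data.py](./generate_data.py) script simulates realistic user access behavior in fixed proportions: users access resources from home, from the office, and while traveling, each with a different ASN and corresponding IP addresses.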

In [None]:
from generate_data import generate_dataset
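from datetime import date

# Number of users. Each user produces roughly 300 log lines, so 10,000 users
# yield about 3 million lines (roughly 700 MB).

# Note: this rebinds `date` from the class to today's date as a string.
date = str(date.today())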

NUM_USERS = 5000
log_file = f"ipinsights_web_traffic-{date}.log"
generate_dataset(NUM_USERS, log_file)

# Visualize a few log lines
!head $log_file
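
The sizing figures in the comment above can be turned into a quick estimate for other user counts. This is a back-of-the-envelope sketch that assumes the log size scales linearly with the number of users; the function name `estimate_log` is made up for the example, and actual sizes will vary with the generated content.

```python
# Figures quoted in the notebook: ~300 log lines per user,
# and ~700 MB for 10,000 users (about 3 million lines).
LINES_PER_USER = 300
REFERENCE_USERS = 10_000
REFERENCE_SIZE_MB = 700

def estimate_log(num_users):
    """Return (approximate line count, approximate size in MB) for num_users."""
    lines = num_users * LINES_PER_USER
    size_mb = REFERENCE_SIZE_MB * num_users / REFERENCE_USERS  # linear scaling assumption
    return lines, size_mb

lines, size_mb = estimate_log(5000)
print(f"~{lines:,} lines, ~{size_mb:.0f} MB")
```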

Inspect the data format and volume.

In [None]:
import pandas as pd
log = pd.read_csv(
    log_file,
    sep=" ",
    na_values="-",
    header=None,
    names=["ip_address","rcf_id","user","timestamp","time_zone","request", "status", "size", "referer", "user_agent"]
)
log.head()

In [None]:
log.describe()

Upload the log data to S3.

In [None]:
import boto3
object_key = f'{prefix}/{date}/{log_file}'  # avoid shadowing the built-in `object`
boto3.client('s3').upload_file(log_file, bucket, object_key)

## Data Processing
------

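Keep only the successful (status 200) request logs whose URI is '/login_success' to use as training data, on the assumption that all such requests are normal.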

In [None]:
df = log
df = df[(df["request"].str.startswith("GET /login_success")) & (df["status"] == 200)]
df.describe()

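Split the data into training and test sets, with shuffling.

IP Insights training itself does not need a test set; the test set is used later to compute the threshold for inference results.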

In [None]:
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(df, test_size=0.3, shuffle=True)

The IP Insights model needs only two fields: `user` and `ip_address`.

In [None]:
train_df = train_df[['user', 'ip_address']]
test_df = test_df[['user', 'ip_address']]
train_df.head()

Save the training data in CSV format and upload it to S3.

In [None]:
train_data = train_df.to_csv(index=False, header=False)

In [None]:
## upload training data to S3
trainfile = 'train.csv'
s3 = boto3.client('s3')
s3_train_data = f"s3://{bucket}/{prefix}/{trainfile}"
key = f"{prefix}/{trainfile}"

print(f"Uploading data to: {s3_train_data}")
boto3.resource("s3").Bucket(bucket).Object(key).put(Body=train_data)

## Model Training
-----

In [None]:
# from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker
execution_role = sagemaker.get_execution_role()

image = sagemaker.image_uris.retrieve("ipinsights", boto3.Session().region_name)

In [None]:
## Use the training CSV uploaded above as input_data
input_data = {
    "train": sagemaker.inputs.TrainingInput(
        s3_train_data, distribution="FullyReplicated", content_type="text/csv"
    )
}

#### Train on a GPU instance (p3)
*Optionally, use managed spot instances for training to reduce cost.*

In [None]:
# Test Managed Spot Training
# Set up the estimator with training job configuration
ip_insights = sagemaker.estimator.Estimator(
    image,
    execution_role,
    instance_count=1,
    instance_type="ml.p3.2xlarge",
    output_path=f"s3://{bucket}/ipinsightoutput",
    sagemaker_session=sagemaker.Session(),
#     use_spot_instances=True,
#     max_wait=3600,
#     max_run=3600
)

# Configure algorithm-specific hyperparameters
ip_insights.set_hyperparameters(
    num_entity_vectors="20000",
    random_negative_sampling_rate="5",
    vector_dim="128",
    mini_batch_size="1000",
    epochs="5",
    learning_rate="0.01",
)
ip_insights.fit(input_data)

In [None]:
ip_insights._current_job_name

## Model Deployment and Inference
-----

Deploy the model with a single call using SageMaker.

In [None]:
predictor = ip_insights.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")

Print the model's endpoint name. If you build a pipeline that automates model training and endpoint updates, you need to specify this EndpointName when updating the model endpoint.

In [None]:
print(predictor.endpoint_name)

Configure the predictor's input and output formats.

In [None]:
# from sagemaker.predictor import csv_serializer, json_deserializer

# predictor.serializer = csv_serializer
# predictor.deserializer = json_deserializer

from sagemaker import serializers, deserializers
predictor.serializer = serializers.CSVSerializer()
predictor.deserializer = deserializers.JSONDeserializer()

Take two samples from the test data and run an inference test. By default, the result contains only `dot_product`.

In [None]:
predictor.predict(
    test_df[0:2].values, initial_args={"ContentType": "text/csv", "Accept": "application/json"}
)

Inference test

`With Accept set to verbose=true, the inference result also returns the embeddings of the user entity and the IP address.`

In [None]:
predictor.predict(
    test_df[0:2].values, initial_args={"ContentType": "text/csv", "Accept": "application/json; verbose=true"}
)


## Compute the Anomaly Threshold
-----
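**Generate anomalous data for computing the `dot_product` threshold**

 - Normal requests: from the test set, keep the records whose user also appears in the training set (IP Insights can only run inference for users seen during training), randomly sample a number of them to represent normal requests, and label them `label=0`.
 - Anomalous requests: randomly sample users from the training set and generate a random IP for each one to represent anomalous requests, labeled `label=1`.
 - Run inference with the hosted model on all of the `label=0` and `label=1` data to obtain `dot_product`.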

In [None]:
import numpy as np
from generate_data import draw_ip  # defined in the generate_data.py module downloaded earlier


def score_ip_insights(predictor, df):
    def get_score(result):
        """Return the negative to the dot product of the predictions from the model."""
        return [-prediction["dot_product"] for prediction in result["predictions"]]

    df = df[["user", "ip_address"]]
    result = predictor.predict(df.values)
    return get_score(result)


def create_test_case(train_df, test_df, num_samples, attack_freq):
    """Creates a test case from provided train and test data frames.

    This generates test case for accounts that are both in training and testing data sets.

    :param train_df: (pandas.DataFrame with columns ['user', 'ip_address']) training DataFrame
    :param test_df: (pandas.DataFrame with columns ['user', 'ip_address']) testing DataFrame
    :param num_samples: (int) number of test samples to use
    :param attack_freq: (float) the ratio of negative_samples:positive_samples to generate for test case
    :return: DataFrame with both good and bad traffic, with labels
    """
    # Get all possible accounts. The IP Insights model can only make predictions on users it has seen in training.
    # Therefore, filter unseen accounts out of the test dataset, as their results would not mean anything.
    valid_accounts = set(train_df["user"])
    valid_test_df = test_df[test_df["user"].isin(valid_accounts)]

    good_traffic = valid_test_df.sample(num_samples, replace=False)
    good_traffic = good_traffic[["user", "ip_address"]]
    good_traffic["label"] = 0

    # Generate malicious traffic
    num_bad_traffic = int(num_samples * attack_freq)
    bad_traffic_accounts = np.random.choice(list(valid_accounts), size=num_bad_traffic, replace=True)
    # draw_ip() draws a random IP address from a random ASN, all uniformly at random
    bad_traffic_ips = [draw_ip() for _ in range(num_bad_traffic)]
    bad_traffic = pd.DataFrame({"user": bad_traffic_accounts, "ip_address": bad_traffic_ips})
    bad_traffic["label"] = 1

    # All traffic labels are: 0 for good traffic; 1 for bad traffic.
    # DataFrame.append was removed in pandas 2.0, so use pd.concat instead.
    all_traffic = pd.concat([good_traffic, bad_traffic], ignore_index=True)

    return all_traffic

Generate the normal and anomalous data.

In [None]:
NUM_SAMPLES = 10000
test_case = create_test_case(train_df, test_df, num_samples=NUM_SAMPLES, attack_freq=1)
test_case[9998:10004]

Run inference on the normal and anomalous data to obtain `dot_product`. In general, the larger the `dot_product`, the more likely the user and IP are associated.

In [None]:
test_case_scores = score_ip_insights(predictor, test_case)

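The `score_ip_insights` function negates `dot_product`: `label=0` records end up with negative scores (their raw predictions are mostly positive), and `label=1` records end up with positive scores (their raw predictions are mostly negative). This makes anomalies easier to read against a threshold: a score above the threshold is anomalous.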
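
A minimal sketch of this sign convention, using made-up dot products rather than real endpoint output:

```python
# Hypothetical dot products as returned by the endpoint (not real model output).
dot_products = [12.4, 3.1, -0.8, -9.6]

# Negate so that a larger score means "more anomalous".
scores = [-d for d in dot_products]

# A score above the threshold is treated as anomalous.
threshold = 0.0
flags = [score > threshold for score in scores]

for d, s, flagged in zip(dot_products, scores, flags):
    print(f"dot_product={d:6.1f}  score={s:6.1f}  anomalous={flagged}")
```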

In [None]:
test_case['dot_product'] = np.array(test_case_scores)
test_case[9998:10004]

Plot the frequency distribution of the (negated) `dot_product` for the `label=0` and `label=1` data. The X axis shows the score and the Y axis shows the frequency.

In [None]:
import matplotlib.pyplot as plt

# Negated dot-product scores for normal (label 0) and anomalous (label 1) traffic
normal_scores = test_case[test_case['label'] == 0]['dot_product']
anomaly_scores = test_case[test_case['label'] == 1]['dot_product']

plt.hist(normal_scores, label="Normal", bins=100)
plt.hist(anomaly_scores, label="Random IP", bins=100)
plt.xlabel("IP Insights Score")
plt.ylabel("Frequency")
# Create the legend after plotting so that it picks up the histogram labels
plt.legend(bbox_to_anchor=(1.0, 1), loc=1, borderaxespad=0.)

plt.show()

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

n, x = np.histogram(test_case_scores[:NUM_SAMPLES], bins=100, density=True)
plt.plot(x[1:], n)

n, x = np.histogram(test_case_scores[NUM_SAMPLES:], bins=100, density=True)
plt.plot(x[1:], n)

plt.legend(["Normal", "Random IP"])
plt.xlabel("IP Insights Score")
plt.ylabel("Frequency")

plt.show()

## Select a Threshold

As we see in the figure above, there is a clear separation between normal traffic and random traffic. 
We could select a threshold depending on the application.

- If we were working with low-impact decisions, such as whether to ask for another factor of authentication during login, we could use a `threshold = 0.0`. This would result in **catching more true-positives, at the cost of more false-positives**. 

- If our decision system were more **sensitive to false positives**, we could choose a larger threshold, such as `threshold = 10.0`. That way, if we were sending the flagged cases to manual investigation, we would have higher confidence that the activity was suspicious. 

In [None]:
threshold = 0.0

# flagged_cases = test_case[np.array(test_case_scores) > threshold]
flagged_cases = test_case[test_case['dot_product'] > threshold]
# Use <= so that a score exactly at the threshold is counted as normal rather than dropped
normal_cases = test_case[test_case['dot_product'] <= threshold]

num_flagged_cases = len(flagged_cases)                                     # samples predicted as anomalous
num_true_positives = len(flagged_cases[flagged_cases["label"] == 1])       # actual anomalies correctly predicted as anomalous
num_false_positives = len(flagged_cases[flagged_cases["label"] == 0])      # actual normal samples wrongly predicted as anomalous
num_all_positives = len(test_case.loc[test_case["label"] == 1])            # all actual anomalies, TP + FN
num_normal_cases = len(normal_cases)                                       # samples predicted as normal
num_true_negative = len(normal_cases[normal_cases["label"] == 0])          # actual normal samples correctly predicted as normal

print(f"When threshold is set to: {threshold}")
print(f"Total of {num_flagged_cases} predicted as anomalous")
print(f"Total of {num_normal_cases} predicted as normal")
print(f"Total of {num_true_positives} true positives: anomalous samples correctly flagged")
print(f"Total of {num_false_positives} false positives: normal samples wrongly flagged as anomalous")
print(f"Precision: {num_true_positives / float(num_flagged_cases)}")
print(f"TPR/Recall: {num_true_positives / float(num_all_positives)}")
print(f"FPR: {num_false_positives / float(num_false_positives + num_true_negative)}")
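
To compare several candidate thresholds side by side, the same counts can be wrapped in a small function and evaluated in a loop. The sketch below is a standalone illustration in plain Python: `confusion_metrics` is a hypothetical helper, and the scores and labels are made-up values rather than results from the model.

```python
def confusion_metrics(scores, labels, threshold):
    """Return (precision, recall, fpr) when scores above threshold are flagged.

    Labels follow the notebook's convention: 1 = anomalous, 0 = normal.
    A ratio with a zero denominator is reported as 0.0.
    """
    tp = sum(1 for s, y in zip(scores, labels) if s > threshold and y == 1)
    fp = sum(1 for s, y in zip(scores, labels) if s > threshold and y == 0)
    fn = sum(1 for s, y in zip(scores, labels) if s <= threshold and y == 1)
    tn = sum(1 for s, y in zip(scores, labels) if s <= threshold and y == 0)

    def ratio(num, den):
        return num / den if den else 0.0

    return ratio(tp, tp + fp), ratio(tp, tp + fn), ratio(fp, fp + tn)

# Hypothetical negated scores and labels, for illustration only.
toy_scores = [-8.0, -3.0, 1.0, 2.0, 6.0, 12.0]
toy_labels = [0, 0, 0, 1, 1, 1]

for t in (0.0, 5.0, 10.0):
    precision, recall, fpr = confusion_metrics(toy_scores, toy_labels, t)
    print(f"threshold={t:5.1f}  precision={precision:.2f}  recall={recall:.2f}  fpr={fpr:.2f}")
```

On this toy data, raising the threshold removes the false positive but also lowers recall, which is the trade-off described in the bullets above.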

## Automatic Hyperparameter Tuning
-----
### Amazon SageMaker Automatic Model Tuning

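IP Insights is an unsupervised learning algorithm with no labels, so a validation dataset is needed to evaluate model metrics across multiple models and find the best one.

Split off part of `test_df` as validation data, save it as CSV, and upload it to S3.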

In [None]:
from sklearn.model_selection import train_test_split
test_df, validation_df = train_test_split(test_df, test_size=0.5, shuffle=True)

In [None]:
valid_data = validation_df.to_csv(index=False, header=False, columns=["user", "ip_address"])

In [None]:
## upload validation data to S3
validationfile = "valid.csv"
s3 = boto3.client('s3')
s3_valid_data = f"s3://{bucket}/{prefix}/{validationfile}"
key = f"{prefix}/{validationfile}"

boto3.resource("s3").Bucket(bucket).Object(key).put(Body=valid_data)
print(f"Validation data has been uploaded to: {s3_valid_data}")

Create the input channels for the hyperparameter tuning job.

In [None]:
# Configure SageMaker IP Insights Hyperparameter Tuning Input Channels
input_data = {"train": s3_train_data, "validation": s3_valid_data}

Specify the parameters of the hyperparameter tuning job.

`Spot instances can be used to reduce cost.`

In [None]:
# Set up the estimator with training job configuration
ip_insights_tuning = sagemaker.estimator.Estimator(
    image,
    execution_role,
    instance_count=1,
    instance_type="ml.p3.2xlarge",
    output_path=f"s3://{bucket}/ipinsightoutput",
    sagemaker_session=sagemaker.Session(),
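#     use_spot_instances=True,
#     max_wait=3600,
#     max_run=3600
)

# Static hyperparameters, reusing the values from the training job above;
# vector_dim is left out because the tuner searches over it.
ip_insights_tuning.set_hyperparameters(
    num_entity_vectors="20000",
    random_negative_sampling_rate="5",
    mini_batch_size="1000",
    epochs="5",
    learning_rate="0.01",
)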

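For every SageMaker built-in algorithm, the objective metrics for hyperparameter tuning are predefined, along with recommendations for most hyperparameters.

The goal of the IP Insights algorithm is to train a classifier that separates normal from anomalous pairs, so the tuning objective is a common classification metric: maximize AUC.

The hyperparameter search range is: vector dimension (64-1024).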
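
For intuition about the objective, AUC can be read as the probability that a randomly chosen real (user, IP) pair receives a higher score than a randomly chosen generated pair. The sketch below computes it by brute-force pairwise comparison on made-up dot products. It illustrates what the metric measures and is not a description of how SageMaker computes `validation:discriminator_auc` internally; the function name `auc` and the sample values are assumptions for the example.

```python
def auc(positive_scores, negative_scores):
    """Probability that a random positive outranks a random negative (ties count 0.5)."""
    wins = 0.0
    for p in positive_scores:
        for n in negative_scores:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positive_scores) * len(negative_scores))

# Hypothetical dot products: real (user, IP) pairs vs. randomly generated pairs.
real_pairs = [4.2, 2.5, 0.3, 3.8]
random_pairs = [-3.0, 0.9, -1.2, -0.4]

print(f"AUC = {auc(real_pairs, random_pairs):.4f}")
```

An AUC of 1.0 would mean every real pair outranks every generated pair, while 0.5 corresponds to random guessing.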

In [None]:
from sagemaker.tuner import HyperparameterTuner, IntegerParameter

# Configure HyperparameterTuner
ip_insights_tuner = HyperparameterTuner(
    estimator=ip_insights_tuning,  # previously-configured Estimator object
    objective_metric_name="validation:discriminator_auc",
    hyperparameter_ranges={"vector_dim": IntegerParameter(64, 1024)},
    max_jobs=4,
    max_parallel_jobs=2,
)

# Start hyperparameter tuning job
ip_insights_tuner.fit(input_data, include_cls_metadata=False)

View the results of the hyperparameter tuning job.

In [None]:
# Wait for all the jobs to finish
ip_insights_tuner.wait()

# Visualize training job results
ip_insights_tuner.analytics().dataframe()

In [None]:
# ip_insights_tuner.best_estimator()

Deploy an endpoint with the best model.

In [None]:
# Deploy best model
tuned_predictor = ip_insights_tuner.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    serializer=serializers.CSVSerializer(),
    deserializer=deserializers.JSONDeserializer(),
    endpoint_name="TunedEndpoint"
)

## Batch Inference
-----
Let's say we want to score all of the login events at the end of the day and aggregate flagged cases for investigators to look at in the morning. If we store the daily login events in S3, we can use IP Insights with [Amazon SageMaker Batch Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-batch.html) to run inference and store the IP Insights scores back in S3 for future analysis.

Below, we take the training job from before and evaluate it on the validation data we put in S3.

Run Batch Transform directly from the trained Estimator.

In [None]:
transformer = ip_insights.transformer(
    instance_count=1, 
    instance_type="ml.m4.xlarge"
)

transformer.transform(s3_valid_data, content_type="text/csv", split_type="Line")

In [None]:
print(f"Batch Transform output is at: {transformer.output_path}")

In [None]:
# Alternatively, take the best model from the hyperparameter tuning job, run Batch Transform directly, and join the inference output with the input data

# from sagemaker.transformer import Transformer

# transformer_model = ip_insights_tuner.best_estimator().transformer(
# #     model_name='my-previously-trained-model',
#     instance_count=1, 
#     instance_type="ml.m4.xlarge",
#     accept="text/csv",
#     assemble_with="Line"
# )

# transformer_model.transform(s3_valid_data, join_source= "Input", content_type="text/csv", split_type="Line")
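
Once the Batch Transform output has been downloaded, the scores can be paired with the input records for the morning review described above. The sketch below assumes the output is JSON Lines, with one `{"dot_product": ...}` object per input line, in the same order as the input. That format, the helper name `join_scores`, and the sample contents are all assumptions for illustration, so check the actual output file before relying on it.

```python
import json

def join_scores(input_csv_text, output_jsonl_text):
    """Pair each 'user,ip' input line with the dot_product on the same output line."""
    inputs = [line for line in input_csv_text.splitlines() if line.strip()]
    outputs = [line for line in output_jsonl_text.splitlines() if line.strip()]
    if len(inputs) != len(outputs):
        raise ValueError("input and output must contain the same number of records")
    rows = []
    for in_line, out_line in zip(inputs, outputs):
        user, ip = [field.strip() for field in in_line.split(",")]
        rows.append((user, ip, json.loads(out_line)["dot_product"]))
    return rows

# Hypothetical file contents, for illustration only.
input_text = "user1,192.168.1.100\nuser2,10.0.0.7\n"
output_text = '{"dot_product": 5.3}\n{"dot_product": -2.1}\n'

# Flag records whose negated dot product exceeds the chosen threshold.
threshold = 0.0
flagged = [row for row in join_scores(input_text, output_text) if -row[2] > threshold]
print(flagged)
```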

## Delete the Endpoints
----
When you are done with this model, delete the endpoint before closing the notebook. Otherwise, you will continue to pay for the endpoint while it is running.

In [None]:
sagemaker.Session().delete_endpoint(predictor.endpoint_name)

In [None]:
sagemaker.Session().delete_endpoint(tuned_predictor.endpoint_name)