# Amazon SageMaker を使用した $K$-means クラスタリング

- 次の AWS ブログの内容を元にしたノートブックです [[blog](https://aws.amazon.com/jp/blogs/news/k-means-clustering-with-amazon-sagemaker/)]
- SageMaker ビルトインである $k$-means クラスタリングを実演
- 実装のベースは [[Scully'10](https://www.eecs.tufts.edu/~dsculley/papers/fastkmeans.pdf), [Mayerson'01](http://web.cs.ucla.edu/~awm/papers/ofl.pdf), [Guha et al.'03](https://papers.nips.cc/paper/4362-fast-and-accurate-k-means-for-large-datasets.pdf)]

In [None]:
import boto3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display
import io
import time
import copy
import json
import sys
import sagemaker.amazon.common as smac
import os
import mxnet as mx
from scipy.spatial.distance import cdist
import numpy as np
from numpy import array
import urllib.request
import gzip
import pickle
import sklearn.cluster
import sklearn
import re
import sagemaker

In [None]:
# S3 バケットとプレフィックス
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = 'sagemaker/DEMO-kmeans'

role = sagemaker.get_execution_role()

def get_gdelt(filename):
    s3 = boto3.resource('s3')
    s3.Bucket('gdelt-open-data').download_file('events/' + filename, '.gdelt.csv')
    df = pd.read_csv('.gdelt.csv', sep='\t')
    header = pd.read_csv('https://www.gdeltproject.org/data/lookups/CSV.header.historical.txt', sep='\t')
    df.columns = header.columns
    return df

data = get_gdelt('1979.csv')
data

In [None]:
data = data[['EventCode', 'NumArticles', 'AvgTone', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]
data['EventCode'] = data['EventCode'].astype(object)

events = pd.crosstab(index=data['EventCode'], columns='count').sort_values(by='count', ascending=False).index[:20]

#トレーニングデータを Sagemaker K-means に必要な protobuf 形式に変換するルーチン
def write_to_s3(bucket, prefix, channel, file_prefix, X):
    buf = io.BytesIO()
    smac.write_numpy_to_dense_tensor(buf, X.astype('float32'))
    buf.seek(0)
    boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, channel, file_prefix + '.data')).upload_fileobj(buf)

#上記のアクター場所とイベントに基づいて、データをフィルタリング
def transform_gdelt(df, events=None):
    df = df[['AvgTone', 'EventCode', 'NumArticles', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]
    df['EventCode'] = df['EventCode'].astype(object)
    if events is not None:
        df = df[np.in1d(df['EventCode'], events)]
    return pd.get_dummies(df[((df['Actor1Geo_Lat'] == 0) & (df['Actor1Geo_Long'] == 0) != True) &
                                   ((df['Actor2Geo_Lat'] == 0) & (df['Actor2Geo_Long'] == 0) != True)])

#トレーニングを準備し、S3 に保存
def prepare_gdelt(bucket, prefix, file_prefix, events=None, random_state=1729, save_to_s3=True):
    df = get_gdelt(file_prefix + '.csv')
    model_data = transform_gdelt(df, events)
    train_data = model_data.sample(frac=1, random_state=random_state).as_matrix()
    if save_to_s3:
        write_to_s3(bucket, prefix, 'train', file_prefix, train_data)
    return train_data

# 1979 年用のデータセットを使用。
train_79 = prepare_gdelt(bucket, prefix, '1979', events, save_to_s3=False)

In [None]:
# 1979 年のデータセットから最初の 10000 データポイントを可視化するために TSNE を使用
from sklearn import manifold
tsne = manifold.TSNE(n_components=2, init='pca', random_state=1200)
X_tsne = tsne.fit_transform(train_79[:10000])

plt.figure(figsize=(6, 5))
X_tsne_1000 = X_tsne[:1000]
plt.scatter(X_tsne_1000[:, 0], X_tsne_1000[:, 1])
plt.show()

In [None]:
BEGIN_YEAR = 1979
END_YEAR = 1980

for year in range(BEGIN_YEAR, END_YEAR):
    train_data = prepare_gdelt(bucket, prefix, str(year), events)

# SageMaker k-means ECR image ARN 
images = {'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/kmeans:latest',
          'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:latest',
          'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/kmeans:latest',
          'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/kmeans:latest'}
image = images[boto3.Session().region_name]

In [None]:
from time import gmtime, strftime
output_time = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_folder = 'kmeans-lowlevel-' + output_time
K = range(2, 12) # k が使用する範囲を変更
INSTANCE_COUNT = 1
run_parallel_jobs = True #一度に 1 つのジョブを実行するには、これを false にします。
#特に多数の EC2 インスタンスを 1 度に作成し、上限に達するのを避けたい場合
job_names = []

sagemaker_client = boto3.client('sagemaker')

# すべての k でジョブを起動する
for k in K:
    print('starting train job:'+ str(k))
    output_location = 's3://{}/kmeans_example/output/'.format(bucket) + output_folder
    print('training artifacts will be uploaded to: {}'.format(output_location))
    job_name = output_folder + str(k)

    create_training_params = \
    {
        "AlgorithmSpecification": {
            "TrainingImage": image,
            "TrainingInputMode": "File"
        },
        "RoleArn": role,
        "OutputDataConfig": {
            "S3OutputPath": output_location
        },
        "ResourceConfig": {
            "InstanceCount": INSTANCE_COUNT,
            "InstanceType": "ml.c5.18xlarge",
            "VolumeSizeInGB": 50
        },
        "TrainingJobName": job_name,
        "HyperParameters": {
            "k": str(k),
            "feature_dim": "26",
            "mini_batch_size": "1000"
        },
        "StoppingCondition": {
            "MaxRuntimeInSeconds": 60 * 60
        },
            "InputDataConfig": [
            {
                "ChannelName": "train",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": "s3://{}/{}/train/".format(bucket, prefix),
                        "S3DataDistributionType": "FullyReplicated"
                    }
                },

                "CompressionType": "None",
                "RecordWrapperType": "None"
            }
        ]
    }
    
    sagemaker_client.create_training_job(**create_training_params)

    status = sagemaker_client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print(status)
    if not run_parallel_jobs:
        try:
            sagemaker_client.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
        finally:
            status = sagemaker_client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
            print("Training job ended with status: " + status)
            if status == 'Failed':
                message = sagemaker_client.describe_training_job(TrainingJobName=job_name)['FailureReason']
                print('Training failed with the following error: {}'.format(message))
                raise Exception('Training job failed')
    
    job_names.append(job_name)

In [None]:
while len(job_names):
    try:
        sagemaker_client.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_names[0])
    finally:
        status = sagemaker_client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
        print("Training job ended with status: " + status)
        if status == 'Failed':
            message = sagemaker_client.describe_training_job(TrainingJobName=job_name)['FailureReason']
            print('Training failed with the following error: {}'.format(message))
            raise Exception('Training job failed')

    print(job_name)

    info = sagemaker_client.describe_training_job(TrainingJobName=job_name)
    job_names.pop(0)

In [None]:
colors = ['b', 'g', 'r']
markers = ['o', 'v', 's']
models = {}
distortions = []
for k in K:
    s3_client = boto3.client('s3')
    key = 'kmeans_example/output/' + output_folder +'/' + output_folder + str(k) + '/output/model.tar.gz'
    s3_client.download_file(bucket, key, 'model.tar.gz')
    print("Model for k={} ({})".format(k, key))
    !tar -xvf model.tar.gz                       
    kmeans_model=mx.ndarray.load('model_algo-1')
    kmeans_numpy = kmeans_model[0].asnumpy()
    distortions.append(sum(np.min(cdist(train_data, kmeans_numpy, 'euclidean'), axis=1)) / train_data.shape[0])
    models[k] = kmeans_numpy

In [None]:
# エルボーをプロット
plt.plot(K, distortions, 'bx-')
plt.xlabel('k')
plt.ylabel('distortion')
plt.title('Elbow graph')
plt.show()

## Set up hosting for the model
以下では [[notebook](https://github.com/hariby/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/1P_kmeans_lowlevel/kmeans_mnist_lowlevel.ipynb)] を参考にして、推論用のエンドポイントを構築します。

In order to set up hosting, we have to import the model from training to hosting. A common question would be, why wouldn't we automatically go from training to hosting?  And, in fact, the [k-means high-level example](/notebooks/sagemaker-python-sdk/1P_kmeans_highlevel/kmeans_mnist.ipynb) shows the functionality to do that.  For this low-level example though it makes sense to show each step in the process to provide a better understanding of the flexibility available.

### Import model into hosting
Next, you register the model with hosting. This allows you the flexibility of importing models trained elsewhere, as well as the choice of not importing models if the target of model creation is AWS Lambda, AWS Greengrass, Amazon Redshift, Amazon Athena, or other deployment target.

In [None]:
primary_container = {
    'Image': image,
    'ModelDataUrl': info['ModelArtifacts']['S3ModelArtifacts']
}

create_model_response = sagemaker_client.create_model(
    ModelName = job_name, 
    ExecutionRoleArn = role, 
    PrimaryContainer = primary_container
)

print(create_model_response['ModelArn'])

### Create endpoint configuration
Now, we'll create an endpoint configuration which provides the instance type and count for model deployment.

In [None]:
endpoint_config_name = 'KMeansEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sagemaker_client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialInstanceCount':1,
        'ModelName':job_name,
        'VariantName':'AllTraffic'}])

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

### Create endpoint
Lastly, the customer creates the endpoint that serves up the model, through specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 9-11 minutes to complete.

In [None]:
%%time

endpoint_name = 'KMeansEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

try:
    sagemaker_client.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
finally:
    resp = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Arn: " + resp['EndpointArn'])
    print("Create endpoint ended with status: " + status)

    if status != 'InService':
        message = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)['FailureReason']
        print('Training failed with the following error: {}'.format(message))
        raise Exception('Endpoint creation did not succeed')


## Validate the model for use
Finally, we'll validate the model for use. Let's generate a classification for a single observation from the trained model using the endpoint we just created.

In [None]:
# Simple function to create a csv from our numpy array
def np2csv(arr):
    csv = io.BytesIO()
    np.savetxt(csv, arr, delimiter=',', fmt='%g')
    return csv.getvalue().decode().rstrip()

In [None]:
runtime = boto3.Session().client('runtime.sagemaker')

In [None]:
payload = np2csv(train_79[0:10000])

response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
                                   ContentType='text/csv', 
                                   Body=payload)
result = json.loads(response['Body'].read().decode())

cluster_ids = np.array([int(result['predictions'][i]['closest_cluster']) for i in range(len(result['predictions']))])

In [None]:
plt.figure(figsize=(6, 5))
X_tsne_1000 = X_tsne[:1000]
plt.scatter(X_tsne_1000[:, 0], X_tsne_1000[:, 1], c=cluster_ids[:1000])
plt.show()