## How to Analyze Large Datasets Quickly, Flexibly, and Securely with Amazon SageMaker and Amazon Redshift



Import the required Python packages.

In [None]:
# Import packages
import pandas as pd
import matplotlib.pyplot as plt
import psycopg2
import boto3
import json

Retrieve the parameters and outputs of the AWS CloudFormation stack.

In [None]:
# Replace with the name of your CloudFormation stack
stack_name = 'SageMakerRedshift2'

cfn = boto3.client('cloudformation')
response = cfn.describe_stacks(StackName=stack_name)['Stacks'][0]

for item in response['Parameters']:
    if item['ParameterKey'] == 'MasterUsername':
        db_user = item['ParameterValue']
    elif item['ParameterKey'] == 'DatabaseName':
        db_name = item['ParameterValue']
    elif item['ParameterKey'] == 'PortNumber':
        db_port = item['ParameterValue']
        
for item in response['Outputs']:
    if item['OutputKey'] == 'ClusterEndpoint':
        cluster_endpoint = item['OutputValue'].split(':')[0]
    elif item['OutputKey'] == 'ClusterName':
        cluster_name = item['OutputValue']
    elif item['OutputKey'] == 'RedshiftBucketAccessRoleArn':
        redshift_role = item['OutputValue']
        
#  show parameters
print('stack_name: {}'.format(stack_name))
print('db_user: {}'.format(db_user))
print('db_name: {}'.format(db_name))
print('db_port: {}'.format(db_port))
print('cluster_endpoint: {}'.format(cluster_endpoint))
print('cluster_name: {}'.format(cluster_name))
print('redshift_role: {}'.format(redshift_role))
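The two loops above walk the `Parameters` and `Outputs` lists returned by `describe_stacks`. A minimal alternative, assuming the same keys, is to flatten each list into a dict once and index it, so a missing key fails immediately with a `KeyError` instead of surfacing later as an undefined variable. `stack_values` and the sample stack are hypothetical, and the sample uses `demo_*` names so it does not overwrite the notebook's variables.

```python
def stack_values(stack):
    """Flatten one describe_stacks()['Stacks'] entry into two dicts.

    Hypothetical helper. Returns (parameters, outputs), each mapping
    ParameterKey/OutputKey to its value.
    """
    parameters = {p["ParameterKey"]: p["ParameterValue"]
                  for p in stack.get("Parameters", [])}
    outputs = {o["OutputKey"]: o["OutputValue"]
               for o in stack.get("Outputs", [])}
    return parameters, outputs


# Made-up sample shaped like a describe_stacks() entry.
sample_stack = {
    "Parameters": [
        {"ParameterKey": "MasterUsername", "ParameterValue": "awsuser"},
        {"ParameterKey": "DatabaseName", "ParameterValue": "dev"},
        {"ParameterKey": "PortNumber", "ParameterValue": "5439"},
    ],
    "Outputs": [
        {"OutputKey": "ClusterEndpoint",
         "OutputValue": "example-cluster.abc123.us-west-2.redshift.amazonaws.com:5439"},
        {"OutputKey": "ClusterName", "OutputValue": "example-cluster"},
    ],
}

params, outputs = stack_values(sample_stack)
demo_user = params["MasterUsername"]                          # KeyError if absent
demo_endpoint = outputs["ClusterEndpoint"].split(":")[0]      # drop the ":port" suffix
print(demo_user, demo_endpoint)
```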


Retrieve [temporary database user credentials](https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/generating-iam-credentials-cli-api.html) for accessing Amazon Redshift.

In [None]:
# get temporary cluster credentials (valid for DurationSeconds)
redshift = boto3.client('redshift')
credentials = redshift.get_cluster_credentials(
    DbUser=db_user, 
    DbName=db_name, 
    ClusterIdentifier=cluster_name, 
    DurationSeconds=3600,
    AutoCreate=False
)

tmp_db_user = credentials['DbUser']
tmp_db_password = credentials['DbPassword']
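The password returned by `get_cluster_credentials` is only valid for `DurationSeconds` (3600 seconds here), and the response also carries an `Expiration` timestamp. A hypothetical helper like the one below, assuming `Expiration` is a timezone-aware `datetime` as boto3 returns it, can decide when to request fresh credentials before a long-running analysis. The sample credentials are made up.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(credentials, margin=timedelta(minutes=5), now=None):
    """Return True if the temporary password expires within `margin`.

    Hypothetical helper; assumes credentials["Expiration"] is a
    timezone-aware datetime, as boto3 returns for get_cluster_credentials().
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return credentials["Expiration"] - now <= margin


# Made-up credentials issued at a fixed time and valid for 3600 seconds.
demo_issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
demo_credentials = {"Expiration": demo_issued + timedelta(seconds=3600)}

print(needs_refresh(demo_credentials, now=demo_issued))                          # 60 min left
print(needs_refresh(demo_credentials, now=demo_issued + timedelta(minutes=56)))  # 4 min left
```

In the notebook, a `True` result would mean calling `redshift.get_cluster_credentials(...)` again and reconnecting with the new password.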

Connect to Redshift using psycopg2, a PostgreSQL driver for Python.

In [None]:
# connect to Redshift
conn = psycopg2.connect(
    host=cluster_endpoint, 
    port=db_port, 
    dbname=db_name, 
    user=tmp_db_user, 
    password=tmp_db_password
)

This notebook uses the sample dataset from the official Amazon Redshift documentation:
https://docs.aws.amazon.com/ja_jp/redshift/latest/gsg/rs-gsg-create-sample-db.html

First, create the tables.

In [None]:
sql_create_table = [
    """
    create table users(
        userid integer not null distkey sortkey,
        username char(8),
        firstname varchar(30),
        lastname varchar(30),
        city varchar(30),
        state char(2),
        email varchar(100),
        phone char(14),
        likesports boolean,
        liketheatre boolean,
        likeconcerts boolean,
        likejazz boolean,
        likeclassical boolean,
        likeopera boolean,
        likerock boolean,
        likevegas boolean,
        likebroadway boolean,
        likemusicals boolean);
    """, 
    """
    create table venue(
        venueid smallint not null distkey sortkey,
        venuename varchar(100),
        venuecity varchar(30),
        venuestate char(2),
        venueseats integer);
    """, 
    """
    create table category(
        catid smallint not null distkey sortkey,
        catgroup varchar(10),
        catname varchar(10),
        catdesc varchar(50));
    """, 
    """
    create table date(
        dateid smallint not null distkey sortkey,
        caldate date not null,
        day character(3) not null,
        week smallint not null,
        month character(5) not null,
        qtr character(5) not null,
        year smallint not null,
        holiday boolean default('N'));
    """, 
    """
    create table event(
        eventid integer not null distkey,
        venueid smallint not null,
        catid smallint not null,
        dateid smallint not null sortkey,
        eventname varchar(200),
        starttime timestamp);
    """, 
    """
    create table listing(
        listid integer not null distkey,
        sellerid integer not null,
        eventid integer not null,
        dateid smallint not null  sortkey,
        numtickets smallint not null,
        priceperticket decimal(8,2),
        totalprice decimal(8,2),
        listtime timestamp);
    """, 
    """
    create table sales(
        salesid integer not null,
        listid integer not null distkey,
        sellerid integer not null,
        buyerid integer not null,
        eventid integer not null,
        dateid smallint not null sortkey,
        qtysold smallint not null,
        pricepaid decimal(8,2),
        commission decimal(8,2),
        saletime timestamp);
    """
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_table:
        cur.execute(sql)
        print('Done: ', sql)
# psycopg2 runs statements inside a transaction; commit so the tables persist
conn.commit()
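psycopg2 implicitly opens a transaction on the first statement, so DDL and loads only persist once the transaction is committed. The DB-API `with conn:` block is one way to make that explicit: it commits on success and rolls back on an exception, without closing the connection. The sketch below uses Python's built-in `sqlite3` as a stand-in for psycopg2 so it runs without a cluster; `run_in_transaction` is a hypothetical helper.

```python
import sqlite3


def run_in_transaction(conn, statements):
    """Execute statements as one transaction (hypothetical helper)."""
    with conn:                      # commit on success, rollback on exception
        cur = conn.cursor()
        for sql in statements:
            cur.execute(sql)


# sqlite3 stands in for psycopg2 here; demo_conn avoids clobbering `conn`.
demo_conn = sqlite3.connect(":memory:")
run_in_transaction(demo_conn, ["create table category(catid integer, catname text)"])

try:
    run_in_transaction(demo_conn, [
        "insert into category values (1, 'MLB')",
        "insert into no_such_table values (1)",  # fails, so the insert above is undone
    ])
except sqlite3.OperationalError as exc:
    print("rolled back:", exc)

remaining = demo_conn.execute("select count(*) from category").fetchone()[0]
print(remaining)
```

psycopg2 documents the same commit/rollback behavior for `with conn:` (since version 2.5), so the helper should carry over to the Redshift connection, though that is not exercised here.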

Next, load the data from S3 with the COPY command.

To run the COPY command, you must supply the credentials the cluster needs to access the objects in S3. Here we use the recommended method: authorization through an IAM role. For details, see
"[Authorizing COPY, UNLOAD, and CREATE EXTERNAL SCHEMA operations using IAM roles](https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/copy-unload-iam-role.html)".


In [None]:
sql_copy=[
    """
    copy users from 's3://awssampledbuswest2/tickit/allusers_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' region 'us-west-2';
    """, 
    """
    copy venue from 's3://awssampledbuswest2/tickit/venue_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' region 'us-west-2';
    """, 
    """
    copy category from 's3://awssampledbuswest2/tickit/category_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' region 'us-west-2';
    """, 
    """
    copy date from 's3://awssampledbuswest2/tickit/date2008_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' region 'us-west-2';
    """, 
    """
    copy event from 's3://awssampledbuswest2/tickit/allevents_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region 'us-west-2';
    """, 
    """
    copy listing from 's3://awssampledbuswest2/tickit/listings_pipe.txt' 
    credentials 'aws_iam_role={}' 
    delimiter '|' region 'us-west-2';
    """, 
    """
    copy sales from 's3://awssampledbuswest2/tickit/sales_tab.txt'
    credentials 'aws_iam_role={}'
    delimiter '\t' timeformat 'MM/DD/YYYY HH:MI:SS' region 'us-west-2';
    """
]

In [None]:
%%time
with conn.cursor() as cur:
    for sql in sql_copy:
        cur.execute(sql.format(redshift_role))
        print('Done: ', sql)
# Commit the loaded rows; uncommitted work is rolled back when the connection closes
conn.commit()
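The seven COPY statements above differ only in table, file, delimiter, and time format. A sketch of a hypothetical `build_copy_sql` helper can generate them from a small table while keeping the same `credentials 'aws_iam_role=...'` clause. The role ARN is a placeholder (in the notebook you would pass `redshift_role`), and because the helper only formats strings it should be used with trusted constants, not user input.

```python
def build_copy_sql(table, s3_path, role_arn, delimiter="|",
                   options="", region="us-west-2"):
    """Format a COPY statement in the same shape as the cells above.

    Hypothetical helper: values are pasted into the SQL text as-is,
    so only pass trusted constants (never user input).
    """
    parts = [
        f"copy {table} from '{s3_path}'",
        f"credentials 'aws_iam_role={role_arn}'",
        f"delimiter '{delimiter}'",
    ]
    if options:                      # e.g. a timeformat clause
        parts.append(options)
    parts.append(f"region '{region}';")
    return "\n".join(parts)


TICKIT_PREFIX = "s3://awssampledbuswest2/tickit/"
# (table, file, delimiter, extra options); "\\t" puts a literal \t into the SQL.
TICKIT_FILES = [
    ("users", "allusers_pipe.txt", "|", ""),
    ("venue", "venue_pipe.txt", "|", ""),
    ("category", "category_pipe.txt", "|", ""),
    ("date", "date2008_pipe.txt", "|", ""),
    ("event", "allevents_pipe.txt", "|", "timeformat 'YYYY-MM-DD HH:MI:SS'"),
    ("listing", "listings_pipe.txt", "|", ""),
    ("sales", "sales_tab.txt", "\\t", "timeformat 'MM/DD/YYYY HH:MI:SS'"),
]

# Placeholder ARN; in the notebook, pass redshift_role instead.
demo_role = "arn:aws:iam::123456789012:role/ExampleRedshiftRole"
copy_statements = [
    build_copy_sql(table, TICKIT_PREFIX + name, demo_role, delim, opts)
    for table, name, delim, opts in TICKIT_FILES
]
print(copy_statements[-1])
```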

Run SQL queries and load only the data you need into a pandas DataFrame.

In [None]:
# Get definition for the sales table.
sql="""
SELECT *    
FROM pg_table_def    
WHERE tablename = 'sales';
"""
%time pd.read_sql(sql=sql, con=conn)

In [None]:
# Find total sales on a given calendar date.
sql="""
SELECT sum(qtysold) 
FROM   sales, date 
WHERE  sales.dateid = date.dateid 
AND    caldate = '2008-01-05';
"""
%time pd.read_sql(sql=sql, con=conn)
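The query above embeds the date as a literal. When the value comes from a variable, passing it as a bound parameter avoids building SQL by string concatenation. With psycopg2 the placeholder is `%s`, and pandas forwards the values via `pd.read_sql(sql, con=conn, params=(...))`. The runnable sketch below uses `sqlite3` as a stand-in (its placeholder is `?`) on a few made-up rows shaped like `sales` and `date`.

```python
import sqlite3

# Tiny in-memory stand-in for the sales/date tables (made-up rows).
demo_db = sqlite3.connect(":memory:")
demo_db.executescript("""
    create table sales(salesid integer, dateid integer, qtysold integer);
    create table "date"(dateid integer, caldate text);
    insert into "date" values (1, '2008-01-05'), (2, '2008-01-06');
    insert into sales values (10, 1, 2), (11, 1, 4), (12, 2, 1);
""")

# Same join as the cell above, but the date is a bound parameter.
demo_query = """
    select sum(qtysold)
    from sales, "date"
    where sales.dateid = "date".dateid
    and caldate = ?
"""
demo_total = demo_db.execute(demo_query, ("2008-01-05",)).fetchone()[0]
print(demo_total)
```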

In [None]:
# Find top 10 buyers by quantity.
sql="""
SELECT firstname, lastname, total_quantity 
FROM   (SELECT buyerid, sum(qtysold) total_quantity
        FROM  sales
        GROUP BY buyerid
        ORDER BY total_quantity desc limit 10) Q, users
WHERE Q.buyerid = userid
ORDER BY Q.total_quantity desc;
"""
%time df = pd.read_sql(sql=sql, con=conn)
df.shape

In [None]:
df
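The top-10 query first sums `qtysold` per `buyerid` in a subquery, keeps the ten largest totals, and only then joins to `users` for names. The same steps in plain Python on hypothetical rows make that order of operations explicit; `top_buyers` is an illustration of the SQL logic, not a replacement for running it in Redshift.

```python
from collections import Counter


def top_buyers(sales, users, n=10):
    """Sum qtysold per buyerid, keep the top n, then join to users.

    Hypothetical helper: `sales` is an iterable of (buyerid, qtysold) and
    `users` maps userid -> (firstname, lastname). As with the SQL inner
    join, buyers missing from `users` are dropped after the limit.
    """
    totals = Counter()
    for buyerid, qtysold in sales:
        totals[buyerid] += qtysold
    # most_common() sorts by total, descending; ties keep first-seen order.
    return [(*users[buyerid], qty)
            for buyerid, qty in totals.most_common(n)
            if buyerid in users]


demo_sales = [(1, 4), (2, 1), (1, 3), (3, 5), (2, 2)]
demo_users = {1: ("Ann", "Lee"), 2: ("Bo", "Kim"), 3: ("Cy", "Ono")}
result = top_buyers(demo_sales, demo_users, n=2)
print(result)
```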

In [None]:
# Find events in the top 0.1% (99.9th percentile) by all-time gross sales.
sql="""
SELECT eventname, total_price 
FROM  (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) as percentile 
       FROM (SELECT eventid, sum(pricepaid) total_price
             FROM   sales
             GROUP BY eventid)) Q, event E
       WHERE Q.eventid = E.eventid
       AND percentile = 1
ORDER BY total_price desc;
"""
%time df = pd.read_sql(sql=sql, con=conn)
df.shape

In [None]:
df.head()
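`ntile(1000)` splits the events, ordered by `total_price` descending, into 1000 groups of nearly equal size, and `percentile = 1` keeps the first group, roughly the top 0.1%. A sketch of the bucket assignment, assuming the usual convention that leftover rows go to the lowest-numbered groups, shows a consequence worth knowing: with N events the first group holds ceil(N/1000) rows, so fewer than 1000 events leaves exactly one.

```python
def ntile(n_rows, n_buckets):
    """Bucket numbers (1-based) that NTILE(n_buckets) gives n_rows ordered rows.

    Assumes groups are as equal as possible, with the leftover rows
    assigned one each to the lowest-numbered groups.
    """
    base, extra = divmod(n_rows, n_buckets)
    buckets = []
    for bucket in range(1, n_buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * size)
    return buckets


print(ntile(10, 4))               # 10 rows into 4 groups: sizes 3, 3, 2, 2
print(ntile(5, 1000))             # fewer rows than groups: one row per group
print(ntile(8500, 1000).count(1)) # rows that would pass `percentile = 1`
```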

Plot the distribution of total sales for these events as a histogram.

In [None]:
df.total_price.hist()
plt.xlabel('Total price')
plt.ylabel('Number of events')
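By default `Series.hist()` uses 10 equal-width bins over the data range, so each bar's height is the number of events whose total falls in that bin. A rough stdlib sketch of that binning follows; `equal_width_counts` is hypothetical, floating-point edge cases are handled only approximately, and the all-equal case is simplified rather than matching matplotlib.

```python
def equal_width_counts(values, bins=10):
    """Count values in `bins` equal-width bins spanning [min, max].

    Hypothetical helper approximating default histogram binning:
    the last bin is closed on the right so the maximum is counted.
    """
    lo, hi = min(values), max(values)
    if hi == lo:                      # all values identical: put them in bin 0
        return [len(values)] + [0] * (bins - 1)
    width = (hi - lo) / bins
    counts = [0] * bins
    for v in values:
        i = min(int((v - lo) / width), bins - 1)   # max value -> last bin
        counts[i] += 1
    return counts


print(equal_width_counts(list(range(11)), bins=5))
```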

Finally, close the psycopg2 connection.

In [None]:
conn.close()
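`conn.close()` never runs if an earlier step raises. Wrapping the connection in `contextlib.closing` guarantees it is closed; psycopg2's own `with conn:` only ends the transaction and does not close the connection. The sketch uses `sqlite3` as a stand-in so it runs locally (its `conn.execute` shortcut is sqlite3-specific; psycopg2 needs a cursor).

```python
import sqlite3
from contextlib import closing

# closing() calls .close() on exit; the inner `with` only commits/rolls back.
with closing(sqlite3.connect(":memory:")) as demo_conn:
    with demo_conn:
        demo_conn.execute("create table t(x integer)")
        demo_conn.execute("insert into t values (1)")
    row_count = demo_conn.execute("select count(*) from t").fetchone()[0]

# The connection is now closed, so any further use raises ProgrammingError.
try:
    demo_conn.execute("select 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True
print(row_count, closed)
```

In a notebook this pattern fits best when the whole analysis runs in one cell; with interactive cells, the explicit `conn.close()` above remains the practical choice.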