# Data Pipeline Exercise with Amazon Redshift

## 0. Preparation
Run this section only once. From the second run onward, skip it.


Install the required Python libraries on this instance with pip.

In [None]:
!pip install psycopg2-binary
!pip install numpy
!pip install matplotlib
!pip install boto3

# 1. Copy data between S3 buckets

Set the 12-digit account ID of the AWS environment you are logged in to.

In [None]:
#accountid = 'xxxxxxxxxxxx'
accountids =  !aws sts get-caller-identity --query Account --output text
accountid = accountids[0]
print(accountid)
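
The account ID is later embedded in every bucket name, so it can help to validate it once before using it. The following is a minimal sketch, not part of the original notebook: `workshop_bucket_uri` is a hypothetical helper name, and the only assumption it encodes is that an AWS account ID consists of exactly 12 digits. If the `aws sts` call above fails, `accountids[0]` may contain an error message rather than an ID, and this check would catch that early.

```python
import re


def workshop_bucket_uri(accountid, prefix="srcdata/gz/sh10"):
    """Return the s3:// URI under the per-account workshop bucket (hypothetical helper)."""
    # AWS account IDs are exactly 12 decimal digits.
    if not re.fullmatch(r"\d{12}", accountid):
        raise ValueError(f"not a 12-digit AWS account ID: {accountid!r}")
    return f"s3://{accountid}-workshopdata/{prefix.strip('/')}/"


# Example with a placeholder account ID (not a real account).
print(workshop_bucket_uri("123456789012"))
```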

Data in the instructor's S3 bucket:

In [None]:
!aws s3 ls s3://kh-handsondata/srcdata/gz/sh10/

Copy the data to your own bucket:

In [None]:
!aws s3 cp --recursive s3://kh-handsondata/srcdata/gz/sh10  s3://{accountid}-workshopdata/srcdata/gz/sh10 

## Check the size

### Source

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://kh-handsondata/srcdata/gz/sh10/

### Destination

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://{accountid}-workshopdata/srcdata/gz/sh10/

Check the file count and total size; they should match the source.

### Check the data in the AWS Management Console

# Connect to Redshift

Import the required Python packages.

In [1]:
# Import packages
import pandas as pd
import matplotlib.pyplot as plt
import psycopg2
import boto3
import json

## Obtain parameters from AWS CloudFormation

Retrieve the parameters entered for the AWS CloudFormation stack, together with the stack outputs.

In [2]:
# Please edit stack name
stack_name = 'step1'


cfn = boto3.client('cloudformation')
response = cfn.describe_stacks(StackName=stack_name)['Stacks'][0]

for item in response['Parameters']:
    if item['ParameterKey'] == 'MasterUserName':
        db_user = item['ParameterValue']
    elif item['ParameterKey'] == 'DatabaseName':
        db_name = item['ParameterValue']

db_port='5439'
        
for item in response['Outputs']:
    if item['OutputKey'] == 'RedshiftClusterEndpoint':
        cluster_endpoint = item['OutputValue'].split(':')[0]
    elif item['OutputKey'] == 'RedshiftClusterName':
        cluster_name = item['OutputValue']
    elif item['OutputKey'] == 'RedshiftClusterRole':
        redshift_role = item['OutputValue']
        
#  show parameters
print('db_user: {}'.format(db_user))
print('db_name: {}'.format(db_name))
print('db_port: {}'.format(db_port))
print('cluster_endpoint: {}'.format(cluster_endpoint))
print('cluster_name: {}'.format(cluster_name))
print('redshift_role: {}'.format(redshift_role))


db_user: awsuser
db_name: mydb
db_port: 5439
cluster_endpoint: redshiftcluster-vgjakykw4eqh.ctlwj4ufuakz.ap-northeast-1.redshift.amazonaws.com
cluster_name: redshiftcluster-vgjakykw4eqh
redshift_role: arn:aws:iam::317319571831:role/step1-RedshiftRole-15LONBVV3NI6N
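
The `if`/`elif` chains above silently leave a variable undefined when a key is missing from the stack. As an alternative sketch, the `Parameters` and `Outputs` lists can be flattened into dictionaries, so that a missing key raises a `KeyError` at the point of lookup. `stack_values` is a hypothetical helper, and the sample stack below uses placeholder values, not output from a real stack.

```python
from typing import Dict, Tuple


def stack_values(stack: dict) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Flatten a describe_stacks entry into (parameters, outputs) dictionaries."""
    params = {p["ParameterKey"]: p["ParameterValue"] for p in stack.get("Parameters", [])}
    outputs = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
    return params, outputs


# Placeholder data shaped like cfn.describe_stacks(...)['Stacks'][0].
sample_stack = {
    "Parameters": [
        {"ParameterKey": "MasterUserName", "ParameterValue": "awsuser"},
        {"ParameterKey": "DatabaseName", "ParameterValue": "mydb"},
    ],
    "Outputs": [
        {"OutputKey": "RedshiftClusterEndpoint",
         "OutputValue": "example-cluster.abc123.ap-northeast-1.redshift.amazonaws.com:5439"},
        {"OutputKey": "RedshiftClusterName", "OutputValue": "example-cluster"},
    ],
}

params, outputs = stack_values(sample_stack)
db_user_example = params["MasterUserName"]
# Strip the ":port" suffix, as the cell above does.
endpoint_example = outputs["RedshiftClusterEndpoint"].split(":")[0]
print(db_user_example, endpoint_example)
```

In the notebook itself, the same function could be called as `stack_values(response)`.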


## Get credentials

Get [temporary database user credentials](https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/generating-iam-credentials-cli-api.html) for accessing Amazon Redshift.


In [3]:
# get temporary cluster credentials
redshift = boto3.client('redshift')
credentials = redshift.get_cluster_credentials(
    DbUser=db_user, 
    DbName=db_name, 
    ClusterIdentifier=cluster_name, 
    DurationSeconds=3600,
    AutoCreate=False
)

tmp_db_user = credentials['DbUser']
tmp_db_password = credentials['DbPassword']

print('User: {}'.format(tmp_db_user ))
print('DB password: {}'.format( tmp_db_password))

User: IAM:awsuser
DB password: AG+Ru+IuekSuESktONqIKXD2EMyxsUxozEGY1rYmY7bQG29lgxsQ22HofRPCW8QpPp+eFqQ==
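
Because `DurationSeconds=3600` is used above, the temporary password stops working after one hour. The response of `get_cluster_credentials` also carries an `Expiration` timestamp, which can be used to decide when to request new credentials. The sketch below is an illustration only: `needs_refresh` and the five-minute safety margin are assumptions, not part of the original notebook.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(expiration, now=None, margin=timedelta(minutes=5)):
    """Return True if credentials expire within `margin` of `now` (hypothetical helper)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= expiration - margin


# Example with fixed timestamps instead of a real API response.
issued = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
expires = issued + timedelta(seconds=3600)
print(needs_refresh(expires, now=issued))                          # still fresh
print(needs_refresh(expires, now=issued + timedelta(minutes=58)))  # inside the margin
```

In the notebook, the check would be `needs_refresh(credentials['Expiration'])`, followed by another call to `get_cluster_credentials` and a new connection when it returns `True`.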


## Connect to the Redshift database

Access Redshift with psycopg2, a PostgreSQL driver for Python.

In [None]:
# connect to Redshift
conn = psycopg2.connect(
    host=cluster_endpoint, 
    port=db_port, 
    dbname=db_name, 
    user=tmp_db_user, 
    password=tmp_db_password
)
conn.autocommit = True
print(conn)

## Create table 

Create the schema and tables.

In [None]:
sql_create_table = [
    """
        CREATE SCHEMA IF NOT EXISTS sh10;
    """, 
    """
        CREATE TABLE IF NOT EXISTS sh10.channels(
          channel_id NUMERIC(38,0) NOT NULL,
          channel_desc VARCHAR(20) NOT NULL,
          channel_class VARCHAR(20) NOT NULL,
          channel_class_id NUMERIC(38,0) NOT NULL,
          channel_total VARCHAR(13) NOT NULL,
          channel_total_id NUMERIC(38,0) NOT NULL
        )
        DISTSTYLE ALL;
    """,
    """
        CREATE TABLE IF NOT EXISTS sh10.countries(
          country_id NUMERIC(38,0) NOT NULL,
          country_iso_code VARCHAR(2) NOT NULL,
          country_name VARCHAR(40) NOT NULL,
          country_subregion VARCHAR(30) NOT NULL,
          country_subregion_id NUMERIC(38,0) NOT NULL,
          country_region VARCHAR(20) NOT NULL,
          country_region_id NUMERIC(38,0) NOT NULL,
          country_total VARCHAR(11) NOT NULL,
          country_total_id NUMERIC(38,0) NOT NULL,
          country_name_hist VARCHAR(40)
        )
        DISTSTYLE ALL;
    """,
    """
        CREATE TABLE IF NOT EXISTS sh10.customers(
          cust_id NUMERIC(38,0) NOT NULL,
          cust_first_name VARCHAR(20) NOT NULL,
          cust_last_name VARCHAR(40) NOT NULL,
          cust_gender VARCHAR(1) NOT NULL,
          cust_year_of_birth SMALLINT NOT NULL,
          cust_marital_status VARCHAR(20),
          cust_street_address VARCHAR(40) NOT NULL,
          cust_postal_code VARCHAR(10) NOT NULL,
          cust_city VARCHAR(30) NOT NULL,
          cust_city_id NUMERIC(38,0) NOT NULL,
          cust_state_province VARCHAR(40) NOT NULL,
          cust_state_province_id NUMERIC(38,0) NOT NULL,
          country_id NUMERIC(38,0) NOT NULL,
          cust_main_phone_number VARCHAR(25) NOT NULL,
          cust_income_level VARCHAR(30),
          cust_credit_limit NUMERIC(38,0),
          cust_email VARCHAR(50),
          cust_total VARCHAR(14) NOT NULL,
          cust_total_id NUMERIC(38,0) NOT NULL,
          cust_src_id NUMERIC(38,0),
          cust_eff_from TIMESTAMP WITHOUT TIME ZONE,
          cust_eff_to TIMESTAMP WITHOUT TIME ZONE,
          cust_valid VARCHAR(1)
        )
        DISTSTYLE KEY DISTKEY (cust_id);
    """,
    """
        CREATE TABLE IF NOT EXISTS sh10.products(
          prod_id INTEGER NOT NULL,
          prod_name VARCHAR(50) NOT NULL,
          prod_desc VARCHAR(4000) NOT NULL,
          prod_subcategory VARCHAR(50) NOT NULL,
          prod_subcategory_id NUMERIC(38,0) NOT NULL,
          prod_subcategory_desc VARCHAR(2000) NOT NULL,
          prod_category VARCHAR(50) NOT NULL,
          prod_category_id NUMERIC(38,0) NOT NULL,
          prod_category_desc VARCHAR(2000) NOT NULL,
          prod_weight_class SMALLINT NOT NULL,
          prod_unit_of_measure VARCHAR(20),
          prod_pack_size VARCHAR(30) NOT NULL,
          supplier_id INTEGER NOT NULL,
          prod_status VARCHAR(20) NOT NULL,
          prod_list_price NUMERIC(8,2) NOT NULL,
          prod_min_price NUMERIC(8,2) NOT NULL,
          prod_total VARCHAR(13) NOT NULL,
          prod_total_id NUMERIC(38,0) NOT NULL,
          prod_src_id NUMERIC(38,0),
          prod_eff_from TIMESTAMP WITHOUT TIME ZONE,
          prod_eff_to TIMESTAMP WITHOUT TIME ZONE,
          prod_valid VARCHAR(1)
        )
        DISTSTYLE KEY DISTKEY (prod_id);
    """,
    """
        CREATE TABLE IF NOT EXISTS sh10.promotions(
          promo_id INTEGER NOT NULL,
          promo_name VARCHAR(30) NOT NULL,
          promo_subcategory VARCHAR(30) NOT NULL,
          promo_subcategory_id NUMERIC(38,0) NOT NULL,
          promo_category VARCHAR(30) NOT NULL,
          promo_category_id NUMERIC(38,0) NOT NULL,
          promo_cost NUMERIC(10,2) NOT NULL,
          promo_begin_date TIMESTAMP WITHOUT TIME ZONE NOT NULL,
          promo_end_date TIMESTAMP WITHOUT TIME ZONE NOT NULL,
          promo_total VARCHAR(15) NOT NULL,
          promo_total_id NUMERIC(38,0) NOT NULL
        )
        DISTSTYLE EVEN;
    """,
    """
        CREATE TABLE IF NOT EXISTS sh10.sales(
          prod_id NUMERIC(38,0) NOT NULL,
          cust_id NUMERIC(38,0) NOT NULL,
          time_id TIMESTAMP WITHOUT TIME ZONE NOT NULL,
          channel_id NUMERIC(38,0) NOT NULL,
          promo_id NUMERIC(38,0) NOT NULL,
          quantity_sold NUMERIC(10,2) NOT NULL,
          seller INTEGER NOT NULL,
          fulfillment_center INTEGER NOT NULL,
          courier_org INTEGER NOT NULL,
          tax_country VARCHAR(3) NOT NULL,
          tax_region VARCHAR(3),
          amount_sold NUMERIC(10,2) NOT NULL
        )
        DISTSTYLE KEY DISTKEY (cust_id)
        SORTKEY (time_id);
    """,

    """
        CREATE TABLE IF NOT EXISTS sh10.times(
          time_id TIMESTAMP WITHOUT TIME ZONE NOT NULL,
          day_name VARCHAR(36) NOT NULL,
          day_number_in_month VARCHAR(2) NOT NULL,
          day_number_in_year VARCHAR(3) NOT NULL,
          calendar_year VARCHAR(4) NOT NULL,
          calendar_quarter_number VARCHAR(1) NOT NULL,
          calendar_month_number VARCHAR(2) NOT NULL,
          calendar_week_number VARCHAR(2) NOT NULL,
          calendar_month_desc VARCHAR(7) NOT NULL,
          calendar_quarter_desc VARCHAR(6) NOT NULL
        )
        DISTSTYLE ALL;
   """
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_table:
        cur.execute(sql)
        print('Done: ', sql)
    

Set the primary keys and foreign keys.

In [None]:
sql_create_cst = [
    """
        ALTER TABLE SH10.CHANNELS
          ADD CONSTRAINT CHANNELS_PK PRIMARY KEY ( CHANNEL_ID );
    """,
    """
        ALTER TABLE SH10.COUNTRIES
          ADD CONSTRAINT COUNTRIES_PK PRIMARY KEY ( COUNTRY_ID );
    """,
    """
        ALTER TABLE SH10.CUSTOMERS
          ADD CONSTRAINT CUSTOMERS_PK PRIMARY KEY ( CUST_ID );
    """,
    """
        ALTER TABLE SH10.PRODUCTS
          ADD CONSTRAINT PRODUCTS_PK PRIMARY KEY ( PROD_ID ) ;
    """,
    """
        ALTER TABLE SH10.PROMOTIONS
          ADD CONSTRAINT PROMOTIONS_PK PRIMARY KEY ( PROMO_ID );
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_UK UNIQUE ( PROD_ID , CUST_ID , PROMO_ID , CHANNEL_ID , TIME_ID ) ;
    """,
    """
        ALTER TABLE SH10.TIMES
          ADD CONSTRAINT TIMES_PK PRIMARY KEY ( TIME_ID );
    """,
    """
        ALTER TABLE SH10.CUSTOMERS
          ADD CONSTRAINT CUST_COUNTRIES_FK FOREIGN KEY ( COUNTRY_ID )
          REFERENCES SH10.COUNTRIES ( COUNTRY_ID );
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_CHANNELS_FK FOREIGN KEY ( CHANNEL_ID )
          REFERENCES SH10.CHANNELS ( CHANNEL_ID );
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_CUSTOMERS_FK FOREIGN KEY ( CUST_ID )
          REFERENCES SH10.CUSTOMERS ( CUST_ID );
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_PRODUCTS_FK FOREIGN KEY ( PROD_ID )
          REFERENCES SH10.PRODUCTS ( PROD_ID ) ;
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_PROMOTIONS_FK
          FOREIGN KEY ( PROMO_ID )
          REFERENCES SH10.PROMOTIONS ( PROMO_ID );
    """,
    """
        ALTER TABLE SH10.SALES
          ADD CONSTRAINT SALES_TIMES_FK FOREIGN KEY ( TIME_ID )
          REFERENCES SH10.TIMES ( TIME_ID );
    """
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_cst:
        cur.execute(sql)
        print('Done: ', sql)
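
Note that Amazon Redshift treats primary key, unique, and foreign key constraints as informational only: the query planner may use them, but they are not enforced when data is loaded. If uniqueness matters, it has to be checked explicitly. The sketch below builds such a check query; `duplicate_check_sql` is a hypothetical helper name.

```python
def duplicate_check_sql(table, key_columns):
    """Build a query listing key values that occur more than once."""
    cols = ", ".join(key_columns)
    return (
        f"SELECT {cols}, COUNT(*) AS n "
        f"FROM {table} "
        f"GROUP BY {cols} "
        f"HAVING COUNT(*) > 1 "
        f"LIMIT 10;"
    )


print(duplicate_check_sql("sh10.channels", ["channel_id"]))
```

After the tables have been loaded, the result can be passed to `pd.read_sql(sql=..., con=conn)`; an empty result means no duplicate keys were found.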

### Check the created tables

In [None]:
sql_query = """
    SELECT *
    FROM svv_all_tables
    WHERE schema_name = 'sh10'
    ORDER BY TABLE_NAME, SCHEMA_NAME;
"""

In [None]:
with conn.cursor() as cur:
    cur.execute(sql_query)
    print('Done: ', sql_query)
    for row in cur:
        print(row)

# 2. Load data from Amazon S3

Load the data from S3 into Redshift. Start by trying a single table.

In [None]:
sql_copy=[
    """
    COPY sh10.channels
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/channels/sh10_channels_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
    """
]

In [None]:
%%time
with conn.cursor() as cur:
    for sql in sql_copy:
        cur.execute(sql.format(accountid, redshift_role))
        print('Done: ', sql)

Count the loaded rows.

In [None]:
sql_query = """
        SELECT count(*) from sh10.channels;
"""
%time df = pd.read_sql(sql=sql_query, con=conn)
df

Check the table contents.

In [None]:
sql_query = """
        SELECT *  from sh10.channels limit 5;
"""

%time df = pd.read_sql(sql=sql_query, con=conn)
df

## Load the remaining tables

In [None]:
sql_copy=[
    """
    COPY sh10.countries
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/countries/sh10_countries_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1 ;
    """,
    """
    COPY sh10.customers
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/customers/sh10_customers_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1 ;
    """,
    """
    COPY sh10.products
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/products/sh10_products_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
    """,
    """
    COPY sh10.promotions
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/promotions/sh10_promotions_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
    """,
    """
    COPY sh10.sales
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/sales/sh10_sales_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
    """,
    """
    COPY sh10.times
    FROM 's3://{}-workshopdata/srcdata/gz/sh10/times/sh10_times_manifest'
    credentials 'aws_iam_role={}' 
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
    """
]

This step takes about 10 minutes. Once the first output appears, feel free to take a break.

In [None]:
%%time
with conn.cursor() as cur:
    for sql in sql_copy:
        cur.execute(sql.format(accountid, redshift_role))
        print('Done: ', sql)
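
The six COPY statements above differ only in the table name, so they could also be generated from one template. The following is a sketch under that assumption; `COPY_TEMPLATE`, `REMAINING_TABLES`, and `build_copy_statements` are hypothetical names, and the S3 layout is copied from the statements above.

```python
# '\\t' in Python source puts the two characters \t into the SQL text,
# which COPY interprets as a tab delimiter.
COPY_TEMPLATE = """
    COPY sh10.{table}
    FROM 's3://{accountid}-workshopdata/srcdata/gz/sh10/{table}/sh10_{table}_manifest'
    credentials 'aws_iam_role={role}'
    DELIMITER '\\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST
    MAXERROR 1;
"""

REMAINING_TABLES = ["countries", "customers", "products", "promotions", "sales", "times"]


def build_copy_statements(accountid, role, tables=REMAINING_TABLES):
    """Return one fully formatted COPY statement per table."""
    return [COPY_TEMPLATE.format(table=t, accountid=accountid, role=role) for t in tables]


# Placeholder account ID and role ARN, for illustration only.
statements = build_copy_statements("123456789012", "arn:aws:iam::123456789012:role/example-role")
print(len(statements))
print(statements[0])
```

Since these statements are already formatted, they would be passed to `cur.execute(sql)` directly, without another `.format(...)` call.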

### Check row counts

Check the row counts to confirm that the load succeeded.

In [None]:
sql_query = [
    """
        SELECT count(*) from sh10.channels;
    """,
    """
        SELECT count(*) from sh10.countries;
    """,
    """
        SELECT count(*) from sh10.customers;
    """,
    """
        SELECT count(*) from sh10.products;
    """,
    """
        SELECT count(*) from sh10.promotions;
    """,
    """
        SELECT count(*) from sh10.sales;
    """,
    """
        SELECT count(*) from sh10.times;
    """
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_query:
        cur.execute(sql)
        print('Done: ', sql)
        for row in cur:
            print(row)
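
As an alternative to running seven separate queries, the counts can be collected in one result set with `UNION ALL`. This is a sketch only; `build_count_query` is a hypothetical helper name.

```python
def build_count_query(schema, tables):
    """Build one query that returns (table_name, row_count) for every table."""
    parts = [
        f"SELECT '{t}' AS table_name, COUNT(*) AS row_count FROM {schema}.{t}"
        for t in tables
    ]
    return "\nUNION ALL\n".join(parts) + "\nORDER BY table_name;"


count_sql = build_count_query(
    "sh10",
    ["channels", "countries", "customers", "products", "promotions", "sales", "times"],
)
print(count_sql)
```

Passing `count_sql` to `pd.read_sql(sql=count_sql, con=conn)` would return the seven counts as a single DataFrame.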
            

If you have time, look at the table contents. Sales and Customers contain many rows, so filter them or set a limit.

In [None]:
sql_query = """
        SELECT *  from sh10.sales limit 5;
"""

%time df = pd.read_sql(sql=sql_query, con=conn)
df

In [None]:
sql_query = """
        SELECT *  from sh10.customers limit 5;
"""

%time df = pd.read_sql(sql=sql_query, con=conn)
df

# 3. Explore the tables

### Query the Sales table

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)
  , count(1) as deal_count
from
  sh10.sales
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''
%time pd.read_sql(sql=sql_query, con=conn)

# 4. Unload

Unload data to S3.

In [None]:
sql_query = '''select
 to_char(time_id, 'YYYY') as year, to_char(time_id, 'MM') as month, to_char(time_id, 'DD') as day,*
from
  sh10.sales
where
  date_trunc('mon', time_id) = '2013-12-01'
limit
  5;
'''

df = pd.read_sql(sql=sql_query, con=conn)
df

### Unload as gzip-compressed delimited text

Unload the data for 2013/12. The output is tab-delimited and gzip-compressed.

In [None]:
sql_unload = """  unload  ( 'select * from sh10.sales  where   date_trunc(''mon'', time_id) = ''2013-12-01''' )
  to  's3://{}-workshopdata/unloaddata/gz/sh10/sales/sales-'
    credentials 'aws_iam_role={}'
    DELIMITER '\t'
    region 'ap-northeast-1'
    GZIP
    MANIFEST;
"""

In [None]:
%%time
with conn.cursor() as cur:
        cur.execute(sql_unload.format(accountid, redshift_role))
        print('Done: ', sql_unload)

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://{accountid}-workshopdata/unloaddata/gz/sh10/sales/
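
The query inside `UNLOAD ('...')` is itself a quoted string, which is why every single quote in it is doubled (`''mon''`) in the statement above. Doubling quotes by hand is error-prone, so the sketch below does it programmatically. `build_unload` is a hypothetical helper; the options are copied from the statement above.

```python
def build_unload(query, s3_prefix, iam_role):
    """Wrap `query` in an UNLOAD statement, doubling its single quotes."""
    escaped = query.replace("'", "''")
    return (
        f"UNLOAD ('{escaped}')\n"
        f"TO '{s3_prefix}'\n"
        f"credentials 'aws_iam_role={iam_role}'\n"
        "DELIMITER '\\t'\n"
        "region 'ap-northeast-1'\n"
        "GZIP\n"
        "MANIFEST;"
    )


# Placeholder bucket and role, for illustration only.
unload_example = build_unload(
    "select * from sh10.sales where date_trunc('mon', time_id) = '2013-12-01'",
    "s3://123456789012-workshopdata/unloaddata/gz/sh10/sales/sales-",
    "arn:aws:iam::123456789012:role/example-role",
)
print(unload_example)
```

The inner query can then be written and tested as ordinary SQL first, as in the preview cell earlier in this section, and wrapped only when unloading.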

### Unload in Parquet format

Unload the data for 2013/12, partitioned by year, month, and day.

In [None]:
sql_unload = """  unload  ( 'select  to_char(time_id, ''YYYY'') as year, to_char(time_id, ''MM'') as month,  to_char(time_id, ''DD'') as day, * from sh10.sales  where   date_trunc(''mon'', time_id) = ''2013-12-01''' )
  to  's3://{}-workshopdata/unloaddata/parquet/sh10/sales/'
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET
    PARTITION BY (year, month, day)
    region 'ap-northeast-1'
    MANIFEST;
"""

In [None]:
%%time
with conn.cursor() as cur:
        cur.execute(sql_unload.format(accountid,redshift_role))
        print('Done: ', sql_unload)

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://{accountid}-workshopdata/unloaddata/parquet/sh10/sales/

## Delete one month (2013/12)

### Count the rows to delete

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  sh10.sales
where  
   date_trunc( 'mon', time_id) = '2013-12-01'
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Delete

Delete the 2013/12 data from Redshift.

In [None]:
sql_del= """
delete 
from 
   sh10.sales
where 
   date_trunc( 'mon', time_id) = '2013-12-01';    
"""

In [None]:
with conn.cursor() as cur:
        cur.execute(sql_del)
        print('Done: ', sql_del)

Count the rows for the deleted month.

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  sh10.sales
where  
   date_trunc( 'mon', time_id) = '2013-12-01'
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

## Load the unloaded Parquet files back into Redshift

In [None]:
sql_copy = [
"""
    COPY sh10.sales
    FROM  's3://{}-workshopdata/unloaddata/parquet/sh10/sales/manifest'
    credentials 'aws_iam_role={}' 
    FORMAT AS PARQUET
    MANIFEST;
"""
]

In [None]:
%%time
with conn.cursor() as cur:
    for sql in sql_copy:
        cur.execute(sql.format(accountid, redshift_role))
        print('Done: ', sql)

Count the rows for the deleted month again.

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  sh10.sales
where  
   date_trunc( 'mon', time_id) = '2013-12-01'
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

# 5. Try Redshift Spectrum

## Register external tables

Create a schema for external tables in Redshift.

In [None]:
sql_create_table= [
"""
    CREATE EXTERNAL SCHEMA workshop from data catalog
    DATABASE 'mydb'
    IAM_ROLE  '{}'
    create external database if not exists;
"""
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_table:
        cur.execute(sql.format(redshift_role))
        print('Done: ', sql.format(redshift_role))

### External table for the gz files

In [None]:
sql_create_table= [
"""
    CREATE EXTERNAL TABLE  workshop.sales_gz(
          prod_id DECIMAL(38,0),
          cust_id DECIMAL(38,0),
          time_id TIMESTAMP,
          channel_id DECIMAL(38,0),
          promo_id DECIMAL(38,0),
          quantity_sold DECIMAL(10,2),
          seller INT,
          fulfillment_center INT,
          courier_org INT,
          tax_country VARCHAR(3),
          tax_region VARCHAR(3),
          amount_sold DECIMAL(10,2)
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'  LINES TERMINATED BY '\n'
    LOCATION 's3://{}-workshopdata/unloaddata/gz/sh10/sales/' ; 
"""    
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_table:
        cur.execute(sql.format(accountid, redshift_role))
        print('Done: ', sql)

### External table for the Parquet files

In [None]:
sql_create_table= [
"""
    CREATE EXTERNAL TABLE workshop.sales_pq(
         prod_id DECIMAL(38,0),
         cust_id DECIMAL(38,0),
         time_id TIMESTAMP,
         channel_id DECIMAL(38,0),
         promo_id DECIMAL(38,0),
         quantity_sold DECIMAL(10,2),
         seller INT,
         fulfillment_center INT,
         courier_org INT,
         tax_country VARCHAR(3),
         tax_region VARCHAR(3),
         amount_sold DECIMAL(10,2)
    )
    PARTITIONED BY (year int, month int, day int)
    STORED AS PARQUET
    LOCATION 's3://{}-workshopdata/unloaddata/parquet/sh10/sales/' 
    table properties ('compression_type'='snappy')
;
"""
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_create_table:
        cur.execute(sql.format(accountid, redshift_role))
        print('Done: ', sql)

### Confirm that the external tables were created

In [None]:
sql = '''select * from svv_external_schemas;'''
%time pd.read_sql(sql=sql, con=conn)

In [None]:
sql = '''select * from svv_external_tables'''
%time pd.read_sql(sql=sql, con=conn)

### Note: dropping a table created by mistake

In [None]:
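################################
### How to drop the table if its creation went wrong.
### The execute call is commented out on purpose; uncomment it to actually drop the table.
sql_del= """
drop table workshop.sales_pq
"""
with conn.cursor() as cur:
#    cur.execute(sql_del)
    print('Done: ', sql_del)
################################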

### Add partitions

In [None]:
sql_add= [
"""
alter table workshop.sales_pq add
partition(year='2013', month='12',day='01') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=01'
partition(year='2013', month='12',day='02') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=02'
partition(year='2013', month='12',day='03') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=03'
partition(year='2013', month='12',day='04') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=04'
partition(year='2013', month='12',day='05') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=05'
partition(year='2013', month='12',day='06') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=06'
partition(year='2013', month='12',day='07') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=07'
partition(year='2013', month='12',day='08') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=08'
partition(year='2013', month='12',day='09') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=09'
partition(year='2013', month='12',day='10') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=10';
"""
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_add:
        cur.execute(sql.format(accountid))
        print('Done: ', sql)

## Query with Spectrum

### Preview the gz files

In [None]:
sql_query = '''select *
from
  workshop.sales_gz
limit
  10;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Preview the Parquet files

In [None]:
sql_query = '''select *
from
  workshop.sales_pq
limit
  10;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Query the gz files (count)

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  workshop.sales_gz
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Query the Parquet files (count)

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  workshop.sales_pq
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Add the remaining partitions

In [None]:
sql_add= [
"""
alter table workshop.sales_pq add
partition(year='2013', month='12',day='11') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=11'
partition(year='2013', month='12',day='12') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=12'
partition(year='2013', month='12',day='13') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=13'
partition(year='2013', month='12',day='14') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=14'
partition(year='2013', month='12',day='15') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=15'
partition(year='2013', month='12',day='16') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=16'
partition(year='2013', month='12',day='17') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=17'
partition(year='2013', month='12',day='18') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=18'
partition(year='2013', month='12',day='19') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=19'
partition(year='2013', month='12',day='20') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=20'
partition(year='2013', month='12',day='21') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=21'
partition(year='2013', month='12',day='22') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=22'
partition(year='2013', month='12',day='23') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=23'
partition(year='2013', month='12',day='24') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=24'
partition(year='2013', month='12',day='25') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=25'
partition(year='2013', month='12',day='26') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=26'
partition(year='2013', month='12',day='27') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=27'
partition(year='2013', month='12',day='28') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=28'
partition(year='2013', month='12',day='29') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=29'
partition(year='2013', month='12',day='30') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=30'
partition(year='2013', month='12',day='31') 
location 's3://{0}-workshopdata/unloaddata/parquet/sh10/sales/year=2013/month=12/day=31';
"""
]

In [None]:
with conn.cursor() as cur:
    for sql in sql_add:
        cur.execute(sql.format(accountid))
        print('Done: ', sql)
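
Writing one `partition ... location ...` pair per day by hand is repetitive. The statement can also be generated for any range of days, as sketched below. `build_add_partitions` is a hypothetical helper; the S3 layout (`year=YYYY/month=MM/day=DD`) and the table name are taken from the statements above.

```python
import calendar


def build_add_partitions(accountid, year, month, first_day=1, last_day=None):
    """Build one ALTER TABLE ... ADD statement covering first_day..last_day."""
    if last_day is None:
        # monthrange returns (weekday of the 1st, number of days in the month).
        last_day = calendar.monthrange(year, month)[1]
    base = f"s3://{accountid}-workshopdata/unloaddata/parquet/sh10/sales"
    clauses = []
    for day in range(first_day, last_day + 1):
        y, m, d = f"{year:04d}", f"{month:02d}", f"{day:02d}"
        clauses.append(
            f"partition(year='{y}', month='{m}',day='{d}')\n"
            f"location '{base}/year={y}/month={m}/day={d}'"
        )
    return "alter table workshop.sales_pq add\n" + "\n".join(clauses) + ";"


# Days 11 to 31 of December 2013, with a placeholder account ID.
partition_sql = build_add_partitions("123456789012", 2013, 12, first_day=11)
print(partition_sql)
```

The generated string already contains the account ID, so it would be executed as is, without the `.format(accountid)` step used in the cells above.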

### Count the rows in the Parquet files again

In [None]:
sql_query = '''select
 date_trunc('mon', time_id)  , count(1) as deal_count
from
  workshop.sales_pq
group by
  date_trunc('mon', time_id) 
order by
  date_trunc('mon', time_id) desc
limit
  20;
'''

%time pd.read_sql(sql=sql_query, con=conn)

### Look at the system tables

In [None]:
sql_query = '''
select * from svl_s3query_summary order by starttime desc  limit 10;
'''
%time pd.read_sql(sql=sql_query, con=conn)

### Look at the billing information

In [None]:
sql_query = '''
SELECT query,
       starttime,
       ROUND(elapsed::FLOAT/ 1000 / 1000,2) AS elp_s,
       ROUND(s3_scanned_bytes::FLOAT/ 1000 / 1000,4) AS s3_scan_mb,
       CASE
         WHEN s3_scanned_bytes > 0 
         THEN GREATEST(10,CEIL(s3_scanned_bytes::FLOAT/ 1000 / 1000))
         ELSE 0
       END AS s3_billing_mb,
       CASE
         WHEN s3_scanned_bytes > 0 
         THEN GREATEST(10,CEIL(s3_scanned_bytes::FLOAT/ 1000 / 1000))*0.0005	
         ELSE 0
       END AS s3_billing_cents
FROM svl_s3query_summary
WHERE userid > 1
AND  aborted = 0
Order by starttime desc  limit 10;
'''
%time pd.read_sql(sql=sql_query, con=conn)
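
The two `CASE` expressions in the query above encode a simple rule: scanned bytes are rounded up to whole megabytes, each query is billed for at least 10 MB, and each megabyte costs 0.0005 cents (the constant used in the query, equivalent to USD 5 per terabyte in decimal units). The same arithmetic in Python, as a sketch: `spectrum_billing` is a hypothetical name, and the rate is only the one assumed by the query, so actual pricing should be checked for your region.

```python
import math


def spectrum_billing(s3_scanned_bytes, cents_per_mb=0.0005, min_mb=10):
    """Return (billed_mb, billed_cents) using the rule from the query above."""
    if s3_scanned_bytes <= 0:
        return 0, 0.0
    # Round up to whole MB (1 MB = 1000 * 1000 bytes here), then apply the minimum.
    billed_mb = max(min_mb, math.ceil(s3_scanned_bytes / 1000 / 1000))
    return billed_mb, billed_mb * cents_per_mb


for scanned in (0, 1, 25_400_000):
    print(scanned, spectrum_billing(scanned))
```

A query that scans a single byte is therefore billed like one that scans 10 MB, which is one reason why partition pruning and columnar formats such as Parquet help reduce cost.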

## Union Redshift and Spectrum data

In [None]:
sql_query = '''
select prod_id, cust_id, time_id, channel_id, promo_id, quantity_sold  from  workshop.sales_pq as  spectrum where spectrum.prod_id > 100 and spectrum.prod_id < 200
 union all
select prod_id, cust_id, time_id, channel_id, promo_id, quantity_sold from  sh10.sales as  native where native.prod_id > 100 and native.prod_id < 200  limit 10;
'''

%time pd.read_sql(sql=sql_query, con=conn)

## Join Redshift and Spectrum data

In [None]:
sql_query = '''
select
  sales.time_id
  , sales.channel_id
  , sales.promo_id
  , sales.quantity_sold
  , sales.amount_sold
  , products.prod_category
  , products.prod_subcategory
  , channels.channel_class
  , promotions.promo_category
  , promotions.promo_subcategory
from
  workshop.sales_pq as sales
  , sh10.products as products
  , sh10.channels as channels
  , sh10.promotions as promotions
where
  sales.prod_id = products.prod_id
  and sales.channel_id = channels.channel_id
  and sales.promo_id = promotions.promo_id
  and sales.year > 2012;  
'''

%time pd.read_sql(sql=sql_query, con=conn)

## Close the connection

Finally, close the psycopg2 connection.

In [None]:
conn.close()