In [None]:
# kinesisを使う場合、resource apiは使用できない。client apiを使用する。

import boto3
kinesis = boto3.client('kinesis')

In [None]:
# kinesisストリームの名前

stream_name = 'test'

In [None]:
# ストリームを作成。シャードは2つとする。

kinesis.create_stream(StreamName=stream_name, ShardCount=2)

In [None]:
# ストリームの情報を表示。
# ストリームの作成には20秒ほどかかる
# Ctrl-Enterで何度かこのセルを実行。
# StreamStatus が ACTIVE になったら、先へ進むことができる。

kinesis.describe_stream(StreamName=stream_name)

In [None]:
# ストリームの情報を表示。シャードの情報に絞り込んで表示してみよう。
# 2つのシャード  shardId-000000000000 と shardId-000000000001 が確認できる。
# また、それぞれのシャードに StartingHashKey, EndingHashKey という設定があるのがわかる。

kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['Shards']

In [None]:
# StartingHashKey / EndingHashKey は、128ビット(16進数で32桁)の数値。それぞれ16進数で表示して確認してみる。
print('shardId-000000000000 StartingHashKey: 0x%032x' % 0)
print('shardId-000000000000   EndingHashKey: 0x%032x' % 170141183460469231731687303715884105727)

print('shardId-000000000001 StartingHashKey: 0x%032x' % 170141183460469231731687303715884105728)
print('shardId-000000000001   EndingHashKey: 0x%032x' % 340282366920938463463374607431768211455)

In [None]:
# シャードに適当なレコードを書き込む。PartitionKeyの指定を「a」から「h」を手動で変更しながら繰り返し実行し、
# 各レコードがどのシャードに入るかたしかめてみよう。

kinesis.put_record(StreamName=stream_name,Data='hello',PartitionKey='h')

In [None]:
# 各アルファベットのmd5値を計算し、shardId-000000000001 の StartingHashKey と比較してみよう
from hashlib import md5
for ch in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
    digest = md5(ch.encode()).hexdigest()
    print(ch, digest, digest < '80000000000000000000000000000000', sep=' ')

In [None]:
# PartitionKeyを「a」から「h」まで変化させながらレコードを追加し、挙動を確認してみよう
for p in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
    response = kinesis.put_record(StreamName=stream_name,Data='hello',PartitionKey=p)
    print('%c: ShardId: %s, \n   SequenceNumber: %d' % 
          (p, response['ShardId'], int(response['SequenceNumber'])))

In [None]:
# ShardIdが shardId-000000000001 になるパーティションキーだけを連続投入し、
# シーケンス番号の増加を観察してみよう
for p in [ 'b', 'd', 'e', 'f', 'g' ]:
    response = kinesis.put_record(StreamName=stream_name,Data='hello',PartitionKey=p)
    print('%c: ShardId: %s, \n   SequenceNumber: %d' % 
          (p, response['ShardId'], int(response['SequenceNumber'])))

In [None]:
# シャードからレコードを読み取るには、まずシャードを指定して、「シャードイテレータ」を入手する。
# またこのとき、レコードを、シャードのどのあたりから読み込み始めるかを指定する。
# 今回は「TRIM_HORIZON」として、シャードに残っている最も古いレコードから読み取りを行う。

kinesis.get_shard_iterator(
    StreamName=stream_name, 
    ShardId='shardId-000000000000', 
    ShardIteratorType='TRIM_HORIZON')

In [None]:
# シャードイテレータを取得
si = kinesis.get_shard_iterator(
    StreamName=stream_name, 
    ShardId='shardId-000000000000', 
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

# シャードイテレータを使って、シャードのレコードを取り出す. 

# 各レコードには次のような情報が付与されているのが確認できる。
# SequenceNumber = シーケンス番号
# ApproximateArrivalTimestamp = 概算到着タイムスタンプ
# Data = レコードのデータ
# PartitionKey = パーティションキー

kinesis.get_records(ShardIterator=si)