
Kinesis

Daniel Blazevski edited this page Sep 6, 2016 · 4 revisions

Introduction

Kinesis is a popular data ingestion service developed by Amazon. Because it is managed by AWS, Kinesis, unlike other tools (e.g. Apache Kafka), does not require you to set up and configure individual servers. The key concepts for using Kinesis for stream processing are:

  • A stream: A queue in which incoming data resides. Streams are labeled by a string. For example, Amazon might have an "Orders" stream, a "Customer-Review" stream, and so on.

  • A shard: A stream is composed of one or more shards. One shard supports reads of up to 2 MB/sec and writes of up to 1,000 records/sec, up to a maximum of 1 MB/sec. A user should choose a number of shards that matches the amount of data expected to flow through their system.

  • Partition Key: The Partition Key is a string that is hashed to determine which shard a record goes to. For instance, given the record r = {name: 'Jane', city: 'New York'}, one can specify the Partition Key as r["city"], which sends all records with the same city to the same shard.

  • Producer: A producer is a source of data. In real-world applications the data is typically generated outside your system (e.g. user click data).

  • Consumer: Once the data is placed in a stream, it can be processed and stored somewhere (e.g. on HDFS or a database). Anything that reads in data from a stream is said to be a consumer.
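The shard limits and the partition-key routing described above can be sketched in a few lines of Python 3. This is only an illustration, not the service's implementation: the function names `estimate_shards` and `shard_for_key` are hypothetical, and the sketch assumes that the shards split the 128-bit MD5 hash space into equal ranges, which need not hold for a stream whose shards have been split or merged.

```python
import hashlib
import math

HASH_SPACE = 2 ** 128  # an MD5 digest is a 128-bit value


def estimate_shards(write_mb_per_sec, records_per_sec, read_mb_per_sec):
    """Smallest shard count that satisfies the per-shard limits quoted above."""
    return max(
        1,
        math.ceil(write_mb_per_sec / 1.0),    # 1 MB/sec written per shard
        math.ceil(records_per_sec / 1000.0),  # 1,000 records/sec written per shard
        math.ceil(read_mb_per_sec / 2.0),     # 2 MB/sec read per shard
    )


def shard_for_key(partition_key, shard_count):
    """Map a partition key to a shard index, ASSUMING equal hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    return hash_value * shard_count // HASH_SPACE


record = {"name": "Jane", "city": "New York"}
# Every record with the same city yields the same shard index.
print(shard_for_key(record["city"], 4))
# 3.5 MB/sec of writes is the binding constraint here, so this prints 4.
print(estimate_shards(write_mb_per_sec=3.5, records_per_sec=2500, read_mb_per_sec=5))
```

Note that a partition key with few distinct values (for example, one very busy city) concentrates traffic on one shard, so the shard count alone does not guarantee enough capacity.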

Preliminaries

When using AWS managed services, the boto library for Python (boto3 in the examples here) is a good tool for connecting to and setting up AWS services.

To install boto, run the command

pip install boto3

To keep your AWS credentials out of your source code, boto lets you store them in a file ~/.boto, which you have to create.

## copy and paste the three lines (modified with your AWS keys) into ~/.boto
[Credentials]
aws_access_key_id=XXXXX
aws_secret_access_key=XXXxxxXXX/XXXxxxXX
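Because the credentials file uses INI syntax, a quick way to check that it is well formed is to parse it with Python's standard configparser module. The following Python 3 sketch does this on an in-memory copy of the placeholder file above; the helper name `read_boto_credentials` is hypothetical and is not part of boto.

```python
import configparser


def read_boto_credentials(text):
    """Parse boto-style INI text and return (access_key_id, secret_access_key)."""
    # Interpolation is disabled so that a '%' in a value is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    section = parser["Credentials"]
    return section["aws_access_key_id"], section["aws_secret_access_key"]


sample = """\
[Credentials]
aws_access_key_id=XXXXX
aws_secret_access_key=XXXxxxXXX/XXXxxxXX
"""

key_id, secret = read_boto_credentials(sample)
print("access key id present:", bool(key_id))
print("secret key present:", bool(secret))
```

To check the real file, pass the contents of ~/.boto to the helper instead of the sample string; a missing section or key raises a KeyError.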

Example of creating a stream and putting data into the stream

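Using the boto library, one can easily create and feed data into a Kinesis stream. The following example uses the json module, which is part of the Python standard library and needs no separate installation. If you prefer the third-party simplejson package, which offers the same interface, it can be installed via

 pip install simplejson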

The following script creates a Kinesis stream and puts consecutive integers into the stream.

import json
import time

import boto3

# Create the Kinesis stream and wait until it is active.
# Without waiting, you will get errors when putting data into the stream.
stream = "myStream"
kinesis = boto3.client('kinesis')
if stream not in kinesis.list_streams()['StreamNames']:
    print('Creating Kinesis stream %s' % stream)
    kinesis.create_stream(StreamName=stream, ShardCount=1)
else:
    print('Kinesis stream %s exists' % stream)
while kinesis.describe_stream(StreamName=stream)['StreamDescription']['StreamStatus'] == 'CREATING':
    time.sleep(2)

# Put consecutive integers into the stream, all with the same partition key.
i = 0
while True:
    kinesis.put_record(StreamName=stream, Data=json.dumps(i), PartitionKey="partitionkey")
    i = i + 1
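Sending one record per request is simple but slow. As a hedged alternative, the sketch below groups values into batches and sends each batch with the client's put_records call, which accepts up to 500 records per request. The helper names `chunks` and `put_in_batches` are hypothetical, the code targets Python 3, and using the value itself as the partition key is just an illustrative choice.

```python
import json


def chunks(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_in_batches(client, stream, values, batch_size=500):
    """Send JSON-encoded values with put_records; return how many records failed."""
    failed = 0
    for batch in chunks(list(values), batch_size):
        records = [
            {"Data": json.dumps(value).encode("utf-8"), "PartitionKey": str(value)}
            for value in batch
        ]
        response = client.put_records(StreamName=stream, Records=records)
        # put_records can partially fail; the response reports the count.
        failed += response["FailedRecordCount"]
    return failed


# Usage, assuming the stream "myStream" already exists and is active:
#   import boto3
#   put_in_batches(boto3.client("kinesis"), "myStream", range(2000))
```

A production producer would also inspect the per-record results in the response and retry the failed records; that is left out here to keep the sketch short.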

Example of consuming data from a stream

The boto library also allows one to read data already in a Kinesis stream. A simple example to read the latest data from the stream we created above is

import time

import boto3

kinesis = boto3.client("kinesis")
shard_id = "shardId-000000000000"  # we only have one shard!
pre_shard_it = kinesis.get_shard_iterator(StreamName="myStream", ShardId=shard_id, ShardIteratorType="LATEST")

shard_it = pre_shard_it["ShardIterator"]
while True:
    # Fetch at most one record, then continue from the returned iterator.
    out = kinesis.get_records(ShardIterator=shard_it, Limit=1)
    shard_it = out["NextShardIterator"]
    print(out)
    time.sleep(1.0)

The output should look like the figure below when the two scripts are run in separate terminal tabs (the script that creates the stream and feeds data into it is called ktest.py in the figure).
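The consumer script prints the raw response dictionary and hard-codes the shard ID. A possible refinement, sketched below for Python 3, looks up the shard IDs with describe_stream and decodes the JSON payload of each record. The helper names `shard_ids`, `decode_records`, and `read_batch` are hypothetical, and the sketch assumes a small stream whose shards all fit in a single describe_stream response.

```python
import json


def shard_ids(client, stream):
    """Return the shard IDs reported by describe_stream."""
    description = client.describe_stream(StreamName=stream)["StreamDescription"]
    return [shard["ShardId"] for shard in description["Shards"]]


def decode_records(response):
    """Turn a get_records response into a list of decoded JSON values."""
    return [json.loads(record["Data"].decode("utf-8")) for record in response["Records"]]


def read_batch(client, stream, shard_id, iterator_type="TRIM_HORIZON", limit=100):
    """Read one batch from a shard; return (values, next_iterator)."""
    iterator = client.get_shard_iterator(
        StreamName=stream, ShardId=shard_id, ShardIteratorType=iterator_type
    )["ShardIterator"]
    response = client.get_records(ShardIterator=iterator, Limit=limit)
    return decode_records(response), response.get("NextShardIterator")


# Usage, assuming the stream "myStream" from the producer example:
#   import boto3
#   client = boto3.client("kinesis")
#   for sid in shard_ids(client, "myStream"):
#       values, _ = read_batch(client, "myStream", sid)
#       print(sid, values)
```

TRIM_HORIZON starts from the oldest record still retained in the shard, whereas LATEST, used in the script above, returns only records that arrive after the iterator was created.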
