Kinesis
Kinesis is a popular ingestion tool developed by Amazon. It is a service managed by AWS, so unlike other tools (e.g. Apache Kafka), Kinesis does not require you to set it up and configure it on individual servers. The key concepts for using Kinesis for stream processing are:
- A stream: A queue in which incoming data resides. Streams are identified by a string name. For example, Amazon might have an "Orders" stream, a "Customer-Review" stream, and so on.
- A shard: A stream is composed of one or more shards. One shard can read data at a rate of up to 2 MB/sec and can write up to 1,000 records/sec, up to a maximum of 1 MB/sec. A user should specify a number of shards that matches the amount of data expected in their system.
- Partition Key: The Partition Key is a string that is hashed to determine which shard a record goes to. For instance, given the record
r = {name: 'Jane', city: 'New York'}
one can, for example, specify the Partition Key as r["city"], which will send all records with the same city to the same shard.
- Producer: A producer is a source of data, typically generated outside your system in real-world applications (e.g. user click data).
- Consumer: Once the data is placed in a stream, it can be processed and stored somewhere (e.g. in HDFS or a database). Anything that reads data from a stream is said to be a consumer.
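To make the Partition Key idea concrete: Kinesis takes the MD5 hash of the key and routes the record to the shard whose hash-key range contains it. Below is a rough local sketch of that routing logic (an illustration only, not the boto3 API), assuming the shards split the 128-bit hash space evenly:

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    # Kinesis hashes the partition key with MD5 to a 128-bit integer and
    # routes the record to the shard owning that hash-key range. With
    # evenly split shards this reduces to integer division.
    h = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    shard_size = 2**128 // num_shards
    return min(h // shard_size, num_shards - 1)

r = {'name': 'Jane', 'city': 'New York'}
# Using r["city"] as the key means every record with the same city
# deterministically lands on the same shard:
assert shard_for_key(r['city'], 4) == shard_for_key('New York', 4)
```

Because the mapping is deterministic, a hot key (one very popular city, say) concentrates traffic on a single shard, which is worth keeping in mind when choosing a Partition Key.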
When using AWS managed services, the boto library for Python is a good tool for connecting to and setting up AWS services.
To install boto3, run the command
pip install boto3
To keep your AWS credentials out of your source code, boto3 can read them from the file ~/.aws/credentials, which you have to create.
## copy and paste the three lines (modified with your AWS keys) into ~/.aws/credentials
[default]
aws_access_key_id=XXXXX
aws_secret_access_key=XXXxxxXXX/XXXxxxXX
Using the boto library, one can easily create and feed data into a Kinesis stream. The following example also uses the json library, which is part of the Python standard library, so no extra installation is needed. The script creates a Kinesis stream and puts consecutive integers into it.
import boto3
import json
import time

stream = "myStream"
kinesis = boto3.client('kinesis')

# Create the Kinesis stream if it does not exist, then wait until it is
# active. Without waiting, you will get errors when putting data into
# the stream.
if stream not in kinesis.list_streams()['StreamNames']:
    print('Creating Kinesis stream %s' % stream)
    kinesis.create_stream(StreamName=stream, ShardCount=1)
else:
    print('Kinesis stream %s exists' % stream)

while kinesis.describe_stream(StreamName=stream)['StreamDescription']['StreamStatus'] == 'CREATING':
    time.sleep(2)

# Put consecutive integers into the stream indefinitely.
i = 0
while True:
    kinesis.put_record(StreamName=stream, Data=json.dumps(i), PartitionKey="partitionkey")
    i += 1
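Putting one record per call is fine for a demo, but boto3 also offers put_records, which accepts up to 500 records per request. A sketch of a helper that builds such batches locally (the kinesis.put_records call itself is left commented out, since it requires live AWS credentials):

```python
import json

def build_batches(values, batch_size=500):
    # Kinesis accepts at most 500 records per PutRecords call, so larger
    # inputs must be split into batches of that size.
    records = [{'Data': json.dumps(v), 'PartitionKey': 'partitionkey'}
               for v in values]
    return [records[i:i + batch_size]
            for i in range(0, len(records), batch_size)]

batches = build_batches(range(1200))
# Each batch could then be sent with a single request:
#   kinesis.put_records(StreamName=stream, Records=batch)
```

Batching trades a little latency for far fewer API calls, which matters once the producer pushes serious volume.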
The boto library also allows one to read data already in a Kinesis stream. A simple example to read the latest data from the stream we created above is
import boto3
import time
import json

kinesis = boto3.client("kinesis")

shard_id = "shardId-000000000000"  # we only have one shard!
pre_shard_it = kinesis.get_shard_iterator(StreamName="myStream", ShardId=shard_id, ShardIteratorType="LATEST")
shard_it = pre_shard_it["ShardIterator"]

# Poll the shard once per second, printing each batch of records.
while True:
    out = kinesis.get_records(ShardIterator=shard_it, Limit=1)
    shard_it = out["NextShardIterator"]
    print(out)
    time.sleep(1.0)
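Each record returned by get_records carries, in its Data field, the raw bytes the producer wrote, so the JSON-encoded integers from the producer script can be decoded like this (the response below is simulated rather than fetched from AWS):

```python
import json

def decode_records(response):
    # get_records returns {'Records': [{'Data': b'...', ...}, ...], ...};
    # each record's Data holds exactly the bytes the producer put in.
    return [json.loads(rec['Data']) for rec in response['Records']]

# Simulated get_records response with two JSON-encoded integers:
fake = {'Records': [{'Data': b'41'}, {'Data': b'42'}],
        'NextShardIterator': 'abc'}
assert decode_records(fake) == [41, 42]
```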
The output should look like the figure below when the two scripts are run in separate terminal tabs (in the figure, the script that creates and feeds data into the stream is called ktest.py).
Find out more about the Insight Data Engineering Fellows Program in New York and Silicon Valley, apply today, or sign up for program updates.
You can also read our engineering blog here.