Skip to content
This repository has been archived by the owner on Nov 22, 2021. It is now read-only.

ungikim/kinsumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

49 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kinsumer

High level Amazon Kinesis Streams consumer.

Some features

  • Automatically detect shard count changes
  • Checkpoints/sequences persistence can be customized
  • Provided Checkpointer implementation for memory, and file
  • Memory bucket for temporary saving records

Usage

from kinsumer import Consumer

STREAM_REGION = 'ap-south-1'
STREAM_NAME = 'my-stream'
consumer = Consumer(__name__)
consumer.config.from_object(__name__)

@consumer.transform
def transform(data, shard_id, last_sequence_number, last_arrival_timestamp):
    """do transform and return"""
    return data

@consumer.after_consume
def after(data, shard_id, last_sequence_number, last_arrival_timestamp):
    """after transform and do something"""

if __name__ == '__main__':
    consumer.process()

Author and license

kinsumer is written by Ungi Kim, and licensed under the MIT license. You can find the source code from Github:

$ git clone git@github.com:ungikim/kinsumer.git

Missing features

  • Redis Checkpointer
  • Consumer Heartbeat

(Contributions would be appreciated!)

About

High level Amazon Kinesis Streams consumer

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages