A simple Python AWS Kinesis Producer.


Features

  • Error handling and retrying with exponential backoff
  • Automatic batching and flush callbacks
  • Threaded execution

Inspired by the AWS blog post Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library.
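As a rough illustration of the retry feature listed above, here is a minimal, generic sketch of retrying with exponential backoff. It is not Kiner's internal code: the names `backoff_delays` and `call_with_retries`, and the `base`, `factor`, and `cap` parameters, are hypothetical and chosen only for this example.

```python
import time


def backoff_delays(max_retries, base=0.1, factor=2, cap=10.0):
    """Return the delay in seconds to wait before each retry.

    The delay grows geometrically (base, base * factor, ...) and is
    clamped to `cap`. All parameter names here are illustrative.
    """
    return [min(cap, base * factor ** attempt) for attempt in range(max_retries)]


def call_with_retries(func, max_retries=5, sleep=time.sleep, **backoff):
    """Call func(), retrying up to max_retries times after failures.

    `sleep` is injectable so the wait can be replaced in tests.
    """
    for delay in backoff_delays(max_retries, **backoff):
        try:
            return func()
        except Exception:
            # Wait longer after each consecutive failure.
            sleep(delay)
    # Final attempt: let any exception propagate to the caller.
    return func()
```

With the defaults, the waits between attempts are 0.1, 0.2, 0.4, 0.8, and 1.6 seconds. A production retry loop would usually add random jitter and catch only retryable errors.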

Installation

You can use pip to install Kiner.

pip install kiner

Usage

To use Kiner, you'll need AWS authentication credentials configured as described in the boto3 documentation.
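One way to supply credentials is through the standard AWS environment variables that boto3 reads. The sketch below only checks whether those variables are set; the helper name `missing_aws_env` is hypothetical. boto3 can also load credentials from shared config files or IAM roles, so a missing variable does not necessarily mean credentials are unavailable.

```python
import os

# Standard environment variables read by boto3.
REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")


def missing_aws_env(env=os.environ):
    """Return the names of the AWS variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_env()
    if missing:
        print("Not set in the environment:", ", ".join(missing))
    else:
        print("AWS environment variables are set.")
```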

from kiner.producer import KinesisProducer

p = KinesisProducer('stream-name', batch_size=500, max_retries=5, threads=10)

for i in range(10000):
    p.put_record(i)

p.close()

To be notified when data is flushed to AWS Kinesis, provide a flush_callback:

from uuid import uuid4
from kiner.producer import KinesisProducer

def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
    print(f"""
        Flushed {count} messages at timestamp {last_flushed_at}
        Last message was {Metadata['id']} partitioned by {PartitionKey} ({len(Data)} bytes)
    """)

p = KinesisProducer('stream-name', flush_callback=on_flush)

for i in range(10000):
    p.put_record(i, metadata={'id': uuid4()}, partition_key=f"{i % 2}")

p.close()
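A callback can also accumulate totals instead of printing. The sketch below assumes the callback receives the same arguments as `on_flush` in the example above. The class name `FlushStats` is hypothetical, and the lock is a precaution based on the assumption that flushes may be reported from several worker threads.

```python
import threading


class FlushStats:
    """Callable that keeps running totals of flushed records."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total = 0              # records flushed so far
        self.batches = 0            # number of flush notifications
        self.last_flushed_at = None

    def __call__(self, count, last_flushed_at, **last_record):
        # last_record collects Data, PartitionKey and Metadata; unused here.
        with self._lock:
            self.total += count
            self.batches += 1
            self.last_flushed_at = last_flushed_at


# Usage with Kiner, following the example above:
#   stats = FlushStats()
#   p = KinesisProducer('stream-name', flush_callback=stats)
#   ...
#   p.close()
#   print(stats.total, stats.batches)
```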

Contributions