In [4]:
# Name of the Kinesis stream this notebook creates, writes to, reads from and finally deletes
stream_name =  'kinesis-word-stream'
# AWS region handed to boto when connecting to Kinesis
region = 'us-east-2'
# Named AWS profile exported as AWS_PROFILE before connecting; a falsy value skips the export
aws_profile = 'streaming-data-amazon-kinesis'

## Creating a stream

In [6]:
import boto
from boto.kinesis.exceptions import ResourceInUseException
import os
import time

# Export the named profile (presumably so that the credential lookup done by
# boto picks it up); a falsy aws_profile leaves the environment untouched.
if aws_profile:
    os.environ['AWS_PROFILE'] = aws_profile

# Connect to Kinesis in the configured region.
kinesis = boto.kinesis.connect_to_region(region)

# connect_to_region() returns None when the region name is not known to boto;
# fail here instead of with an AttributeError on the first call made through it.
if kinesis is None:
    raise ValueError('unknown AWS region for Kinesis: {!r}'.format(region))

def get_status(name=None):
    """Return the current status of a Kinesis stream.

    Args:
        name: stream to describe. When omitted, the notebook-level
            ``stream_name`` is used, which keeps ``get_status()`` working
            exactly as before.

    Returns:
        The ``StreamStatus`` value found under ``StreamDescription`` in the
        describe_stream response.
    """
    if name is None:
        name = stream_name
    response = kinesis.describe_stream(name)
    return response['StreamDescription']['StreamStatus']


def create_stream(stream_name, shard_count=1, poll_interval=1, timeout=None):
    """Create a Kinesis stream if needed and block until it is active.

    Args:
        stream_name: name of the stream to create.
        shard_count: number of shards requested for a new stream.
        poll_interval: seconds to sleep between two status checks.
        timeout: maximum number of seconds to wait for the stream to become
            active; ``None`` (the default) waits indefinitely.

    Raises:
        TimeoutError: if ``timeout`` is set and the stream has not reached the
            ``'ACTIVE'`` status in time.
    """
    try:
        kinesis.create_stream(stream_name, shard_count)
        print('stream {} created in region {}'.format(stream_name, region))
    except ResourceInUseException:
        # The stream is already there; fall through and wait for it.
        print('stream {} already exists in region {}'.format(stream_name, region))

    deadline = None if timeout is None else time.monotonic() + timeout

    # Poll the stream that was asked for, not the notebook-level default.
    while get_status(stream_name) != 'ACTIVE':
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                'stream {} did not become active within {} seconds'.format(
                    stream_name, timeout))
        time.sleep(poll_interval)
    print('stream {} is active'.format(stream_name))

In [8]:
# Create the stream (or reuse an existing one) and block until its status is 'ACTIVE'
create_stream(stream_name)

stream kinesis-word-stream created in region us-east-2
stream kinesis-word-stream is active


## Creating a producer

Produces a stream of data. 

- partition_key = IP address
- data = ISO 8601 timestamp (UTC) of the moment the record is emitted

In [10]:
import datetime
import time
import threading
from boto.kinesis.exceptions import ResourceNotFoundException

class KinesisProducer(threading.Thread):
    """Producer thread for AWS Kinesis streams.

    Every emitted record uses the configured IP address as its partition key
    and the UTC emission time, in ISO 8601 format, as its data.

    Args:
        stream_name: name of the stream the records are put to.
        sleep_interval: seconds to pause between two records. A falsy value
            (``None`` or ``0``) makes the producer put a single record and
            finish.
        ip_addr: value used as the partition key of every record.
    """

    def __init__(self, stream_name, sleep_interval=None, ip_addr='8.8.8.8'):
        super().__init__()
        self.stream_name = stream_name
        self.sleep_interval = sleep_interval
        self.ip_addr = ip_addr
        # Set by stop(); lets a continuous producer end without the stream
        # having to be deleted underneath it.
        self._stop_requested = threading.Event()

    def put_record(self):
        """Put a single record to the stream."""
        timestamp = datetime.datetime.utcnow()
        part_key = self.ip_addr
        data = timestamp.isoformat()

        kinesis.put_record(self.stream_name, data, part_key)

    def stop(self):
        """Ask a continuously running producer to finish.

        The request takes effect after the record currently being put, or
        immediately if the producer is pausing between two records.
        """
        self._stop_requested.set()

    def run_continuously(self):
        """Put a record at regular intervals until stop() is called."""
        while not self._stop_requested.is_set():
            self.put_record()
            # Event.wait() pauses like time.sleep() but returns as soon as
            # stop() is called, so the thread does not linger for a full
            # interval.
            self._stop_requested.wait(self.sleep_interval)

    # Original (misspelled) name, kept so existing callers keep working.
    run_continously = run_continuously

    def run(self):
        """Thread entry point: emit continuously or once, per sleep_interval.

        A missing stream is reported on standard output and ends the thread
        instead of raising.
        """
        try:
            if self.sleep_interval:
                self.run_continuously()
            else:
                self.put_record()
        except ResourceNotFoundException:
            print('stream {} not found. Exiting'.format(self.stream_name))

In [11]:
# Two producers on the same stream, told apart by their partition key (IP address):
# the first emits a record every 2 seconds, the second every 5 seconds.
producer1 = KinesisProducer(stream_name, sleep_interval=2, ip_addr='8.8.8.8')
producer2 = KinesisProducer(stream_name, sleep_interval=5, ip_addr='8.8.8.9')
# start() runs each producer's run() in its own thread, so this cell returns
# immediately while records keep being emitted in the background.
producer1.start()
producer2.start()

## Creating a consumer

Consumes the records emitted by the producers. It simply prints the partition key and the data of each record.

In [12]:
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime

class KinesisConsumer:
    """Generic consumer for Amazon Kinesis streams.

    Subclasses implement ``process_records``; ``run`` polls one shard for a
    limited amount of time and hands every non-empty batch to it.

    Args:
        stream_name: name of the stream to read from.
        shard_id: id of the shard to read from (stored as a string).
        iterator_type: shard iterator type passed to get_shard_iterator.
        worker_time: number of seconds ``run`` keeps polling.
        sleep_interval: seconds to pause between two polls.
        record_limit: maximum number of records requested per poll.
    """
    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=30, sleep_interval=0.5, record_limit=25):

        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval
        self.record_limit = record_limit

    def process_records(self, records):
        """Handle one batch of records; must be implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def iter_records(records):
        """Yield a ``(partition_key, data)`` tuple for every record."""
        for record in records:
            yield record['PartitionKey'], record['Data']

    def run(self):
        """Poll the shard and pass new records to ``process_records``.

        Polling ends after ``worker_time`` seconds, or earlier when the
        service stops handing out a next shard iterator.
        """
        response = kinesis.get_shard_iterator(self.stream_name,
            self.shard_id, self.iterator_type)

        next_iterator = response['ShardIterator']

        # A monotonic clock keeps the time budget immune to wall-clock jumps.
        deadline = time.monotonic() + self.worker_time

        while time.monotonic() < deadline:
            try:
                response = kinesis.get_records(next_iterator,
                                               limit=self.record_limit)

                records = response['Records']

                if records:
                    self.process_records(records)

                next_iterator = response.get('NextShardIterator')
                if next_iterator is None:
                    # No further iterator (e.g. the shard was closed): there
                    # is nothing left to poll, and passing None back to
                    # get_records would only fail.
                    break
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException:
                # Read throughput exceeded: back off briefly, then retry with
                # the same iterator.
                time.sleep(1)

In [13]:
class EchoConsumer(KinesisConsumer):
    """Consumer that echoes every received record to standard output."""

    def process_records(self, records):
        """Print one ``<partition key> : <data>`` line per incoming record."""
        for pair in self.iter_records(records):
            key, payload = pair
            print(key, ":", payload)

In [14]:
# Shard to read from; presumably the id of the stream's only shard — TODO confirm against describe_stream
shard_id = 'shardId-000000000000'
# Iterator type handed to get_shard_iterator; 'LATEST' presumably starts reading at the newest records — see the Kinesis docs
iterator_type =  'LATEST'
# Echo incoming records, polling the shard for 10 seconds
worker = EchoConsumer(stream_name, shard_id, iterator_type, worker_time=10)

In [15]:
# Blocks for about worker_time seconds while polling the shard and printing each record
worker.run()

8.8.8.9 : 2021-12-12T12:22:27.082714
8.8.8.8 : 2021-12-12T12:22:28.015056
8.8.8.8 : 2021-12-12T12:22:30.590019
8.8.8.9 : 2021-12-12T12:22:32.616004
8.8.8.8 : 2021-12-12T12:22:33.126118


## Creating a counter class

Counts the number of requests received from each IP address within a fixed time window (here, 1 minute).

In [16]:
from collections import defaultdict, Counter
from dateutil import parser
from operator import itemgetter

class CounterConsumer(KinesisConsumer):
    """Consumer that counts IP occurrences in 1-minute time buckets.

    Args:
        stream_name: name of the stream to read from.
        shard_id: id of the shard to read from.
        iterator_type: shard iterator type passed on to the base consumer.
        worker_time: number of seconds ``run`` keeps polling.
        sleep_interval: seconds to pause between two polls (20 by default).
    """

    def __init__(self, stream_name, shard_id, iterator_type, worker_time,
                 sleep_interval=20):
        # Maps a minute-truncated timestamp to a Counter of IP address -> hits.
        self.time_buckets = defaultdict(Counter)
        super().__init__(stream_name, shard_id, iterator_type, worker_time,
                         sleep_interval)

    def print_counters(self):
        """Print every time bucket with its per-IP counts."""
        now = datetime.datetime.utcnow()
        print("##### Last run at {}".format(now))
        for timestamp, ip_counts in self.time_buckets.items():
            # Sort the counts by IP address so the output is stable.
            sorted_counts = sorted(ip_counts.items(), key=itemgetter(0))
            print(timestamp, ':', sorted_counts)

    def process_records(self, records):
        """Add a batch of records to the buckets and print the totals.

        The partition key of a record is used as the IP address and its data
        is parsed as a timestamp.
        """
        for ip_addr, timestamp_str in self.iter_records(records):
            timestamp = parser.parse(timestamp_str)
            # Truncate to the minute: this is the key of the 1-minute bucket.
            bucket = timestamp.replace(second=0, microsecond=0)
            self.time_buckets[bucket][ip_addr] += 1
        self.print_counters()

In [17]:
# Count records per IP for 120 seconds. CounterConsumer polls every 20 seconds
# and prints the running per-minute totals whenever a poll returns records.
worker = CounterConsumer(stream_name, shard_id, iterator_type, worker_time=120)
worker.run()

##### Last run at 2021-12-12 12:23:39.984880
2021-12-12 12:23:00 : [('8.8.8.8', 8), ('8.8.8.9', 4)]
##### Last run at 2021-12-12 12:24:00.525199
2021-12-12 12:23:00 : [('8.8.8.8', 16), ('8.8.8.9', 7)]
##### Last run at 2021-12-12 12:24:21.068572
2021-12-12 12:23:00 : [('8.8.8.8', 16), ('8.8.8.9', 7)]
2021-12-12 12:24:00 : [('8.8.8.8', 8), ('8.8.8.9', 4)]
##### Last run at 2021-12-12 12:24:41.622645
2021-12-12 12:23:00 : [('8.8.8.8', 16), ('8.8.8.9', 7)]
2021-12-12 12:24:00 : [('8.8.8.8', 16), ('8.8.8.9', 8)]
##### Last run at 2021-12-12 12:25:02.168272
2021-12-12 12:23:00 : [('8.8.8.8', 16), ('8.8.8.9', 7)]
2021-12-12 12:24:00 : [('8.8.8.8', 24), ('8.8.8.9', 11)]


## Deleting the stream

In [18]:
# Delete the stream. The producers still running in the background then hit
# ResourceNotFoundException, print their "not found" message and exit.
kinesis.delete_stream(stream_name)

stream kinesis-word-stream not found. Exiting
stream kinesis-word-stream not found. Exiting
