* intro to kinesis
* configure your AWS credentials
* KCL wordputter
* implementing simple processor class (echo)
* implementing simple counter (no ordering)
* testing with two KCL wordputters with lag (simulating long network delay)
* implement counter with buffer

# Introduction

At Sqreen we use the AWS Kinesis service to process data from our agents in near real time. This kind of processing has recently become popular with the appearance of general-purpose libraries that support it (such as [Apache Kafka](https://kafka.apache.org/)). Because such libraries deal with streams of data, this approach became known as "stream processing". It is a departure from the older model of analytics, which ran the analysis in batches (hence the name "batch processing"). The main differences between the two approaches are:

* stream processing deals with data that are localised in time, i.e. with events generated at specific points in time, whereas batch processing is applied to batches of data (for example, stored in databases),
* stream processing analyses data online, i.e. almost immediately after they arrive, whereas batch processing waits until data collection is finished (the cut-off can be defined arbitrarily, for example, at the end of the day) and analyses the data offline,
* data analysed by stream processing are unbounded, i.e. they have no specific end, whereas batches are bounded, i.e. they have a well-defined size.
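
The contrast between the two models can be sketched in a few lines of plain Python. This is only an illustration: `batch_count`, `stream_count` and the `events` list are hypothetical names, and no Kinesis API is involved.

```python
from collections import Counter

# a small, made-up sequence of events (IP addresses of incoming requests)
events = ["8.8.8.8", "8.8.8.9", "8.8.8.8"]


def batch_count(collected_events):
    """Batch style: analyse a complete, bounded collection in one go."""
    return Counter(collected_events)


def stream_count(event_source):
    """Stream style: update the result after every event and emit a snapshot."""
    counts = Counter()
    for event in event_source:
        counts[event] += 1
        yield dict(counts)  # an up-to-date result is available immediately


batch_result = batch_count(events)
stream_snapshots = list(stream_count(iter(events)))
```

The batch function needs the whole collection before it can answer, whereas the generator yields a usable result after each event and would keep doing so for a source that never ends.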

Libraries such as [Apache Kafka](https://kafka.apache.org/) provide streams that receive data from the event sources (producers) and pass them down to the consumers, which in turn can forward them to other streams. In essence they are similar to message queues, but they support multiple consumers that process the same messages in parallel (as in the [publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) messaging model) and they keep old messages even after these were delivered to the consumers. They are a kind of append-only event log. Although logs are most commonly associated with the append-only flat files sitting in the `/var/log` directory and containing information logged by your operating system, streams are logs that are optimised for storing and serving binary data (which could be text, but also fragments of images, sensor readings, etc.). This log-like design allows new consumers to be added and to start reading from any offset (any message in the past), or to be removed, without any impact on the remaining consumers. Since a single machine may not be able to process the high-frequency events created in real web applications, both streams and their consumers can be distributed by partitioning the source events.
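
To make the log analogy concrete, here is a toy in-memory append-only log. It is a minimal sketch with hypothetical names (`EventLog`, `append`, `read`) and does not correspond to any Kafka or Kinesis API; it only shows why consumers with independent offsets do not interfere with each other.

```python
class EventLog:
    """Toy in-memory append-only log (illustrative only)."""

    def __init__(self):
        self._records = []

    def append(self, record):
        """Add a record at the end of the log and return its offset."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, offset, limit=10):
        """Return up to `limit` records starting at `offset` and the next offset."""
        batch = self._records[offset:offset + limit]
        return batch, offset + len(batch)


log = EventLog()
for word in [b"alpha", b"beta", b"gamma"]:
    log.append(word)

# Each consumer keeps its own offset; reading never removes records.
first_batch, first_offset = log.read(0)    # starts from the oldest record
second_batch, second_offset = log.read(2)  # joins later, starts at offset 2
```

Because reading only moves the consumer's own offset, a consumer added later can still replay everything that is kept in the log.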

Streams have found applications in many problems. They are commonly used for real-time data analytics (such as analysing streams of tweets), for replicating databases (both for performance and reliability reasons), for real-time monitoring and detection of special events (such as fraud detection) and for building data-intensive systems that require different representations of the same data (for example, databases for operations, indexes for fast queries, and data warehouses for running batch analyses).

[Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) is a managed service that provides a streaming platform. It includes stream storage and an API for implementing producers and consumers. Amazon charges per hour for each stream partition (called a shard in Kinesis) and per volume of data flowing through the stream.


# Goal

The goal of this tutorial is to familiarize you with stream processing using Amazon Kinesis. In particular, we will implement a simple producer-stream-consumer pipeline that counts the number of requests in consecutive, one-minute-long time windows. We will apply this pipeline to simulated data, but it could easily be extended to work with real websites. This is precisely one of the applications that we use Kinesis for at Sqreen (more about it below).

## Requirements

To install the dependencies, run the following commands at the command line (i.e. in the shell).

```
pip install awscli
pip install boto
pip install python-dateutil
```

## Configure AWS credentials

To connect to AWS, you must first create your credentials (you will get them from the AWS Console). Then, simply configure them using the following command:

```
aws configure --profile blogpost-kinesis
```

`blogpost-kinesis` is the name of the profile you will use for this tutorial. When prompted, copy and paste your access key ID and secret access key obtained from the [AWS Management Console](https://console.aws.amazon.com/).

## Creating stream


Let's create our first stream. You can either do it using the AWS Console or the API. We will use the second approach. First, we need to define the name of the stream, the region in which we will create it, and the profile to use for our AWS credentials (you can set it to `None` if you use the default profile).

In [1]:
stream_name =  'blogpost-word-stream'
region = 'eu-west-1'
aws_profile = 'blogpost-kinesis'

Now we can use the `boto` library to create the stream and wait until it becomes active.

In [2]:
import boto.kinesis
from boto.kinesis.exceptions import ResourceInUseException
import os
import time

if aws_profile:
    os.environ['AWS_PROFILE'] = aws_profile

# connect to the kinesis
kinesis = boto.kinesis.connect_to_region(region)

try:
    # create the stream
    kinesis.create_stream(stream_name, 1)
    print('stream {} created in region {}'.format(stream_name, region))
except ResourceInUseException:
    print('stream {} already exists in region {}'.format(stream_name, region))

def get_status():
    r = kinesis.describe_stream(stream_name)
    description = r.get('StreamDescription')
    status = description.get('StreamStatus')
    return status

# wait for the stream to become active
while get_status() != 'ACTIVE':
    time.sleep(1)
print('stream {} is active'.format(stream_name))

stream blogpost-word-stream created in region eu-west-1
stream blogpost-word-stream is active


# Putting data into streams

To have a working stream processing pipeline, we need a source of messages (a producer in AWS terminology) and a receiver (a consumer) that will obtain and process the messages. We will first define the producer.

In [3]:
import datetime
import time
import threading
from boto.kinesis.exceptions import ResourceNotFoundException

class KinesisProducer(threading.Thread):
    """Producer class for AWS Kinesis streams
    
    This class will emit records with the IP addresses as partition key and
    the emission timestamps as data"""
    
    def __init__(self, stream_name, sleep_interval=None, ip_addr='8.8.8.8'):
        self.stream_name = stream_name
        self.sleep_interval = sleep_interval
        self.ip_addr = ip_addr
        super().__init__()
        
    def put_record(self):
        """put a single record to the stream"""
        timestamp = datetime.datetime.utcnow()
        part_key = self.ip_addr
        data = timestamp.isoformat()

        kinesis.put_record(self.stream_name, data, part_key)
    
    def run_continuously(self):
        """put a record at regular intervals"""
        while True:
            self.put_record()
            time.sleep(self.sleep_interval)

    def run(self):
        try:
            if self.sleep_interval:
                self.run_continuously()
            else:
                self.put_record()
        except ResourceNotFoundException:
            print('stream {} not found. Exiting'.format(self.stream_name))

Note that we used the IP address as the partition key and the timestamp as the data. In theory, you are almost completely free to choose whatever you want for the data, as long as it can be serialised to a binary format and does not exceed the maximum record size (1 MB per record in the Kinesis documentation; check the current service quotas). If you need to emit larger data, you have to split it into several records. The partition key must be a string of at most 256 characters; it is used to determine which shard the data is sent to (see below).
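
Splitting an oversized payload into several records can be sketched as follows. The helper `split_payload` is hypothetical and not part of `boto`, and the default `max_bytes` of 1 MiB is an assumption that you should replace with the limit that applies to your stream.

```python
def split_payload(payload, max_bytes=1024 * 1024):
    """Split a bytes payload into chunks of at most max_bytes each.

    max_bytes defaults to 1 MiB, which is an assumed record size limit.
    """
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    return [payload[i:i + max_bytes] for i in range(0, len(payload), max_bytes)]


# 25 bytes with a 10-byte limit give chunks of 10, 10 and 5 bytes
chunks = split_payload(b"x" * 25, max_bytes=10)
```

Each chunk would then be sent with its own `put_record` call. Using the same partition key for all chunks keeps them in the same shard, and the consumer is responsible for reassembling them, for example with a chunk index carried in the data.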

We implemented `KinesisProducer` as a Python thread, so that it runs in the background and does not block the Python interpreter. This way we can continue executing Python instructions while the records are being emitted.

Now we create two such producers with different IP addresses and different intervals between consecutive messages.

In [4]:
producer1 = KinesisProducer(stream_name, sleep_interval=2, ip_addr='8.8.8.8')
producer2 = KinesisProducer(stream_name, sleep_interval=5, ip_addr='8.8.8.9')
producer1.start()
producer2.start()

Sqreen's Security Automation feature allows one to monitor the traffic of a website and to set conditions under which a given client should be blocked (for example, when it tries to read the same page too many times). To implement this feature, we run similar event sources that send to the stream the IP address from which each request was emitted, together with the timestamp of the request.

# Consuming from stream

Consumers receive the messages from the stream and process them. Their output could be messages forwarded to another stream, files saved on the filesystem (or in Amazon S3 storage) or records stored in a database.

First, let's define a generic consumer, which will consist of a `run` method that polls the Kinesis stream for new events and a `process_records` method that processes the event data and produces any side effects (i.e. forwarding the results to another stream or committing them to a database). The `process_records` method is not implemented in this generic base class; it needs to be implemented in the sub-classes (see below).

In [5]:
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime

In [6]:
# https://github.com/aws-samples/kinesis-poster-worker/blob/master/worker.py

class KinesisConsumer:
    """Generic Consumer for Amazon Kinesis Streams"""
    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=30, sleep_interval=0.5):
   
        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval
        
    def process_records(self, records):
        """the main logic of the Consumer that needs to be implemented"""
        raise NotImplementedError
    
    @staticmethod
    def iter_records(records):
        for record in records:
            part_key = record['PartitionKey']
            data = record['Data']
            yield part_key, data
    
    def run(self):
        """poll stream for new records and forward them to process_records method"""
        response = kinesis.get_shard_iterator(self.stream_name,
            self.shard_id, self.iterator_type)
        
        next_iterator = response['ShardIterator']

        start = datetime.datetime.now()
        finish = start + datetime.timedelta(seconds=self.worker_time)
        
        while finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)
        
                records = response['Records']
            
                if records:
                    self.process_records(records)
            
                next_iterator = response['NextShardIterator']
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException:
                # we are reading faster than the shard allows: back off and retry
                time.sleep(1)

Note that each stream can have many consumers that receive all the messages and process them independently. Now, we will implement a `process_records` method that simply prints the received messages to standard output. We will do that by sub-classing the `KinesisConsumer` class.

In [7]:
class EchoConsumer(KinesisConsumer):
    """Consumer that echoes received data to standard output"""
    def process_records(self, records):
        """print the partition key and data of each incoming record"""
        for part_key, data in self.iter_records(records):
            print(part_key, ":", data)

We now attach the consumer to our stream. To do that, we need to pass the shard ID and the position in the stream from which to start processing the messages. For the latter, we can choose between the newest (`LATEST`) and the oldest (`TRIM_HORIZON`) record in the stream. Note that the default retention period for messages in Kinesis streams is 24 hours. It can be extended at an additional cost (up to 365 days).

Streams are partitioned into separate "sub-streams" (called shards), and all messages with the same partition key end up in the same shard. The target shard for each message is determined from the partition key. Each consumer can read from one or more shards, but every shard must have at least one consumer, otherwise the messages in that shard will never be processed. Since we only use one shard in this example, we can pass the ID of the first shard directly. If you configure more than one shard (to increase the throughput), you will need to query the stream for the IDs of all active shards using the API. For the sake of this tutorial, we will assume that we have only a single shard.
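
How a partition key selects a shard can be illustrated offline. Kinesis maps each partition key to a 128-bit integer with an MD5 hash, and each shard owns a range of that hash key space. The sketch below reproduces this idea under simplifying assumptions: the helper names are hypothetical, and `even_hash_ranges` assumes the hash key space is split evenly, whereas a real stream reports the actual ranges of its shards in the `describe_stream` response.

```python
import hashlib


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def even_hash_ranges(shard_count):
    """Split the 128-bit hash key space into equal contiguous ranges (assumption)."""
    space = 2 ** 128
    bounds = [space * i // shard_count for i in range(shard_count + 1)]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(shard_count)]


def pick_shard(partition_key, ranges):
    """Return the index of the range that contains the hash of the key."""
    key = hash_key(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key outside all ranges")


ranges = even_hash_ranges(2)
shard_for_first_ip = pick_shard("8.8.8.8", ranges)
```

The mapping is deterministic, so all requests from one IP address land in the same shard and can be counted by a single consumer.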

In [8]:
shard_id = 'shardId-000000000000'
iterator_type =  'LATEST'
worker = EchoConsumer(stream_name, shard_id, iterator_type, worker_time=10)

In [9]:
worker.run()

8.8.8.8 : 2018-07-31T21:56:42.972560
8.8.8.8 : 2018-07-31T21:56:45.093759
8.8.8.9 : 2018-07-31T21:56:46.252859
8.8.8.8 : 2018-07-31T21:56:47.200131
8.8.8.8 : 2018-07-31T21:56:49.331134


As expected, the consumer printed all received records with their partition keys (IP addresses) and data (timestamps).

Finally, we can implement a consumer with some non-trivial logic. The goal of this consumer is to count the number of requests from each IP address in a specific time window (here one minute). Again, we will subclass the `KinesisConsumer` class and re-implement the `process_records` method. In addition, we will define one extra helper method, `print_counters`, that simply dumps the current counts to standard output. In practice, we would forward the outputs of such processing to another stream for further analysis (filtering, detection of unusual events, etc.) or store them in a database. This is what actually happens in Sqreen's Security Automation pipeline.

[TODO: Should add here a simplified structure of Automation pipeline?].

In [10]:
from collections import defaultdict, Counter
from dateutil import parser
from operator import itemgetter

class CounterConsumer(KinesisConsumer):
    """Consumer that counts IP occurrences in 1-minute time buckets"""
    
    def __init__(self, stream_name, shard_id, iterator_type, worker_time):
        self.time_buckets = defaultdict(Counter)
        sleep_interval = 20 # seconds
        super().__init__(stream_name, shard_id, iterator_type, worker_time, sleep_interval)
        
    def print_counters(self):
        """helper method to show counting results"""
        
        now = datetime.datetime.utcnow()
        print("##### Last run at {}".format(now))
        for timestamp, ip_counts in self.time_buckets.items():
            # sort counts with respect to the IP address
            ip_counts = sorted(ip_counts.items(), key=itemgetter(0))
            print(timestamp, ':', list(ip_counts))
            
    def process_records(self, records):
        for ip_addr, timestamp_str in self.iter_records(records):
            timestamp = parser.parse(timestamp_str)
            timestamp = timestamp.replace(second=0, microsecond=0)
            self.time_buckets[timestamp][ip_addr] += 1
        self.print_counters()         

Let's test the consumer:

In [11]:
worker = CounterConsumer(stream_name, shard_id, iterator_type, worker_time=120)
worker.run()

##### Last run at 2018-07-31 21:57:12.333587
2018-07-31 21:56:00 : [('8.8.8.8', 4), ('8.8.8.9', 1)]
2018-07-31 21:57:00 : [('8.8.8.8', 5), ('8.8.8.9', 3)]
##### Last run at 2018-07-31 21:57:32.456236
2018-07-31 21:56:00 : [('8.8.8.8', 4), ('8.8.8.9', 1)]
2018-07-31 21:57:00 : [('8.8.8.8', 15), ('8.8.8.9', 7)]
##### Last run at 2018-07-31 21:57:52.605160
2018-07-31 21:56:00 : [('8.8.8.8', 4), ('8.8.8.9', 1)]
2018-07-31 21:57:00 : [('8.8.8.8', 24), ('8.8.8.9', 10)]
##### Last run at 2018-07-31 21:58:12.749545
2018-07-31 21:56:00 : [('8.8.8.8', 4), ('8.8.8.9', 1)]
2018-07-31 21:57:00 : [('8.8.8.8', 28), ('8.8.8.9', 12)]
2018-07-31 21:58:00 : [('8.8.8.8', 6), ('8.8.8.9', 2)]
##### Last run at 2018-07-31 21:58:32.902239
2018-07-31 21:56:00 : [('8.8.8.8', 4), ('8.8.8.9', 1)]
2018-07-31 21:57:00 : [('8.8.8.8', 28), ('8.8.8.9', 12)]
2018-07-31 21:58:00 : [('8.8.8.8', 15), ('8.8.8.9', 6)]


Each block introduced by the hash signs `#####` shows the results of the counting process after a single call to `process_records`. Since this method is called every time a poll returns new events, the blocks show the successive states of the `time_buckets` cache. Each line starts with the timestamp marking the beginning of the time bucket (the bucket ends where the next one begins, i.e. the windows do not overlap), followed by the list of IP address and count pairs. Every time the consumer runs, the values are updated and the counts increase. If a request arrives with a timestamp that is not covered by any of the existing buckets, a new bucket is added and its count starts from zero. The effect is roughly what we tried to achieve.

# Conclusions

We demonstrated how to use Amazon Kinesis with a simple request-counting example. Although the example was simplified, it contained the basic components of all stream processors: two producers, a stream (with a single partition) and one consumer. You can easily take this example and adapt it to your needs.

One important limitation of the present `CounterConsumer` is that it keeps all windows in memory and prints them at each run of the consumer. In real applications we might want to save only the completed windows in the database and remove them from the `time_buckets` cache, but this would require knowing when a window is closed, i.e. when no new events that fit into this window will arrive. This is not a trivial problem, because we can never be sure whether some events will arrive late, for example due to a network delay or a temporary network outage.
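
One common compromise is to treat a window as closed once a fixed grace period has passed after its end. The sketch below shows this idea in isolation; `BufferedCounter`, its methods and the 30-second grace period are hypothetical choices for illustration, not part of the consumer defined above.

```python
import datetime
from collections import Counter, defaultdict

WINDOW = datetime.timedelta(minutes=1)


class BufferedCounter:
    """Counts events per window and releases windows after a grace period."""

    def __init__(self, grace=datetime.timedelta(seconds=30)):
        self.grace = grace  # how long we wait for late events (assumed value)
        self.open_windows = defaultdict(Counter)

    def add(self, ip_addr, timestamp):
        """Count one event in the one-minute window containing its timestamp."""
        window_start = timestamp.replace(second=0, microsecond=0)
        self.open_windows[window_start][ip_addr] += 1

    def flush(self, now):
        """Remove and return the windows that ended at least `grace` ago."""
        closed = {}
        for window_start in sorted(self.open_windows):
            if window_start + WINDOW + self.grace <= now:
                closed[window_start] = dict(self.open_windows.pop(window_start))
        return closed


counter = BufferedCounter()
first = datetime.datetime(2018, 7, 31, 21, 56, 42)
counter.add("8.8.8.8", first)
counter.add("8.8.8.8", first + datetime.timedelta(seconds=3))
counter.add("8.8.8.9", first + datetime.timedelta(seconds=30))

# the 21:56 window ends at 21:57:00 and is released from 21:57:30 onwards
early = counter.flush(now=datetime.datetime(2018, 7, 31, 21, 57, 10))
late = counter.flush(now=datetime.datetime(2018, 7, 31, 21, 57, 30))
```

The released windows could be written to a database, while the remaining ones stay in memory. An event that arrives after its window was flushed would open that window again in this sketch, so a real pipeline would still need a policy for such very late events.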

Another possible extension of `CounterConsumer` is to allow the windows to overlap. The overlap would smooth the counts and make our pipeline more responsive, because the end user would not have to wait for a full window to complete before seeing a new event added to the counts.
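
With overlapping windows each event is counted in more than one bucket. The sketch below computes the buckets for a single timestamp, assuming windows of `size_s` seconds that start every `step_s` seconds; the function name and both parameters are hypothetical.

```python
import datetime


def overlapping_windows(timestamp, size_s=60, step_s=30):
    """Return the start times of all windows that contain the timestamp.

    Windows start at multiples of step_s seconds since the Unix epoch and
    cover the half-open interval [start, start + size_s).
    """
    epoch = datetime.datetime(1970, 1, 1)
    seconds = int((timestamp - epoch).total_seconds())
    start = seconds - seconds % step_s  # latest window starting at or before the event
    starts = []
    while start > seconds - size_s:
        starts.append(epoch + datetime.timedelta(seconds=start))
        start -= step_s
    return sorted(starts)


# a request at 21:56:42 falls into the windows starting at 21:56:00 and 21:56:30
windows = overlapping_windows(datetime.datetime(2018, 7, 31, 21, 56, 42))
```

In `process_records` the counter would then be incremented once for every returned window start instead of once for the single truncated timestamp. Setting `step_s` equal to `size_s` gives back the non-overlapping windows used in this tutorial.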

Last but not least, we did not cover two important topics: checkpointing the state of the consumer, and spawning new consumers when an existing consumer fails or when we want to scale up the application. This process can be automated using the [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python), with the complication that you need to start the consumer by means of a Java-based orchestrator (called MultiLangDaemon).

In [12]:
# delete the stream at the end of the exercise to minimize AWS costs
kinesis.delete_stream(stream_name)

stream blogpost-word-stream not found. Exiting
stream blogpost-word-stream not found. Exiting
