# Stream Filtering

Experiment: can we use Kinesis Data Analytics to filter the records written to one stream and populate other streams with the matching records? More precisely: how do we set this up, and what is the latency between a record being written to the main stream and its arrival on the filtered stream?

## Setup

First, we need some streams.

In [None]:
import boto3

# Kinesis Data Streams client used by the stream create/put/read/delete cells in this notebook.
kinesis_client = boto3.client('kinesis')

In [None]:
import os
# AWS account number, read from the ACCOUNT_NO environment variable (KeyError if it is unset).
# It is concatenated into the stream ARNs of the IAM policy defined later in this notebook.
account_no = os.environ['ACCOUNT_NO']

In [None]:
# Create the source stream and block until it is usable.
# CreateStream is asynchronous: the call returns while the stream is still
# being created, so wait for it here instead of relying on a manual
# describe_stream check before the first put_record.
main_stream_response = kinesis_client.create_stream(
    StreamName='main',
    ShardCount=1,
)
kinesis_client.get_waiter('stream_exists').wait(StreamName='main')

In [None]:
# Show the description of the 'main' stream to confirm it was created.
kinesis_client.describe_stream(StreamName='main')

In [None]:
# Create the destination stream for the filtered records and wait until it
# exists, because CreateStream returns before the stream is ready for use.
filtered_stream_response = kinesis_client.create_stream(StreamName='filtered', ShardCount=1)
kinesis_client.get_waiter('stream_exists').wait(StreamName='filtered')
# Keep the create_stream response as the cell's displayed value.
filtered_stream_response

In [None]:
# Show the description of the 'filtered' stream to confirm it was created.
kinesis_client.describe_stream(StreamName='filtered')

In [None]:
from datetime import datetime, timezone

def timestamp():
    """Return the current UTC time as an ISO 8601 formatted string."""
    return datetime.now(tz=timezone.utc).isoformat()

## Stream Write

In [None]:
import uuid

# Sample CloudEvents-style envelope to write to the 'main' stream.
# NOTE(review): "baraatr1" looks like a typo for "barattr1"; it is payload
# data, so it is left exactly as written.
event = {
    "specversion": "1.0",
    "type": "newFoo",
    "source": "foo",
    "id": str(uuid.uuid4()),  # fresh random id each time the cell runs
    "time": timestamp(),
    "data": {
        "fooaddr": "foostuffval",
        "foolist": [1, 2, 3],
        "barobj": {
            "baraatr1": "yes",
            "barattr2": False,
            "barattr3": 122.22,
        },
    },
}

In [None]:
# Show the value that is used as the Kinesis partition key when the event is written.
event['source']

In [None]:
import json

# Serialise the sample event to JSON bytes and write it to the 'main' stream,
# partitioned by the event's source.
record_bytes = json.dumps(event).encode()
prr = kinesis_client.put_record(StreamName='main', Data=record_bytes, PartitionKey=event['source'])

In [None]:
# Show the put_record response; its 'ShardId' is used later when reading the stream back.
prr

## Analytics App

### Role

In [None]:
# Permissions for the analytics application: read from 'main', write to 'filtered'.
# NOTE(review): the region is hard-coded to us-east-1 — confirm it matches the
# region the streams were created in.
main_stream_arn = "arn:aws:kinesis:us-east-1:{}:stream/main".format(account_no)
filtered_stream_arn = "arn:aws:kinesis:us-east-1:{}:stream/filtered".format(account_no)

kinesis_app_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadInputKinesis",
            "Effect": "Allow",
            "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords"],
            "Resource": [main_stream_arn],
        },
        {
            "Sid": "WriteOutputKinesis",
            "Effect": "Allow",
            "Action": ["kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords"],
            "Resource": [filtered_stream_arn],
        },
    ],
}

In [None]:
# Show the assembled policy, including the account-specific stream ARNs.
kinesis_app_policy

In [None]:
import json

# Trust policy that lets the Kinesis Analytics service assume the role.
# "Version" is set explicitly so this document uses the same policy language
# version as kinesis_app_policy instead of falling back to the older default.
assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": ["kinesisanalytics.amazonaws.com"]},
        "Action": ["sts:AssumeRole"],
    }],
}

# Show the JSON form that is passed to IAM as AssumeRolePolicyDocument.
json.dumps(assume_role_policy)

In [None]:
# Create the IAM role for the analytics application, trusted via assume_role_policy.
iam = boto3.client('iam')

trust_document = json.dumps(assume_role_policy)
crr = iam.create_role(RoleName='sample-ka-app-role', AssumeRolePolicyDocument=trust_document)

print(crr)

In [None]:
# Attach the stream read/write permissions to the role as an inline policy.
permissions_document = json.dumps(kinesis_app_policy)
prp = iam.put_role_policy(RoleName='sample-ka-app-role', PolicyName='KAPolicy', PolicyDocument=permissions_document)

print(prp)

### App Definition

In [None]:
# Kinesis Data Analytics (v2 API) client used to create and describe the application.
ka = boto3.client('kinesisanalyticsv2')

In [None]:
# Create the SQL-runtime analytics application, run under 'sample-ka-app-role'.
# That role is created in this notebook without a Path, so its ARN has no
# 'service-role/' segment.
service_role_arn = 'arn:aws:iam::' + account_no + ':role/sample-ka-app-role'

car = ka.create_application(
    ApplicationName='dave',
    ApplicationDescription='dave app',
    RuntimeEnvironment='SQL-1_0',
    ServiceExecutionRole=service_role_arn,
    # TODO: more to come — the input, output and SQL code configuration
    # (ApplicationConfiguration) is not specified yet.
)

In [None]:
# Describe the existing 'dave' application so its full configuration can be inspected.
ka.describe_application(ApplicationName='dave')

In [None]:
# Pump a bare CloudEvents (cloudevents.io) envelope through the stream so the
# schema derivation tool can define the schema for us.

seed_event = {
    "specversion": "1.0",
    "type": "newFoo",
    "source": "foo",
    "id": str(uuid.uuid4()),
    "time": timestamp(),
}

# Partition by the seed event's own source, so this cell does not depend on
# the earlier sample `event` dict.
kinesis_client.put_record(
    StreamName='main',
    Data=json.dumps(seed_event).encode(),
    PartitionKey=seed_event['source'],
)

### Sample Describe Application Output

```console
{'ApplicationDetail': {'ApplicationARN': 'arn:aws:kinesisanalytics:us-east-1:111111111111:application/dave',
  'ApplicationDescription': 'dave app',
  'ApplicationName': 'dave',
  'RuntimeEnvironment': 'SQL-1_0',
  'ApplicationStatus': 'RUNNING',
  'ApplicationVersionId': 5,
  'CreateTimestamp': datetime.datetime(2020, 1, 17, 16, 42, 12, tzinfo=tzlocal()),
  'LastUpdateTimestamp': datetime.datetime(2020, 1, 17, 17, 2, 53, tzinfo=tzlocal()),
  'ApplicationConfigurationDescription': {'SqlApplicationConfigurationDescription': {'InputDescriptions': [{'InputId': '2.1',
      'NamePrefix': 'SOURCE_SQL_STREAM',
      'InAppStreamNames': ['SOURCE_SQL_STREAM_001'],
      'KinesisStreamsInputDescription': {'ResourceARN': 'arn:aws:kinesis:us-east-1:111111111111:stream/main',
       'RoleARN': 'arn:aws:iam::111111111111:role/service-role/kinesis-analytics-dave-us-east-1'},
      'InputSchema': {'RecordFormat': {'RecordFormatType': 'JSON',
        'MappingParameters': {'JSONMappingParameters': {'RecordRowPath': '$'}}},
       'RecordEncoding': 'UTF-8',
       'RecordColumns': [{'Name': 'type',
         'Mapping': '$.type',
         'SqlType': 'VARCHAR(64)'},
        {'Name': 'payload', 'Mapping': '$', 'SqlType': 'VARCHAR(32000)'}]},
      'InputParallelism': {'Count': 1},
      'InputStartingPositionConfiguration': {'InputStartingPosition': 'NOW'}}],
    'OutputDescriptions': [{'OutputId': '5.1',
      'Name': 'DESTINATION_SQL_STREAM',
      'KinesisStreamsOutputDescription': {'ResourceARN': 'arn:aws:kinesis:us-east-1:111111111111:stream/filtered',
       'RoleARN': 'arn:aws:iam::111111111111:role/service-role/kinesis-analytics-dave-us-east-1'},
      'DestinationSchema': {'RecordFormatType': 'JSON'}}]},
   'ApplicationCodeConfigurationDescription': {'CodeContentType': 'PLAINTEXT',
    'CodeContentDescription': {'TextContent': '-- ** Continuous Filter ** \n-- Performs a continuous filter based on a WHERE condition.\n--          .----------.   .----------.   .----------.              \n--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              \n-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination\n--          |          |   |  (PUMP)  |   |          |              \n--          \'----------\'   \'----------\'   \'----------\'               \n-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE\n-- PUMP: an entity used to continuously \'SELECT ... FROM\' a source STREAM, and INSERT SQL results into an output STREAM\n-- Create output stream, which can be used to send to a destination\nCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("payload" VARCHAR(32000));\n-- Create pump to insert into output \nCREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"\n-- Select all columns from source stream\nSELECT STREAM "payload"\nFROM "SOURCE_SQL_STREAM_001"\n-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)\n-- SIMILAR TO compares string to a regex, may use ESCAPE\nWHERE "type" = \'newFoo\';\n'}}}},
 'ResponseMetadata': {'RequestId': '0fe5fb44-5691-41e7-9c20-385ab35efff0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0fe5fb44-5691-41e7-9c20-385ab35efff0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2763',
   'date': 'Sat, 18 Jan 2020 01:15:07 GMT'},
  'RetryAttempts': 0}}

```

### Role Policy Outline
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadInputKinesis",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:111111111111:stream/main"
            ]
        },
        {
            "Sid": "WriteOutputKinesis",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account-id:stream/%STREAM_NAME_PLACEHOLDER%"
            ]
        },
        {
            "Sid": "WriteOutputFirehose",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:region:account-id:deliverystream/%FIREHOSE_NAME_PLACEHOLDER%"
            ]
        },
        {
            "Sid": "ReadInputFirehose",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:Get*"
            ],
            "Resource": [
                "arn:aws:firehose:region:account-id:deliverystream/%FIREHOSE_NAME_PLACEHOLDER%"
            ]
        },
        {
            "Sid": "ReadS3ReferenceData",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::kinesis-analytics-placeholder-s3-bucket/kinesis-analytics-placeholder-s3-object"
            ]
        },
        {
            "Sid": "ReadEncryptedInputKinesisStream",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": [
                "arn:aws:kms:region:account-id:key/%SOURCE_STREAM_ENCRYPTION_KEY_PLACEHOLDER%"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.us-east-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:us-east-1:111111111111:stream/main"
                }
            }
        },
        {
            "Sid": "WriteEncryptedOutputKinesisStream1",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:region:account-id:key/%DESTINATION_STREAM_ENCRYPTION_KEY_PLACEHOLDER%"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.us-east-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:account-id:stream/%STREAM_NAME_PLACEHOLDER%"
                }
            }
        },
        {
            "Sid": "WriteEncryptedOutputKinesisStream2",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:region:account-id:key/%DESTINATION_STREAM_ENCRYPTION_KEY_PLACEHOLDER%"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.us-east-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:account-id:stream/%STREAM_NAME_PLACEHOLDER%"
                }
            }
        },
        {
            "Sid": "WriteEncryptedOutputKinesisStream3",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:region:account-id:key/%DESTINATION_STREAM_ENCRYPTION_KEY_PLACEHOLDER%"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.us-east-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:account-id:stream/%STREAM_NAME_PLACEHOLDER%"
                }
            }
        },
        {
            "Sid": "UseLambdaFunction",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": [
                "arn:aws:lambda:region:account-id:function:%FUNCTION_NAME_PLACEHOLDER%:%FUNCTION_VERSION_PLACEHOLDER%"
            ]
        }
    ]
}
```

In [None]:
# Describe role output

# IAM client used by the get_role / list_role_policies cells that follow.
iam = boto3.client('iam')

In [None]:
# Show an existing analytics service role for reference.
# NOTE(review): the sample describe output in this notebook names the role
# 'kinesis-analytics-dave-us-east-1' — confirm this role name is the intended one.
iam.get_role(RoleName='kinesis-analytics-sample-us-east-1')

In [None]:
# List the inline policies attached to that role.
iam.list_role_policies(RoleName='kinesis-analytics-sample-us-east-1')

In [None]:
# Early attempt at creating the application through the API.
# Still to work out: how to specify the rest of the configuration — maybe
# create an application from the console and dump it.
car = ka.create_application(
    # The parameter is ApplicationName; 'AppName' is not accepted by the API.
    ApplicationName='Dave',
    ApplicationDescription='Dave the wonder app',
    RuntimeEnvironment='SQL-1_0',
    # The service expects the role's ARN here; 'uh-oh' was only a placeholder.
    ServiceExecutionRole='arn:aws:iam::' + account_no + ':role/sample-ka-app-role',
)

## Stream Read

In [None]:
# Open an iterator on the shard that received the earlier put_record call,
# starting from TRIM_HORIZON.
shardId = prr['ShardId']
print('shard id is {}'.format(shardId))

gsir = kinesis_client.get_shard_iterator(StreamName='main', ShardId=shardId, ShardIteratorType='TRIM_HORIZON')
print(gsir)

In [None]:
# Fetch the records available at the iterator's current position.
shard_iterator = gsir['ShardIterator']
grr = kinesis_client.get_records(ShardIterator=shard_iterator)

print(grr)

In [None]:
# Print each record from the get_records response on its own line.
records = grr['Records']
for record in records:
    print(record)

## Cleanup

In [None]:
# Remove both streams created by this experiment.
kinesis_client.delete_stream(StreamName='main')
kinesis_client.delete_stream(StreamName='filtered')

In [None]:
# List the remaining streams to check that 'main' and 'filtered' are gone.
kinesis_client.list_streams()

In [None]:
# Remove the inline policy first; it is deleted before the role itself.
iam.delete_role_policy(RoleName='sample-ka-app-role', PolicyName='KAPolicy')

In [None]:
# Delete the role that was created for the analytics application.
iam.delete_role(RoleName='sample-ka-app-role')