# Trigger Kinesis Throughput Alarms

This notebook writes to and reads from the test project's Kinesis Data Stream to create:
 - Write and read throughput errors
 - GetRecords/PutRecord(s) errors
 
for testing SLIC Watch alarms and dashboards.

In [None]:
import os
import boto3
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
# Name of the Kinesis Data Stream that every cell below writes to and reads from.
stream_name = 'awesome-savage-stream'

# A single shared client; region and credentials come from the ambient boto3 configuration.
kinesis = boto3.client(service_name='kinesis')

## Trigger a PutRecord failure to trigger the PutRecord.Success Alarm
This triggers a failure by sending a record that is one byte larger than the 1 MiB record size limit, so the request fails validation.

In [None]:
# Deliberately send a payload one byte over 1 MiB so that Kinesis rejects the request.
# The failure is the point of this cell, so catch and display it rather than letting
# the exception halt a "Restart & Run All" before the later cells get to execute.
try:
    kinesis.put_record(StreamName=stream_name, Data=os.urandom(1024**2 + 1), PartitionKey='a')
except kinesis.exceptions.ClientError as err:
    print(f'PutRecord failed as intended: {err}')
else:
    # If the stream accepts the record, no failure was generated for the alarm.
    print('WARNING: PutRecord unexpectedly succeeded; no failure was generated')

## Trigger a Write Throughput Limit Exceeded error for a single shard

In [None]:
# Payload for each record, kept just under the 1 MiB per-record cap (the partition
# key counts towards the record size). It is defined here because no other cell
# creates `data`, so the loop would fail with a NameError on a fresh kernel.
data = os.urandom(1024 ** 2 - 1024)

batches = 10
records_per_batch = 2

# Every record uses the same partition key, so all of them land on one shard.
# Two ~1 MiB records per request, sent back to back, is intended to push that
# shard past its write throughput limit.
rejected = 0
for _ in tqdm(range(0, batches)):
    response = kinesis.put_records(
        Records=[
            {
                'Data': data,
                'PartitionKey': 'a'
            }
            for _ in range(0, records_per_batch)
        ],
        StreamName=stream_name,
    )
    # put_records reports per-record failures in the response instead of raising,
    # so tally them to confirm that some writes were actually rejected.
    rejected += response.get('FailedRecordCount', 0)

print(f'{rejected} of {batches * records_per_batch} records were rejected')

## Trigger a Read Throughput Limit Exceeded error for a single shard

In [None]:
# Target the first shard that Kinesis lists for the stream; all reads below hit this shard.
shard_listing = kinesis.list_shards(StreamName=stream_name)
shard_id = shard_listing['Shards'][0]['ShardId']

In [None]:
def fetch_records():
    """Issue a burst of GetRecords calls against the target shard.

    Requests a ``TRIM_HORIZON`` iterator for ``shard_id`` on ``stream_name`` and
    then calls ``get_records`` up to 50 times in a row, following the iterator
    returned by each response. The records themselves are discarded; only the
    request traffic matters.

    The loop ends early when a response carries no next iterator, instead of
    failing on the missing key.

    Raises:
        Whatever the Kinesis client raises for a failed request; errors are not
        caught here, so they propagate to the caller.
    """
    itr = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON',
    )['ShardIterator']
    for _ in range(0, 50):
        resp = kinesis.get_records(ShardIterator=itr)
        # No next iterator means there is nothing further to read from this
        # shard, so stop rather than raise or pass an invalid iterator onwards.
        itr = resp.get('NextShardIterator')
        if not itr:
            break

In [None]:
# Run ten readers concurrently against the same shard so that their combined
# GetRecords traffic lands on it at once.
with ThreadPoolExecutor(max_workers=10) as pool:
    pending = [pool.submit(fetch_records) for _ in range(0, 10)]
    for finished in as_completed(pending):
        # result() re-raises any exception from the worker, so failures surface here.
        finished.result()