In [4]:
import argparse
import os
import boto3
import json
from datetime import datetime
import random
import time
import uuid

In [5]:
# Get available streams: for every Kinesis stream in the region, collect its
# name, ARN, open-shard count and complete shard list into `streams`.

kinesis_client = boto3.client('kinesis', region_name='us-west-2')


def _list_all_shards(kinesis, stream_name):
    """Return every shard of `stream_name`, following ListShards pagination."""
    response = kinesis.list_shards(StreamName=stream_name)
    shards = list(response['Shards'])
    while 'NextToken' in response:
        # ListShards rejects StreamName when NextToken is supplied: the token
        # alone identifies the stream.
        response = kinesis.list_shards(NextToken=response['NextToken'])
        shards.extend(response['Shards'])
    return shards


# list_streams returns one page at a time; the paginator follows every page so
# streams beyond the first page are not silently dropped.
stream_names = (
    kinesis_client.get_paginator('list_streams')
    .paginate()
    .build_full_result()
    .get('StreamNames', [])
)

streams = []
for stream_name in stream_names:
    summary = kinesis_client.describe_stream_summary(
        StreamName=stream_name)['StreamDescriptionSummary']
    streams.append({
        'StreamName': stream_name,
        'StreamARN': summary['StreamARN'],
        'OpenShardCount': summary['OpenShardCount'],
        'shards': _list_all_shards(kinesis_client, stream_name),
    })

streams

[{'StreamName': 'spark-practice',
  'StreamARN': 'arn:aws:kinesis:us-west-2:403644602806:stream/spark-practice',
  'OpenShardCount': 3,
  'shards': [{'ShardId': 'shardId-000000000000',
    'HashKeyRange': {'StartingHashKey': '0',
     'EndingHashKey': '113427455640312821154458202477256070484'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49600006230285193018294798654067776114035449609093906434'}},
   {'ShardId': 'shardId-000000000001',
    'HashKeyRange': {'StartingHashKey': '113427455640312821154458202477256070485',
     'EndingHashKey': '226854911280625642308916404954512140969'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49600006230307493763493329277209311832308097970599886866'}},
   {'ShardId': 'shardId-000000000002',
    'HashKeyRange': {'StartingHashKey': '226854911280625642308916404954512140970',
     'EndingHashKey': '340282366920938463463374607431768211455'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49600006230329794508691859900350847550580

In [6]:
# Get buckets: names of all S3 buckets visible to the current credentials
# (an empty list when the response carries no 'Buckets' entry).

s3_client = boto3.client('s3')
buckets_response = s3_client.list_buckets()
result = [bucket['Name'] for bucket in buckets_response.get('Buckets', [])]

result

['aws-emr-resources-403644602806-us-west-2',
 'aws-logs-403644602806-us-west-2',
 'elasticbeanstalk-us-west-2-403644602806',
 'flower-357',
 'spark-processed-357']

In [7]:
#Helper functions

def get_line(events=('event1', 'event2', 'event3'), rng=random):
    """Build one synthetic log line of the form '<UTC timestamp>;<event>'.

    Args:
        events: non-empty sequence of event names; one is picked via
            ``rng.choice``.
        rng: object providing ``choice``; defaults to the global ``random``
            module. Pass a seeded ``random.Random`` for reproducible output.

    Returns:
        str: current UTC time formatted as '%Y-%m-%d %H:%M:%S.%f', a ';',
        then the chosen event name.

    Raises:
        IndexError: if ``events`` is empty (propagated from ``rng.choice``).
    """
    # The global RNG is deliberately not reseeded here. Reseeding on every
    # call with the clock's microsecond field clobbered any seed set elsewhere
    # and limited the output to ~1e6 possible RNG states.
    timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
    event = rng.choice(events)
    return '{};{}'.format(timestamp, event)


def randomize_interval(interval, low=-0.1, high=0.9, rng=random):
    """Return ``interval`` plus uniform random jitter drawn from [low, high].

    Args:
        interval: base delay in seconds.
        low: lower jitter bound (default -0.1, as originally hard-coded).
        high: upper jitter bound (default 0.9, as originally hard-coded).
        rng: object providing ``uniform``; defaults to the global ``random``
            module. Pass a seeded ``random.Random`` for reproducible output.

    Returns:
        The jittered value when it is positive. Otherwise ``interval`` itself,
        clamped at 0, so the result is always a valid ``time.sleep`` argument.
    """
    # The global RNG is deliberately not reseeded here. Reseeding on every
    # call from the clock's microsecond field clobbered any seed set elsewhere.
    delta = interval + rng.uniform(low, high)
    if delta <= 0:
        # Fall back to the unjittered interval, but never go negative:
        # time.sleep raises ValueError for negative durations.
        return max(interval, 0)
    return delta

In [8]:
# Publish N_RECORDS synthetic events to the Kinesis stream, pausing a jittered
# ~BASE_INTERVAL_S seconds between puts so records arrive spread over time.
STREAM_NAME = 'spark-practice'
REGION = 'us-west-2'
N_RECORDS = 50
BASE_INTERVAL_S = 1

client = boto3.client('kinesis', region_name=REGION)

for i in range(N_RECORDS):
    # `line` is '<UTC timestamp>;<event>'. The separate 'timestamp' field below
    # is taken at payload-build time, so the two can differ slightly.
    line = get_line()
    payload = {
        'value': line,
        'timestamp': str(datetime.utcnow()),
        'id': str(uuid.uuid4())
    }

    # A fresh random partition key per record lets Kinesis hash records onto
    # different shards instead of pinning them all to one.
    r = client.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(payload),
        PartitionKey=str(uuid.uuid4())
    )
    print('Record stored in shard {}'.format(r['ShardId']))
    # No point waiting after the final record.
    if i < N_RECORDS - 1:
        time.sleep(randomize_interval(BASE_INTERVAL_S))

Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000001
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000002
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000000
Record stored in shard shardId-000000000000
Record stored in shard shardId-0