In [1]:
import os
import io
import sys
import json
import boto3
import argparse
import datetime as dt
import pandas as pd
import numpy as np

In [2]:
kinesis_stream_name = 'customer_churn_stream'
region = 'us-east-1'
datafile = 'churn.txt'

In [22]:
!aws kinesis create-stream --stream-name customer_churn_stream --shard-count 1 --region us-east-1

In [3]:
# Generates a list of records to be sent to kinesis
def generate_kinesis_record(datafile):
    df = pd.read_csv(datafile, header='infer')
    records = json.loads(df.to_json(orient='records'))
    return records

In [5]:
session = boto3.Session()
kinesis_client = session.client('kinesis', region)

In [17]:
for row in generate_kinesis_record(datafile):
    data = json.dumps(row)
    stream_record = [{'Data': bytes(data, 'utf-8'), 'PartitionKey': 'partition_key'}]
    # print(stream_record)
    kinesis_client.put_records(StreamName=kinesis_stream_name, Records=stream_record)

In [18]:
response = kinesis_client.describe_stream(StreamName=stream_name)
response

{'StreamDescription': {'StreamName': 'customer_churn_stream',
  'StreamARN': 'arn:aws:kinesis:us-east-1:684423739646:stream/customer_churn_stream',
  'StreamStatus': 'ACTIVE',
  'Shards': [{'ShardId': 'shardId-000000000000',
    'HashKeyRange': {'StartingHashKey': '0',
     'EndingHashKey': '340282366920938463463374607431768211455'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49621001703233331718298631089606496077201973825501134850'}}],
  'HasMoreShards': False,
  'RetentionPeriodHours': 24,
  'StreamCreationTimestamp': datetime.datetime(2021, 8, 11, 20, 25, 1, tzinfo=tzlocal()),
  'EnhancedMonitoring': [{'ShardLevelMetrics': []}],
  'EncryptionType': 'NONE'},
 'ResponseMetadata': {'RequestId': 'fd86d452-c8c4-f7a5-a408-ef40aa6e9500',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fd86d452-c8c4-f7a5-a408-ef40aa6e9500',
   'x-amz-id-2': 'b/GoV9+fwEIrp6Ynxbm+hZy/EPCG3NwdZz8FdVBK8cYkQWFY+XIm8Y6JvoTtfV5PQMoGUyJcrhiwLgf00aQFJe683rd86hDD',
   'date': 'Wed, 11 Aug 

In [19]:
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=stream_name,
                                                      ShardId=shard_id,
                                                      ShardIteratorType='TRIM_HORIZON')
shard_iterator

{'ShardIterator': 'AAAAAAAAAAGI71aLMAU9jdGaEMjNaHpPKLjNNKPxg9ieFviSk0DXDc0lZt5wTqIdhVeo3KxvicUd3Qms5tpcLVi4DjdoPwd9Gv5z6EoMyLSnNWAhrm3NJvMCFM74us1+CwFqyvVC93IJ8ENFIIhO8oeDXEuEglFtQLl3YdQvCRSB1jlWxD+KIx4wzKLIy/FZs1Ub4wQkPmADSdxYd/tvn9VO2hbuEDQayYaDjRVceZ1CLqpHlPXAHJ7ROV7IxA0eetkm1GQeCKQ=',
 'ResponseMetadata': {'RequestId': 'd892c731-3bcd-1cca-811c-fc2559677e6f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd892c731-3bcd-1cca-811c-fc2559677e6f',
   'x-amz-id-2': 'eH9FmPwMMB7K9DKuG2ZZ44CO5TtB7Xn514amB057B5UqgP4Ssqr3ZwLD/8MJHhwCwzu+BDr6HmBbZfS6EXCCrWi8i/ESlA76',
   'date': 'Wed, 11 Aug 2021 20:53:40 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '288'},
  'RetryAttempts': 0}}

In [20]:
my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=2)
record_response['Records']

[{'SequenceNumber': '49621001703233331718298631092725524691807791175838990338',
  'ApproximateArrivalTimestamp': datetime.datetime(2021, 8, 11, 20, 42, 57, 846000, tzinfo=tzlocal()),
  'Data': b'{"State": "KS", "Account Length": 128, "Area Code": 415, "Phone": "382-4657", "Int\'l Plan": "no", "VMail Plan": "yes", "VMail Message": 25, "Day Mins": 265.1, "Day Calls": 110, "Day Charge": 45.07, "Eve Mins": 197.4, "Eve Calls": 99, "Eve Charge": 16.78, "Night Mins": 244.7, "Night Calls": 91, "Night Charge": 11.01, "Intl Mins": 10.0, "Intl Calls": 3, "Intl Charge": 2.7, "CustServ Calls": 1, "Churn?": "False."}',
  'PartitionKey': 'partition_key'},
 {'SequenceNumber': '49621001703233331718298631092726733617627405805013696514',
  'ApproximateArrivalTimestamp': datetime.datetime(2021, 8, 11, 20, 42, 57, 857000, tzinfo=tzlocal()),
  'Data': b'{"State": "OH", "Account Length": 107, "Area Code": 415, "Phone": "371-7191", "Int\'l Plan": "no", "VMail Plan": "yes", "VMail Message": 26, "Day Mins": 161