Solution for ACloudGuru exercise 'Streaming Data Collection Lab': Using Kinesis streams to ingest data and dump it to S3, using Kinesis Streams so that we would be able to add some processing steps easily (as opposed to simply using firehose)

In [145]:
import binascii
import os
import io
import inspect
import zipfile
import requests
import boto3
import time
import random
import json
import time

from botocore.exceptions import ClientError

In [146]:
acct = boto3.client('sts').get_caller_identity().get('Account')
print("Running on account "+acct)

Running on account 549935274340


#### Define a method that generates random 'user data':

In [147]:
def generateUser():
    data = requests.get('https://randomuser.me/api').json()
    user = {}
    user['FIRST']=data['results'][0]['name']['first']
    user['LAST']=data['results'][0]['name']['last']
    user['AGE']=data['results'][0]['dob']['age']
    user['GENDER']=data['results'][0]['gender']
    user['LATITUDE']=data['results'][0]['location']['coordinates']['latitude']
    user['LONGITUDE']=data['results'][0]['location']['coordinates']['longitude']
    return user

generateUser()

{'FIRST': 'Meral',
 'LAST': 'Numanoğlu',
 'AGE': 66,
 'GENDER': 'female',
 'LATITUDE': '74.5832',
 'LONGITUDE': '34.7845'}

#### Create the S3 bucket in which we want the result to be output:

In [148]:
bucketPrefix="kinesis-example-" #Change the random part to make your bucket name unique

sess = boto3.session.Session()
s3client = boto3.client('s3')

bucketName = ""

for bucket in boto3.resource('s3').buckets.all():
    if bucket.name.startswith(bucketPrefix):
        bucketName = bucket.name

if bucketName == "":
    bucketName = bucketPrefix + binascii.hexlify(os.urandom(6)).decode()
    print("Creating "+bucketName)
    s3client.create_bucket(Bucket=bucketName,
                           CreateBucketConfiguration={'LocationConstraint': sess.region_name})

bucketArn = "arn:aws:s3:::"+bucketName
    
print("Using S3 bucket "+bucketArn)

Using S3 bucket arn:aws:s3:::kinesis-example-d7064c509fb6


#### Create a new Kinesis stream with a semi-random name:

In [149]:
client = boto3.client('kinesis')

streamName = 'example_' + binascii.hexlify(os.urandom(2)).decode()

client.create_stream(
    StreamName=streamName,
    ShardCount=1
)

print("Creating Stream "+streamName)

response = client.describe_stream(StreamName=streamName)
status = response['StreamDescription']['StreamStatus']

while status == 'CREATING':
    time.sleep(1)
    response = client.describe_stream(StreamName=streamName)
    status = response['StreamDescription']['StreamStatus']

streamArn = response['StreamDescription']['StreamARN']

print("Stream ARN is "+streamArn)

Creating Stream example_da23
Stream ARN is arn:aws:kinesis:eu-central-1:549935274340:stream/example_da23


#### Set up Kinesis Firehose

FIrst, define the lambda that will filter the entries:

In [150]:
lambda_header="""
import base64
import gzip
import io
import json
import zlib

"""

import base64
import gzip
import io
import json
import zlib

def handler(event, context):
  output = []
  for record in event['records']:
    compressed_payload = base64.b64decode(record['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    print('uncompressed_payload',uncompressed_payload)
    payload = json.loads(uncompressed_payload)
    output_record = {
        'recordId': record['recordId'],
        'result': 'Ok',
        'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
    }
    output.append(output_record)
  print('Successfully processed {} records.'.format(len(event['records'])))
  return {'records': output}

In [151]:
lambdaRole={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

lambdaRoleName="FirehoseLambdaRole"

iam = boto3.client('iam')

try:
    r = iam.create_role(RoleName=lambdaRoleName, 
                Path="/",
                Description="Lambda role for Firehose filter",
                AssumeRolePolicyDocument=json.dumps(lambdaRole))
    print("Role added.")
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print("Role already exists.")

lambdaPolicyArn = 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
r = iam.attach_role_policy(RoleName=lambdaRoleName, 
                       PolicyArn=lambdaPolicyArn)
print("Lambda policy attached.")

lambdaRoleArn = iam.get_role(RoleName=lambdaRoleName)['Role']['Arn']
print("Lambda Role's ARN is "+roleArn)

Role already exists.
Lambda policy attached.
Lambda Role's ARN is arn:aws:iam::549935274340:role/ExampleFirehoseRole


In [152]:
awslambda=boto3.client('lambda')
lambdaName = "FilterFirehoseMessages"

lambda_zip = io.BytesIO()
fileInfo = zipfile.ZipInfo("lambda.py")
fileInfo.external_attr = 0o777 << 16

with zipfile.ZipFile(lambda_zip, mode="w",compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr(fileInfo, lambda_header+inspect.getsource(handler))

print(str(len(lambda_zip.getvalue()))+" bytes in Lambda ZIP file")

try:
    response = awslambda.create_function(
        FunctionName=lambdaName,
        Runtime='python3.6',
        Role=lambdaRoleArn,
        Handler='lambda.handler',
        Code={'ZipFile':lambda_zip.getvalue()})
    print("Lambda uploaded.")
except ClientError as e:
    if e.response['Error']['Code'] != 'ResourceConflictException':
        raise
    print("Lambda already exists.")
    awslambda.update_function_code(
        FunctionName=lambdaName,
        ZipFile=lambda_zip.getvalue())
    response = awslambda.publish_version(
        FunctionName=lambdaName
    )
    
lambdaArn=response['FunctionArn']
print("Lambda's ARN is "+lambdaArn)

794 bytes in Lambda ZIP file
Lambda uploaded.
Lambda's ARN is arn:aws:lambda:eu-central-1:549935274340:function:FilterFirehoseMessages


Configure Logging:

In [153]:
logGroup="kinesisExample"
logStream="kinesisFirehose"
logArn="arn:aws:logs:region:account-id:log-group:"+logGroup+":log-stream:"+logStream

Define a role for the delivery stream:

In [154]:
roleName="ExampleFirehoseRole"
role={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

policyName="ExampleFirehosePolicy"
policy={
    "Version": "2012-10-17",  
    "Statement":
    [    
        {      
            "Effect": "Allow",      
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],      
            "Resource": [        
                bucketArn,
                bucketArn+"/*"
            ]    
        },        
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": streamArn
        },
        {
           "Effect": "Allow",
           "Action": [
               "logs:PutLogEvents"
           ],
           "Resource": [
               logArn
           ]
        },
        {
           "Effect": "Allow", 
           "Action": [
               "lambda:InvokeFunction", 
               "lambda:GetFunctionConfiguration" 
           ],
           "Resource": [
               lambdaArn
           ]
        }
    ]
}

iam = boto3.client('iam')

try:
    r = iam.create_role(RoleName=roleName, 
                Path="/",
                Description="Example role for Kinesis Firehose",
                AssumeRolePolicyDocument=json.dumps(role))
    print("Role added.")
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print("Role already exists.")

policyArn="arn:aws:iam::"+str(acct)+":policy/"+policyName

try:
    r = iam.create_policy(PolicyName=policyName, 
                Description="Example policy for Kinesis Firehose",
                PolicyDocument=json.dumps(policy))
    print("Policy added.")
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print("Policy already exists.")
    r = iam.create_policy_version(
                PolicyArn=policyArn,
                PolicyDocument=json.dumps(policy),
                SetAsDefault=True)
    for version in iam.list_policy_versions(PolicyArn=policyArn)['Versions']:
        if not version['IsDefaultVersion']:
            iam.delete_policy_version(
                PolicyArn=policyArn,
                VersionId=version['VersionId']
            )
    print("Policy updated.")

iam.attach_role_policy(
    PolicyArn=policyArn,
    RoleName=roleName
)

roleArn = iam.get_role(RoleName=roleName)['Role']['Arn']
print("Role's ARN is "+roleArn)

Role added.
Policy added.
Role's ARN is arn:aws:iam::549935274340:role/ExampleFirehoseRole


Create the delivery stream:

In [156]:
deliveryStreamName = 'delivery_stream_' + binascii.hexlify(os.urandom(2)).decode()

kfhClient = boto3.client('firehose')

response = kfhClient.create_delivery_stream(
    DeliveryStreamName=deliveryStreamName,
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': streamArn,
        'RoleARN': roleArn
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': roleArn,
        'BucketARN': bucketArn,
        'Prefix': 'Users/',
        'ErrorOutputPrefix': 'error',
        'CompressionFormat': 'UNCOMPRESSED',
        'EncryptionConfiguration': {
            'NoEncryptionConfig': 'NoEncryption'
        },
        'CloudWatchLoggingOptions': {
            'Enabled': True,
            'LogGroupName': logGroup,
            'LogStreamName': logStream
        },
        'ProcessingConfiguration': {
            'Enabled': False,
            'Processors': [
                {
                    'Type': 'Lambda',
                    'Parameters': [
                        {
                            'ParameterName': 'LambdaArn',
                            'ParameterValue': lambdaArn
                        },
                    ]
                },
            ]
        },
        'S3BackupMode': 'Disabled'
    }
        
)

print("Creating delivery stream "+deliveryStreamName)

response = kfhClient.describe_delivery_stream(DeliveryStreamName=deliveryStreamName)
status = response['DeliveryStreamDescription']['DeliveryStreamStatus']

while status == 'CREATING':
    time.sleep(1)
    response = kfhClient.describe_stream_consumer(StreamARN=streamArn, ConsumerName=consumerName)
    status = response['DeliveryStreamDescription']['DeliveryStreamStatus']

deliveryStreamArn = response['DeliveryStreamDescription']['DeliveryStreamARN']

print("Firehose Delivery Stream ARN is "+deliveryStreamArn)

Creating delivery stream delivery_stream_0dd4
Firehose Delivery Stream ARN is arn:aws:firehose:eu-central-1:549935274340:deliverystream/delivery_stream_0dd4


#### Fill it with a hundred random messages:

In [157]:
shards = set()

for i in range(0,101):
    user = generateUser()
    shard = client.put_record(
        StreamName=streamName,
        Data=json.dumps(user),
        PartitionKey=user['LAST']
    )
    shards.add(shard['ShardId'])

print("Generated "+str(i)+" messages in shard "+str(shards))

Generated 100 messages in shard {'shardId-000000000000'}


In [None]:
#### Clean up any object we have created:

In [144]:
try:
    awslambda.delete_function(FunctionName=lambdaName)
except ClientError as e: 
    if e.response['Error']['Code'] != 'ResourceNotFoundException':
        raise
        
try:
    kfhClient.delete_delivery_stream(
        DeliveryStreamName=streamName)
except ClientError as e:
    if e.response['Error']['Code'] != 'ResourceNotFoundException':
        raise

try:
    client.delete_stream(
        StreamName=streamName,
        EnforceConsumerDeletion=True)
except ClientError as e:
    if e.response['Error']['Code'] != 'ResourceNotFoundException':
        raise
        
try:
    iam.detach_role_policy(RoleName=roleName, PolicyArn=policyArn)
    iam.delete_role(RoleName=roleName)
    for version in iam.list_policy_versions(PolicyArn=policyArn)['Versions']:
        if not version['IsDefaultVersion']:
            iam.delete_policy_version(
                PolicyArn=policyArn,
                VersionId=version['VersionId']
            )
    iam.delete_policy(PolicyArn=policyArn)
except ClientError as e:
    if e.response['Error']['Code'] != 'NoSuchEntity':
        raise       
        
try:
    iam.detach_role_policy(RoleName=lambdaRoleName, PolicyArn=lambdaPolicyArn)
    iam.delete_role(RoleName=lambdaRoleName)
except ClientError as e:
    if e.response['Error']['Code'] != 'NoSuchEntity':
        raise

NameError: name 'lambdaPolicyArn' is not defined