# Kinesis Data Analytics for SQL Applications

With Amazon Kinesis Data Analytics for SQL Applications, you can process and analyze streaming data using standard SQL. The service enables you to quickly author and run powerful SQL code against streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics.

The service ingests data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. You then author your SQL code in the interactive editor, test it against live streaming data, and configure the destinations where Kinesis Data Analytics sends the results.

<img src="img/use_case_1_analytics.png" width="80%" align="left">

# Create AWS Lambda Function as Kinesis Data Analytics Destination

Kinesis Data Analytics supports Amazon Kinesis Data Firehose, AWS Lambda, and Amazon Kinesis Data Streams as destinations. Let's create a Lambda function to publish our SQL application results as custom metrics to CloudWatch Metrics.

In [1]:
import boto3
import sagemaker
import pandas as pd
import json

sess   = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

iam = boto3.Session().client(service_name='iam', region_name=region)
sts = boto3.Session().client(service_name='sts', region_name=region)
account_id = sts.get_caller_identity()['Account']

lam = boto3.Session().client(service_name='lambda', region_name=region)

## Retrieve AWS Lambda Function Name

In [2]:
%store -r lambda_fn_name

In [3]:
try:
    lambda_fn_name
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [4]:
print(lambda_fn_name)

DeliverKinesisAnalyticsToCloudWatch


## Check IAM Roles Are In Place

In [5]:
%store -r iam_lambda_role_name

In [6]:
try:
    iam_lambda_role_name
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [7]:
print(iam_lambda_role_name)

DSOAWS_Lambda


In [8]:
%store -r iam_lambda_role_passed

In [9]:
try:
    iam_lambda_role_passed
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [10]:
print(iam_lambda_role_passed)

True


In [11]:
if not iam_lambda_role_passed:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')
else:
    print('[OK]')

[OK]


In [12]:
%store -r iam_role_lambda_arn

In [13]:
try:
    iam_role_lambda_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [14]:
print(iam_role_lambda_arn)

arn:aws:iam::146478650644:role/DSOAWS_Lambda
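
The checks above repeat the same `try`/`except NameError` pattern for each restored variable, and they only print a message, so later cells still run. As a minimal sketch, assuming we would rather stop early, the hypothetical helper `require_stored` (not part of this notebook) checks several names at once and raises if any are missing:

```python
def require_stored(namespace, *names):
    """Raise if any expected variable is missing from the given namespace."""
    missing = [name for name in names if name not in namespace]
    if missing:
        raise RuntimeError(
            "Missing stored variables: {}. Please run all previous notebooks "
            "in this section before you continue.".format(", ".join(missing))
        )


# In the notebook this would be called with globals() after the %store -r cells.
example_namespace = {"lambda_fn_name": "DeliverKinesisAnalyticsToCloudWatch"}
require_stored(example_namespace, "lambda_fn_name")  # passes silently
```

Raising instead of printing is a design choice: it halts "Run All" at the first missing prerequisite.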


# Review Lambda Function Code 

In [15]:
!pygmentize src/lambda_function.py

[34mfrom[39;49;00m [04m[36m__future__[39;49;00m [34mimport[39;49;00m print_function
[34mimport[39;49;00m [04m[36mboto3[39;49;00m
[34mimport[39;49;00m [04m[36mbase64[39;49;00m

[34mimport[39;49;00m [04m[36msys[39;49;00m
[34mimport[39;49;00m [04m[36mlogging[39;49;00m
[34mimport[39;49;00m [04m[36mtraceback[39;49;00m
[34mimport[39;49;00m [04m[36mjson[39;49;00m

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client([33m'[39;49;00m[33mcloudwatch[39;49;00m[33m'[39;49;00m)

[34mdef[39;49;00m [32mlambda_handler[39;49;00m(event, context):
    output = []
    success = [34m0[39;49;00m
    failure = [34m0[39;49;00m
    [34mfor[39;49;00m record [35min[39;49;00m event[[33m'[39;49;00m[33mrecords[39;49;00m[33m'[39;49;00m]:
        [34mtry[39;49;00m:
            [37m#logger.info(f'event: {event}')[39;49;00m
            payload = base64.b64decode(record[[33m'[39;49;00m[33mdata[39;49;00m

# Zip The Function Code

In [16]:
!zip src/DeliverKinesisAnalyticsToCloudWatch.zip src/lambda_function.py

  adding: src/lambda_function.py (deflated 63%)


# Load the .zip File as Binary Code

In [17]:
with open('src/DeliverKinesisAnalyticsToCloudWatch.zip', 'rb') as f: 
    code = f.read()
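
If the `zip` command is not available, the same bytes can be built in memory with Python's standard `zipfile` module. This is a sketch under assumptions: the helper name `zip_bytes` and the placeholder handler source are hypothetical. The archive name keeps the `src/` prefix because that is how the `zip` command above stored the file:

```python
import io
import zipfile


def zip_bytes(files):
    """Return a deflate-compressed .zip archive (as bytes) built from {archive_name: text}."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for archive_name, content in files.items():
            info = zipfile.ZipInfo(archive_name)
            info.compress_type = zipfile.ZIP_DEFLATED
            # writestr() defaults to owner-only permissions; make the file
            # world-readable so the Lambda runtime can import it.
            info.external_attr = 0o644 << 16
            archive.writestr(info, content)
    return buffer.getvalue()


# Placeholder source standing in for src/lambda_function.py.
source = "def lambda_handler(event, context):\n    return {'records': []}\n"
code = zip_bytes({"src/lambda_function.py": source})
```

The resulting `code` value can be passed as `ZipFile` in the same way as the bytes read from disk.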

# Create The Lambda Function

In [18]:
from botocore.exceptions import ClientError

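try:
    response = lam.create_function(
        FunctionName=lambda_fn_name,
        # NOTE: AWS Lambda has since deprecated the python2.7 runtime; use a
        # currently supported Python 3 runtime if this call is rejected.
        Runtime='python2.7',
        Role=iam_role_lambda_arn,
        # The zip stores the file as src/lambda_function.py, so the handler path includes src/.
        Handler='src/lambda_function.lambda_handler',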
        Code={
            'ZipFile': code
        },
        Description='Deliver output records from Kinesis Analytics application to CloudWatch.',
        Timeout=60,
        MemorySize=128,
        Publish=True
    )
    print('Lambda Function {} successfully created.'.format(lambda_fn_name))

except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceConflictException':
        print('Lambda Function {} already exists. This is OK.'.format(lambda_fn_name))
    else:
        print('Error: {}'.format(e))

Lambda Function DeliverKinesisAnalyticsToCloudWatch successfully created.
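
The cell above treats an existing function as success, which leaves any previously uploaded code in place. As a hedged sketch, the hypothetical helper `create_or_update` falls back to the Lambda client's `update_function_code` call when the function already exists. To stay free of imports it catches a generic `Exception` and inspects its `response` attribute; in the notebook we would catch `ClientError` instead:

```python
def create_or_update(client, name, zip_bytes, **create_kwargs):
    """Create the function, or refresh its code if it already exists."""
    try:
        client.create_function(
            FunctionName=name, Code={"ZipFile": zip_bytes}, **create_kwargs
        )
        return "created"
    except Exception as e:
        # botocore's ClientError carries the service error code in e.response.
        error_code = getattr(e, "response", {}).get("Error", {}).get("Code")
        if error_code != "ResourceConflictException":
            raise
        client.update_function_code(FunctionName=name, ZipFile=zip_bytes)
        return "updated"


# Possible use in this notebook (not executed here):
# create_or_update(lam, lambda_fn_name, code, Runtime='python2.7',
#                  Role=iam_role_lambda_arn,
#                  Handler='src/lambda_function.lambda_handler')
```

Note that this only refreshes the code; settings such as `Timeout` or `MemorySize` on an existing function are left unchanged.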


In [19]:
response = lam.get_function(FunctionName=lambda_fn_name)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

{
    "Code": {
        "Location": "https://prod-04-2014-tasks.s3.us-east-1.amazonaws.com/snapshots/146478650644/DeliverKinesisAnalyticsToCloudWatch-f6b92929-29a0-4ec4-8258-bce0f3dc11e8?versionId=...

In [20]:
response = lam.get_function(FunctionName=lambda_fn_name)

lambda_fn_arn = response['Configuration']['FunctionArn']
print(lambda_fn_arn)

arn:aws:lambda:us-east-1:146478650644:function:DeliverKinesisAnalyticsToCloudWatch
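
The ARN printed above follows the general `arn:partition:service:region:account-id:resource` layout. As a small illustration, the hypothetical helper `parse_arn` (not an AWS API) splits an ARN into those fields, which can be handy for checking that the function lives in the expected region and account:

```python
def parse_arn(arn):
    """Split an ARN of the form arn:partition:service:region:account-id:resource."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError("Not a valid ARN: {}".format(arn))
    _, partition, service, region, account_id, resource = parts
    return {
        "partition": partition,
        "service": service,
        "region": region,
        "account_id": account_id,
        "resource": resource,
    }


parsed = parse_arn(
    "arn:aws:lambda:us-east-1:146478650644:function:DeliverKinesisAnalyticsToCloudWatch"
)
```

Splitting at most five times keeps the colon inside the resource part (`function:DeliverKinesisAnalyticsToCloudWatch`) intact.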


In [21]:
%store lambda_fn_arn

Stored 'lambda_fn_arn' (str)


# Review Lambda Function

In [22]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/lambda/home?region={}#/functions/{}">Lambda Function</a></b>'.format(region, lambda_fn_name)))


# Store Variables for Next Notebooks

In [23]:
%store

Stored variables and their in-db values:
auto_ml_job_name                                      -> 'automl-dm-26-16-09-46'
autopilot_endpoint_name                               -> 'automl-dm-ep-26-16-37-28'
autopilot_train_s3_uri                                -> 's3://sagemaker-us-east-1-146478650644/data/amazon
balance_dataset                                       -> True
experiment_name                                       -> 'Amazon-Customer-Reviews-BERT-Experiment-160114592
firehose_arn                                          -> 'arn:aws:firehose:us-east-1:146478650644:deliverys
firehose_name                                         -> 'dsoaws-kinesis-data-firehose'
iam_kinesis_role_name                                 -> 'DSOAWS_Kinesis'
iam_kinesis_role_passed                               -> True
iam_lambda_role_name                                  -> 'DSOAWS_Lambda'
iam_lambda_role_passed                                -> True
iam_role_kinesis_arn                             

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();