# Continuous Analytics and Machine Learning over Streaming Data

Streaming technologies provide you with the tools to collect, process, and analyze data streams in real time. AWS offers a wide range of streaming technology options, including Amazon Managed Streaming for Apache Kafka (Amazon MSK) and the [Amazon Kinesis](https://aws.amazon.com/kinesis/) family of services.

With Kinesis Data Firehose, you can prepare and load the data continuously to a destination of your choice. With Kinesis Data Analytics, you can process and analyze the data as it arrives. And with Kinesis Data Streams, you can manage the ingest of data streams for custom applications.

In this section, we move from our customer reviews training dataset into a real-world scenario. Customer feedback about products appears in all of a company's social media channels, on partner websites, in customer support messages, and so on. We need to capture this valuable customer sentiment about our products as quickly as possible to spot trends and react fast.

We will focus on analyzing a continuous stream of product review messages that we collect from all available online channels. 

![](data/readme_pics/streaming-architecture.png)

As a first step, we analyze the sentiment of the customer, so we can identify which customers might need high-priority attention.

Next, we run continuous streaming analytics over the incoming review messages to capture the average sentiment per product category. We visualize the continuous average sentiment in a metrics dashboard for the line of business owners, who can then detect sentiment trends quickly and take action.

We also calculate an anomaly score of the incoming messages to detect anomalies in the data schema or data values. In case of a rising anomaly score, we can alert the application developers in charge to investigate the root cause. 

As a last metric, we also calculate a continuous approximate count of the received messages. The digital marketing team could use this number of online messages to measure the effectiveness of social media campaigns.
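
To make the windowed metrics above more concrete, here is a minimal pure-Python sketch of a tumbling-window average and message count. It is only an illustration: the `tumbling_window_stats` name, the `(timestamp, star_rating)` tuple layout, and the sample events are assumptions, and the count is exact rather than approximate. The pipeline in this section computes these metrics with Kinesis Data Analytics SQL instead.

```python
from collections import defaultdict


def tumbling_window_stats(events, window_seconds=5):
    """Group (timestamp, star_rating) events into fixed, non-overlapping windows.

    Returns {window_start: (average_rating, message_count)}.
    """
    buckets = defaultdict(list)
    for timestamp, star_rating in events:
        # Align each event to the start of the window it falls into.
        window_start = int(timestamp // window_seconds) * window_seconds
        buckets[window_start].append(star_rating)
    return {
        start: (sum(ratings) / len(ratings), len(ratings))
        for start, ratings in sorted(buckets.items())
    }


# Hypothetical events: (seconds since start, predicted star rating)
events = [(0, 5), (1, 3), (4, 4), (5, 1), (9, 2), (12, 5)]
stats = tumbling_window_stats(events)
for window_start, (average, count) in stats.items():
    print(window_start, average, count)
```

Each event belongs to exactly one window, which is the defining property of a tumbling window.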

## _Kinesis Data Firehose vs. Kinesis Data Streams_

### Kinesis Data Firehose
* Amazon Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. 
* It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you're already using today. 
* It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security.

### Kinesis Data Streams
* Amazon Kinesis Data Streams enables you to build custom applications that process or analyze streaming data for specialized needs. 
* You can continuously add various types of data such as clickstreams, application logs, and social media to an Amazon Kinesis data stream from hundreds of thousands of sources. 
* Within seconds, the data will be available for your Amazon Kinesis Applications to read and process from the stream.

# Set Up IAM for Kinesis
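
An IAM role needs a trust policy that names the services allowed to assume it. The trust policies in this section all follow the same pattern, so a small helper can generate them. This is a minimal sketch; `build_trust_policy` is a hypothetical helper name and is not part of boto3.

```python
import json


def build_trust_policy(service_principals):
    """Return an IAM trust policy that lets each listed service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": principal},
                "Action": "sts:AssumeRole",
            }
            for principal in service_principals
        ],
    }


trust_policy = build_trust_policy(
    ["kinesis.amazonaws.com", "firehose.amazonaws.com", "kinesisanalytics.amazonaws.com"]
)
print(json.dumps(trust_policy, indent=4))
```

The resulting dictionary could be serialized with `json.dumps` and passed as `AssumeRolePolicyDocument` when creating a role.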

In [None]:
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name 

sts = boto3.Session().client(service_name="sts", region_name=region)
iam = boto3.Session().client(service_name="iam", region_name=region)

# Create Kinesis Role
iam_kinesis_role_name = "DSOAWS_Kinesis"
iam_kinesis_role_passed = False
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": {"Service": "kinesis.amazonaws.com"}, "Action": "sts:AssumeRole"},
        {"Effect": "Allow", "Principal": {"Service": "firehose.amazonaws.com"}, "Action": "sts:AssumeRole"},
        {"Effect": "Allow", "Principal": {"Service": "kinesisanalytics.amazonaws.com"}, "Action": "sts:AssumeRole"},
    ],
}
import json
import time

from botocore.exceptions import ClientError

try:
    iam_role_kinesis = iam.create_role(
        RoleName=iam_kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="DSOAWS Kinesis Role",
    )
    print("Role successfully created.")
    iam_kinesis_role_passed = True
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)
        print("Role already exists. That is OK.")
        iam_kinesis_role_passed = True
    else:
        print("Unexpected error: %s" % e)

time.sleep(30)

iam_role_kinesis_name = iam_role_kinesis["Role"]["RoleName"]
print("Role Name: {}".format(iam_role_kinesis_name))
iam_role_kinesis_arn = iam_role_kinesis["Role"]["Arn"]
print("Role ARN: {}".format(iam_role_kinesis_arn))
account_id = sts.get_caller_identity()["Account"]

# Stream Name
stream_name = "dsoaws-kinesis-data-stream"
# Specify Firehose Name
firehose_name = "dsoaws-kinesis-data-firehose"
# Specify Lambda Function Name
lambda_fn_name_cloudwatch = "DeliverKinesisAnalyticsToCloudWatch"
lambda_fn_name_invoke_sm_endpoint = "InvokeSageMakerEndpointFromKinesis"
lambda_fn_name_sns = "PushNotificationToSNS"


In [None]:
# Create and Update Policy

kinesis_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
            ],
            "Resource": ["arn:aws:s3:::{}".format(bucket), "arn:aws:s3:::{}/*".format(bucket)],
        },
        {
            "Effect": "Allow",
            "Action": ["logs:PutLogEvents"],
            "Resource": ["arn:aws:logs:{}:{}:log-group:/*".format(region, account_id)],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:*",
            ],
            "Resource": ["arn:aws:kinesis:{}:{}:stream/{}".format(region, account_id, stream_name)],
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:*",
            ],
            "Resource": ["arn:aws:firehose:{}:{}:deliverystream/{}".format(region, account_id, firehose_name)],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesisanalytics:*",
            ],
            "Resource": ["*"],
        },
        {
            "Sid": "UseLambdaFunction",
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
            "Resource": ["*"],
        },
        {"Effect": "Allow", "Action": "iam:PassRole", "Resource": ["arn:aws:iam::*:role/service-role/kinesis*"]},
    ],
}

print(json.dumps(kinesis_policy_doc, indent=4, sort_keys=True, default=str))
response = iam.put_role_policy(
    RoleName=iam_role_kinesis_name, PolicyName="DSOAWS_KinesisPolicy", PolicyDocument=json.dumps(kinesis_policy_doc)
)

time.sleep(30)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

In [None]:
# Create AWS Lambda IAM Role
from botocore.exceptions import ClientError

iam_lambda_role_name = "DSOAWS_Lambda"
iam_lambda_role_passed = False
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"},
        {"Effect": "Allow", "Principal": {"Service": "kinesisanalytics.amazonaws.com"}, "Action": "sts:AssumeRole"},
    ],
}


try:
    iam_role_lambda = iam.create_role(
        RoleName=iam_lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="DSOAWS Lambda Role",
    )
    print("Role successfully created.")
    iam_lambda_role_passed = True
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_lambda = iam.get_role(RoleName=iam_lambda_role_name)
        print("Role already exists. This is OK.")
        iam_lambda_role_passed = True
    else:
        print("Unexpected error: %s" % e)

time.sleep(30)

iam_role_lambda_name = iam_role_lambda["Role"]["RoleName"]
print("Role Name: {}".format(iam_role_lambda_name))
iam_role_lambda_arn = iam_role_lambda["Role"]["Arn"]
print("Role ARN: {}".format(iam_role_lambda_arn))

In [None]:
# Create AWS Lambda IAM Policy

lambda_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "UseLambdaFunction",
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
            "Resource": "arn:aws:lambda:{}:{}:function:*".format(region, account_id),
        },
        {"Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*"},
        {"Effect": "Allow", "Action": "sns:*", "Resource": "*"},
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:{}:{}:*".format(region, account_id),
        },
        {"Effect": "Allow", "Action": "sagemaker:InvokeEndpoint", "Resource": "*"},
        {
            "Effect": "Allow",
            "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "arn:aws:logs:{}:{}:log-group:/aws/lambda/*".format(region, account_id),
        },
    ],
}

print(json.dumps(lambda_policy_doc, indent=4, sort_keys=True, default=str))
response = iam.put_role_policy(
    RoleName=iam_role_lambda_name, PolicyName="DSOAWS_LambdaPolicy", PolicyDocument=json.dumps(lambda_policy_doc)
)

time.sleep(30)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

# Create Lambda To Invoke SageMaker
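
The contents of `src/invoke_sm_endpoint_from_kinesis.py` are not reproduced in this section. As a rough sketch of the general shape of a Firehose data-transformation Lambda: Firehose passes a batch of base64-encoded records, and the function returns each record with the same `recordId`, a `result` status, and base64-encoded `data`. The `predict_star_rating` stub and the column order are assumptions for illustration only. The real function would call the SageMaker endpoint instead.

```python
import base64


def predict_star_rating(review_body):
    """Hypothetical stand-in for the SageMaker endpoint call."""
    return 5 if "love" in review_body.lower() else 3


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Firehose delivers each payload base64-encoded.
        payload = base64.b64decode(record["data"]).decode("utf-8").rstrip("\n")
        review_id, product_category, review_body = payload.split("\t", 2)
        star_rating = predict_star_rating(review_body)
        # Insert the prediction as the second column of the TSV row.
        transformed = "\t".join([review_id, str(star_rating), product_category, review_body]) + "\n"
        output.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
            }
        )
    return {"records": output}


# Local smoke test with a hand-built event (no AWS calls involved)
sample = "R2EI7QLPK4LF7U\tDigital_Software\tI love this product!\n"
event = {"records": [{"recordId": "1", "data": base64.b64encode(sample.encode("utf-8")).decode("utf-8")}]}
result = lambda_handler(event, None)
decoded = base64.b64decode(result["records"][0]["data"]).decode("utf-8")
print(decoded)
```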

In [None]:
# Review Lambda Function

lambda_fn_name_invoke_ep = "InvokeSageMakerEndpointFromKinesis"
!pygmentize src/invoke_sm_endpoint_from_kinesis.py

# Test the PyTorch Endpoint Similar to How the Lambda Invokes the Endpoint
inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

predictor = Predictor(
    endpoint_name=pytorch_endpoint_name,
    serializer=JSONLinesSerializer(),
    deserializer=JSONLinesDeserializer(),
    sagemaker_session=sess
)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class: {}".format(predicted_class))

In [None]:
# Create a .zip file for the Python dependencies (Lambda Layer)
# This requires us to create a directory called `python` for Python environments.

!rm -rf layer/python
!mkdir -p layer/python
!pip install -q --target layer/python sagemaker==2.38.0
!cd layer && zip -q --recurse-paths layer.zip .

with open("layer/layer.zip", "rb") as f:
    layer = f.read()

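from botocore.exceptions import ClientError

# Lambda layers are published through the "lambda" client, not the IAM client
lam = boto3.Session().client(service_name="lambda", region_name=region)

sagemaker_lambda_layer_name = 'sagemaker-python-sdk-layer'
layer_response = lam.publish_layer_version(
    LayerName=sagemaker_lambda_layer_name,
    Content={"ZipFile": layer},
    Description="Layer with 'pip install sagemaker'",
    CompatibleRuntimes=['python3.9']
)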

layer_version_arn = layer_response['LayerVersionArn']
print("Lambda layer {} successfully created with LayerVersionArn {}.".format(sagemaker_lambda_layer_name, layer_version_arn))

In [None]:
# Create a .zip file for Python code
!zip src/InvokeSageMakerEndpointFromKinesis.zip src/invoke_sm_endpoint_from_kinesis.py
with open("src/InvokeSageMakerEndpointFromKinesis.zip", "rb") as f:
    code = f.read()

# Create The Lambda Function
# Lambda functions are managed through the "lambda" client, not the IAM client
lam = boto3.Session().client(service_name="lambda", region_name=region)

try:
    response = lam.create_function(
        FunctionName="{}".format(lambda_fn_name_invoke_ep),
        Runtime="python3.9",
        Role="{}".format(iam_role_lambda_arn),
        Handler="src/invoke_sm_endpoint_from_kinesis.lambda_handler",
        Code={"ZipFile": code},
        Layers=[
            layer_version_arn
        ],
        Description="Query SageMaker Endpoint for star rating prediction on review input text.",
        # max timeout supported by Firehose is 5min
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print("Lambda Function {} successfully created.".format(lambda_fn_name_invoke_ep))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName="{}".format(lambda_fn_name_invoke_ep), ZipFile=code, Publish=True, DryRun=False
        )
        print("Updating existing Lambda Function {}.  This is OK.".format(lambda_fn_name_invoke_ep))
    else:
        print("Error: {}".format(e))

response = lam.get_function(FunctionName=lambda_fn_name_invoke_ep)

lambda_fn_arn_invoke_ep = response["Configuration"]["FunctionArn"]
print(lambda_fn_arn_invoke_ep)

# Pass the endpoint name to the function as an environment variable
response = lam.update_function_configuration(
    FunctionName=lambda_fn_name_invoke_ep, Environment={"Variables": {"ENDPOINT_NAME": pytorch_endpoint_name}}
)

# Create a Kinesis Data Firehose Delivery Stream

<img src="../readme_pics/realtime-streaming.png" width="100%" align="left">
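
A newly created delivery stream is not usable until its status is `ACTIVE`, so creation is usually followed by a polling loop. A loop without a timeout can hang indefinitely if creation fails. The sketch below shows one way to bound the wait. `wait_for_status` and its parameters are hypothetical names, and the status sequence is simulated so the example runs without AWS access.

```python
import time


def wait_for_status(get_status, target="ACTIVE", timeout_seconds=300, poll_seconds=5, sleep=time.sleep):
    """Poll get_status() until it returns target or the timeout elapses."""
    waited = 0
    while True:
        status = get_status()
        if status == target:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError("Still {} after {} seconds".format(status, waited))
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated status sequence instead of a real describe call
statuses = iter(["CREATING", "CREATING", "ACTIVE"])
final_status = wait_for_status(lambda: next(statuses), sleep=lambda seconds: None)
print(final_status)
```

In this notebook, `get_status` could wrap `firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)` and return the `DeliveryStreamStatus` field of its `DeliveryStreamDescription`.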

In [None]:
firehose = boto3.Session().client(service_name="firehose", region_name=region)
from botocore.exceptions import ClientError

try:
    response = firehose.create_delivery_stream(
        DeliveryStreamName=firehose_name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration={
            "RoleARN": iam_role_kinesis_arn,
            "BucketARN": "arn:aws:s3:::{}".format(bucket),
            "Prefix": "kinesis-data-firehose/",
            "ErrorOutputPrefix": "kinesis-data-firehose-error/",
            "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60}, 
            "CompressionFormat": "UNCOMPRESSED",
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [
                    {
                        "Type": "Lambda",
                        "Parameters": [
                            {
                                "ParameterName": "LambdaArn",
                                "ParameterValue": "{}:$LATEST".format(lambda_fn_arn_invoke_ep),
                            },
                            {"ParameterName": "BufferSizeInMBs", "ParameterValue": "1"},
                            {"ParameterName": "BufferIntervalInSeconds", "ParameterValue": "60"},
                        ],
                    }
                ],
            },
            "S3BackupMode": "Enabled",
            "S3BackupConfiguration": {
                "RoleARN": iam_role_kinesis_arn,
                "BucketARN": "arn:aws:s3:::{}".format(bucket),
                "Prefix": "kinesis-data-firehose-source-record/",
                "ErrorOutputPrefix": "!{firehose:error-output-type}/",
                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
                "CompressionFormat": "UNCOMPRESSED",
            },
            "CloudWatchLoggingOptions": {
                "Enabled": False,
            },
        },
    )
    print("Delivery stream {} successfully created.".format(firehose_name))
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print("Delivery stream {} already exists.".format(firehose_name))
    else:
        print("Unexpected error: %s" % e)

In [None]:
status = ""
while status != "ACTIVE":
    r = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)
    description = r.get("DeliveryStreamDescription")
    status = description.get("DeliveryStreamStatus")
    time.sleep(5)

print("Delivery Stream {} is active".format(firehose_name))
firehose_arn = r["DeliveryStreamDescription"]["DeliveryStreamARN"]
print(firehose_arn)

# Create a Kinesis Data Stream
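
A data stream is made up of shards. Kinesis Data Streams routes each record by taking the MD5 hash of its partition key as a 128-bit integer and selecting the shard whose hash key range contains that value. The sketch below illustrates the idea. The helper names are hypothetical, and the even split of the hash key space is an assumption. The actual ranges of a stream are reported by `describe_stream`.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1


def even_hash_ranges(shard_count):
    """Split the 128-bit hash key space into contiguous, near-equal ranges (an assumption)."""
    size = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for index in range(shard_count):
        start = index * size
        end = MAX_HASH_KEY if index == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_for_partition_key(partition_key, ranges):
    """Return the index of the range containing the MD5 hash of the partition key."""
    hash_key = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for index, (start, end) in enumerate(ranges):
        if start <= hash_key <= end:
            return index
    raise ValueError("hash key outside all ranges")


ranges = even_hash_ranges(2)
for key in ["R2EI7QLPK4LF7U", "R1W5OMFK1Q3I3O", "RPZWSYWRP92GI"]:
    print(key, "-> shard index", shard_for_partition_key(key, ranges))
```

Because the mapping depends only on the partition key, records with the same key always land on the same shard, which preserves their relative order.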

In [None]:
kinesis = boto3.Session().client(service_name="kinesis", region_name=region)
shard_count = 2
try:
    response = kinesis.create_stream(StreamName=stream_name, ShardCount=shard_count)
    print("Data Stream {} successfully created.".format(stream_name))
    print(json.dumps(response, indent=4, sort_keys=True, default=str))

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print("Data Stream {} already exists.".format(stream_name))
    else:
        print("Unexpected error: %s" % e)

status = ""
while status != "ACTIVE":
    r = kinesis.describe_stream(StreamName=stream_name)
    description = r.get("StreamDescription")
    status = description.get("StreamStatus")
    time.sleep(5)

print("Stream {} is active".format(stream_name))
stream_response = kinesis.describe_stream(StreamName=stream_name)
print(json.dumps(stream_response, indent=4, sort_keys=True, default=str))
stream_arn = stream_response["StreamDescription"]["StreamARN"]
print(stream_arn)

# Create Lambda Destination CloudWatch

In [None]:
account_id = sts.get_caller_identity()["Account"]
lam = boto3.Session().client(service_name="lambda", region_name=region)

!pygmentize src/deliver_metrics_to_cloudwatch.py
!zip src/DeliverKinesisAnalyticsToCloudWatch.zip src/deliver_metrics_to_cloudwatch.py
with open("src/DeliverKinesisAnalyticsToCloudWatch.zip", "rb") as f:
    code = f.read()

# Create The Lambda Function
try:
    response = lam.create_function(
        FunctionName="{}".format(lambda_fn_name_cloudwatch),
        Runtime="python3.9",
        Role="{}".format(iam_role_lambda_arn),
        Handler="src/deliver_metrics_to_cloudwatch.lambda_handler",
        Code={"ZipFile": code},
        Description="Deliver output records from Kinesis Analytics application to CloudWatch.",
        Timeout=900,
        MemorySize=128,
        Publish=True,
    )
    print("Lambda Function {} successfully created.".format(lambda_fn_name_cloudwatch))

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName="{}".format(lambda_fn_name_cloudwatch), ZipFile=code, Publish=True, DryRun=False
        )
        print("Updating existing Lambda Function {}.  This is OK.".format(lambda_fn_name_cloudwatch))
    else:
        print("Error: {}".format(e))

response = lam.get_function(FunctionName=lambda_fn_name_cloudwatch)

lambda_fn_arn_cloudwatch = response["Configuration"]["FunctionArn"]
print(lambda_fn_arn_cloudwatch)


# Create Lambda Destination SNS
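
The contents of `src/push_notification_to_sns.py` are not shown in this section. As a hedged sketch of the general pattern for a Kinesis Data Analytics Lambda destination: the function receives base64-encoded output records and reports a per-record `result` of `Ok` or `DeliveryFailed`. The `handle_anomaly_records` name, the threshold of 2.0, the single-column CSV layout, and the injected `publish` callback are all assumptions for illustration.

```python
import base64


def handle_anomaly_records(event, publish, threshold=2.0):
    """Publish a notification for each record whose anomaly score exceeds threshold."""
    results = []
    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"]).decode("utf-8").strip()
            # Assumes the anomaly score is the first CSV column of the record.
            anomaly_score = float(payload.split(",")[0])
            if anomaly_score > threshold:
                publish("Anomaly score {:.2f} exceeds {:.2f}".format(anomaly_score, threshold))
            results.append({"recordId": record["recordId"], "result": "Ok"})
        except ValueError:
            # Undecodable or non-numeric payloads are reported as failed.
            results.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
    return {"records": results}


def encode(text):
    return base64.b64encode(text.encode("utf-8")).decode("utf-8")


# Local test with hand-built records; notifications are collected in a list
sent = []
event = {
    "records": [
        {"recordId": "a", "data": encode("0.8\n")},
        {"recordId": "b", "data": encode("3.1\n")},
        {"recordId": "c", "data": encode("oops\n")},
    ]
}
response = handle_anomaly_records(event, sent.append)
print(sent)
print(response)
```

Inside a real Lambda function, `publish` could be a small wrapper around `sns.publish(TopicArn=..., Message=...)` that reads the topic ARN from the `SNS_TOPIC_ARN` environment variable.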


In [None]:
sns = boto3.Session().client(service_name="sns", region_name=region)
topics = sns.list_topics()
print(topics)
response = sns.create_topic(
    Name="review_anomaly_scores",
)
print(response)
sns_topic_arn = response["TopicArn"]
print(sns_topic_arn)

!pygmentize src/push_notification_to_sns.py
# Zip the Lambda Function
!zip src/PushNotificationToSNS.zip src/push_notification_to_sns.py

with open("src/PushNotificationToSNS.zip", "rb") as f:
    code = f.read()

try:
    response = lam.create_function(
        FunctionName="{}".format(lambda_fn_name_sns),
        Runtime="python3.9",
        Role="{}".format(iam_role_lambda_arn),
        Handler="src/push_notification_to_sns.lambda_handler",
        Code={"ZipFile": code},
        Description="Push anomaly score notifications from Kinesis Analytics application to SNS.",
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print("Lambda Function {} successfully created.".format(lambda_fn_name_sns))

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName="{}".format(lambda_fn_name_sns), ZipFile=code, Publish=True, DryRun=False
        )
        print("Updating existing Lambda Function {}.  This is OK.".format(lambda_fn_name_sns))
    else:
        print("Error: {}".format(e))

response = lam.get_function(FunctionName=lambda_fn_name_sns)

lambda_fn_arn_sns = response["Configuration"]["FunctionArn"]
print(lambda_fn_arn_sns)

# Update Lambda Function with SNS Topic ARN
response = lam.update_function_configuration(
    FunctionName=lambda_fn_name_sns, Environment={"Variables": {"SNS_TOPIC_ARN": sns_topic_arn}}
)

# Create Kinesis Data Analytics App

In [None]:
kinesis_analytics = boto3.Session().client(service_name="kinesisanalytics", region_name=region)

# Kinesis Analytics Application Name
kinesis_data_analytics_app_name = "dsoaws-kinesis-data-analytics-sql-app"
in_app_stream_name = "SOURCE_SQL_STREAM_001"  # Default
print(in_app_stream_name)

## Create Application
window_seconds = 5
sql_code = """ \
        CREATE OR REPLACE STREAM "AVG_STAR_RATING_SQL_STREAM" ( \
            avg_star_rating DOUBLE); \
        CREATE OR REPLACE PUMP "AVG_STAR_RATING_SQL_STREAM_PUMP" AS \
            INSERT INTO "AVG_STAR_RATING_SQL_STREAM" \
                SELECT STREAM AVG(CAST("star_rating" AS DOUBLE)) AS avg_star_rating \
                FROM "{}" \
                GROUP BY \
                STEP("{}".ROWTIME BY INTERVAL '{}' SECOND); \
         \
        CREATE OR REPLACE STREAM "ANOMALY_SCORE_SQL_STREAM" (anomaly_score DOUBLE); \
        CREATE OR REPLACE PUMP "ANOMALY_SCORE_STREAM_PUMP" AS \
            INSERT INTO "ANOMALY_SCORE_SQL_STREAM" \
            SELECT STREAM anomaly_score \
            FROM TABLE(RANDOM_CUT_FOREST( \
                CURSOR(SELECT STREAM "star_rating" \
                    FROM "{}" \
            ) \
          ) \
        ); \
         \
        CREATE OR REPLACE STREAM "APPROXIMATE_COUNT_SQL_STREAM" (number_of_distinct_items BIGINT); \
        CREATE OR REPLACE PUMP "APPROXIMATE_COUNT_STREAM_PUMP" AS \
            INSERT INTO "APPROXIMATE_COUNT_SQL_STREAM" \
            SELECT STREAM number_of_distinct_items \
            FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING( \
                CURSOR(SELECT STREAM "review_id" FROM "{}"), \
                'review_id', \
                {} \
              ) \
        ); \
    """.format(
    in_app_stream_name, in_app_stream_name, window_seconds, in_app_stream_name, in_app_stream_name, window_seconds
)

print(sql_code)

In [None]:
try:
    response = kinesis_analytics.create_application(
        ApplicationName=kinesis_data_analytics_app_name,
        Inputs=[
            {
                "NamePrefix": "SOURCE_SQL_STREAM",
                "KinesisFirehoseInput": {
                    "ResourceARN": "{}".format(firehose_arn),
                    "RoleARN": "{}".format(iam_role_kinesis_arn),
                },
                "InputProcessingConfiguration": {
                    "InputLambdaProcessor": {
                        "ResourceARN": "{}".format(lambda_fn_arn_invoke_ep),
                        "RoleARN": "{}".format(iam_role_lambda_arn),
                    }
                },
                "InputSchema": {
                    "RecordFormat": {
                        "RecordFormatType": "CSV",
                        "MappingParameters": {
                            "CSVMappingParameters": {"RecordRowDelimiter": "\n", "RecordColumnDelimiter": "\t"}
                        },
                    },
                    "RecordColumns": [
                        {"Name": "review_id", "Mapping": "review_id", "SqlType": "VARCHAR(14)"},
                        {"Name": "star_rating", "Mapping": "star_rating", "SqlType": "INTEGER"},
                        {"Name": "product_category", "Mapping": "product_category", "SqlType": "VARCHAR(24)"},
                        {"Name": "review_body", "Mapping": "review_body", "SqlType": "VARCHAR(65535)"},
                    ],
                },
            },
        ],
        Outputs=[
            {
                "Name": "AVG_STAR_RATING_SQL_STREAM",
                "LambdaOutput": {
                    "ResourceARN": "{}".format(lambda_fn_arn_cloudwatch),
                    "RoleARN": "{}".format(iam_role_lambda_arn),
                },
                "DestinationSchema": {"RecordFormatType": "CSV"},
            },
            {
                "Name": "ANOMALY_SCORE_SQL_STREAM",
                "LambdaOutput": {
                    "ResourceARN": "{}".format(lambda_fn_arn_sns),
                    "RoleARN": "{}".format(iam_role_kinesis_arn),
                },
                "DestinationSchema": {"RecordFormatType": "CSV"},
            },
            {
                "Name": "APPROXIMATE_COUNT_SQL_STREAM",
                "KinesisStreamsOutput": {
                    "ResourceARN": "{}".format(stream_arn),
                    "RoleARN": "{}".format(iam_role_kinesis_arn),
                },
                "DestinationSchema": {"RecordFormatType": "CSV"},
            },
        ],
        ApplicationCode=sql_code,
    )
    print("SQL application {} successfully created.".format(kinesis_data_analytics_app_name))
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print("SQL App {} already exists.".format(kinesis_data_analytics_app_name))
    else:
        print("Unexpected error: %s" % e)

response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

input_id = response["ApplicationDetail"]["InputDescriptions"][0]["InputId"]
print(input_id)


In [None]:
# Start the Kinesis Data Analytics App

try:
    response = kinesis_analytics.start_application(
        ApplicationName=kinesis_data_analytics_app_name,
        InputConfigurations=[{"Id": input_id, "InputStartingPositionConfiguration": {"InputStartingPosition": "NOW"}}],
    )
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print("Application {} is already starting.".format(kinesis_data_analytics_app_name))
    else:
        print("Error: {}".format(e))

response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)

app_status = response["ApplicationDetail"]["ApplicationStatus"]
print("Application status {}".format(app_status))

while app_status != "RUNNING":
    time.sleep(5)
    response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)
    app_status = response["ApplicationDetail"]["ApplicationStatus"]
    print("Application status {}".format(app_status))

print("Application status {}".format(app_status))

# Put Customer Reviews On Kinesis Data Firehose

<img src="img/kinesis-complete.png" width="90%" align="left">
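
Sending one record per API call is simple but slow for larger volumes. Firehose also offers a batch call, `put_record_batch`, which accepts up to 500 records per request. The sketch below only shows the chunking logic. `chunk_records` is a hypothetical helper, the sample rows are made up, and the sketch ignores the per-request size limit.

```python
def chunk_records(rows, max_batch_size=500):
    """Yield lists of Firehose-style records, at most max_batch_size per list."""
    batch = []
    for row in rows:
        batch.append({"Data": row.encode("utf-8")})
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, partially filled batch.
        yield batch


rows = ["review-{}\tDigital_Software\tSample text\n".format(i) for i in range(1200)]
batches = list(chunk_records(rows))
print([len(batch) for batch in batches])
```

Each batch could then be passed as `Records` to `firehose.put_record_batch(DeliveryStreamName=firehose_name, Records=batch)`. The response includes a `FailedPutCount` field that is worth checking, because a batch call can partially fail.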

In [None]:
firehoses = firehose.list_delivery_streams(DeliveryStreamType="DirectPut")
print(json.dumps(firehoses, indent=4, sort_keys=True, default=str))

# Download Dataset
!aws s3 cp 's3://dsoaws/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/


import csv  # needed for csv.QUOTE_NONE below

df = pd.read_csv(
    "./data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz",
    delimiter="\t",
    quoting=csv.QUOTE_NONE,
    compression="gzip",
)
print(df.shape)

df_star_rating_and_review_body = df[["review_id", "star_rating", "product_category", "review_body"]][0:1]
df_star_rating_and_review_body.to_csv(sep="\t", header=None, index=False)

In [None]:
# Simulate Producer Application Writing Records to the Stream

# Open Lambda (CloudWatch) Logs
# Open Custom CloudWatch Metrics
# Open Kinesis Data Analytics Console UI

# Put Records onto Firehose
firehose_response = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)
print(json.dumps(firehose_response, indent=4, sort_keys=True, default=str))

In [None]:
%%time

step = 1

for start_idx in range(0, 500, step):
    end_idx = start_idx + step

    df_star_rating_and_review_body = df[["review_id", "product_category", "review_body"]][start_idx:end_idx]

    reviews_tsv = df_star_rating_and_review_body.to_csv(sep="\t", header=None, index=False)

    # print(reviews_tsv.encode('utf-8'))

    response = firehose.put_record(Record={"Data": reviews_tsv.encode("utf-8")}, DeliveryStreamName=firehose_name)

# Review S3 Kinesis Source Records


They should look like this: 
    
```
R2EI7QLPK4LF7U	Digital_Software	So far so good
R1W5OMFK1Q3I3O	Digital_Software	Needs a little more work.....
RPZWSYWRP92GI	Digital_Software	Please cancel.
R2WQWM04XHD9US	Digital_Software	Works as Expected!
```

# Review S3 Kinesis Transformed Records

They should look like this:
    
```
R2EI7QLPK4LF7U	5	Digital_Software	So far so good
R1W5OMFK1Q3I3O	3	Digital_Software	Needs a little more work.....
RPZWSYWRP92GI	1	Digital_Software	Please cancel.
R2WQWM04XHD9US	5	Digital_Software	Works as Expected!
```
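
To inspect the transformed records programmatically, the tab-separated lines can be parsed with the standard `csv` module. This is a minimal sketch: `parse_transformed_records` is a hypothetical helper, and the column order is assumed from the sample output above.

```python
import csv
import io

COLUMNS = ["review_id", "star_rating", "product_category", "review_body"]


def parse_transformed_records(tsv_text):
    """Parse tab-separated transformed records into a list of dictionaries."""
    reader = csv.reader(io.StringIO(tsv_text), delimiter="\t", quoting=csv.QUOTE_NONE)
    records = []
    for row in reader:
        if len(row) != len(COLUMNS):
            continue  # skip blank or malformed lines
        record = dict(zip(COLUMNS, row))
        record["star_rating"] = int(record["star_rating"])
        records.append(record)
    return records


sample = (
    "R2EI7QLPK4LF7U\t5\tDigital_Software\tSo far so good\n"
    "R1W5OMFK1Q3I3O\t3\tDigital_Software\tNeeds a little more work.....\n"
)
records = parse_transformed_records(sample)
print(records[0]["star_rating"], records[1]["review_body"])
```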

# Kinesis Analytics UI: 

<img src="../readme_pics/kinesis_analytics_1.png" width="80%" align="left">
<img src="../readme_pics/kinesis_analytics_5.png" width="80%" align="left">
<img src="../readme_pics/kinesis_analytics_4.png" width="80%" align="left">


# Create and Put Anomaly Data Onto Stream
Here, we are hard-coding a bad review to trigger an anomaly.

In [None]:
%%time

import time

anomaly_step = 1

for start_idx in range(0, 10000, anomaly_step):
    timestamp = int(time.time())

    df_anomalies = pd.DataFrame(
        [
            {
                "review_id": str(timestamp),
                "product_category": "Digital_Software",
                "review_body": "This is an awful waste of time.",
            },
        ],
        columns=["review_id", "star_rating", "product_category", "review_body"],
    )

    reviews_tsv_anomalies = df_anomalies.to_csv(sep="\t", header=None, index=False)

    response = firehose.put_record(
        Record={"Data": reviews_tsv_anomalies.encode("utf-8")}, DeliveryStreamName=firehose_name
    )

<img src="../readme_pics/kinesis_analytics_3.png" width="80%" align="left">