Example: Retrieving the Most Frequently Occurring Values (TOP_K_ITEMS_TUMBLING)

This Amazon Kinesis Data Analytics example demonstrates how to use the TOP_K_ITEMS_TUMBLING function to retrieve the most frequently occurring values in a tumbling window. For more information, see TOP_K_ITEMS_TUMBLING function in the Amazon Kinesis Data Analytics SQL Reference.

The TOP_K_ITEMS_TUMBLING function is useful when you aggregate over tens or hundreds of thousands of keys and want to reduce resource usage. It produces the same results as aggregating with GROUP BY and ORDER BY clauses.
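
To make the GROUP BY and ORDER BY equivalence concrete, the following Python sketch computes the same kind of result locally. It places events into fixed, non-overlapping (tumbling) windows, counts each value per window, and keeps the k most frequent. This is a reference model for checking expectations, not how Kinesis Data Analytics implements the function. The function name `top_k_tumbling`, the `(timestamp_seconds, value)` input format, and breaking ties alphabetically are assumptions made for illustration. The window start appears in the output only for readability, because the SQL example's output stream has just the TICKER and MOST_FREQUENT_VALUES columns.

```python
from collections import Counter, defaultdict


def top_k_tumbling(events, k, window_seconds):
    """Return the k most frequent values in each tumbling window.

    events: iterable of (timestamp_seconds, value) pairs (hypothetical input format).
    Returns (window_start, value, count) tuples ordered by window start, then by
    descending count; ties are broken alphabetically (an assumption of this sketch).
    """
    # GROUP BY window, value: count occurrences of each value per window.
    windows = defaultdict(Counter)
    for ts, value in events:
        window_start = int(ts // window_seconds) * window_seconds
        windows[window_start][value] += 1

    # ORDER BY count DESC within each window, then keep the top k rows.
    rows = []
    for window_start in sorted(windows):
        ranked = sorted(windows[window_start].items(), key=lambda item: (-item[1], item[0]))
        rows.extend((window_start, value, count) for value, count in ranked[:k])
    return rows


if __name__ == "__main__":
    sample = [(0, "AMZN"), (5, "MSFT"), (12, "AMZN"), (30, "INTC"),
              (61, "TBV"), (70, "TBV"), (75, "AMZN")]
    for row in top_k_tumbling(sample, k=2, window_seconds=60):
        print(row)
    # (0, 'AMZN', 2)
    # (0, 'INTC', 1)
    # (60, 'TBV', 2)
    # (60, 'AMZN', 1)
```

With the settings used in the application code for this example (5 values, a 60-second window, and five possible tickers), every ticker seen in a window is returned. Keeping only the top k matters when the number of distinct keys is much larger than k.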

In this example, you write the following records to an Amazon Kinesis data stream:

{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...

You then create a Kinesis Data Analytics application in the AWS Management Console, with the Kinesis data stream as the streaming source. The discovery process reads sample records from the streaming source and infers an in-application schema with one column (TICKER), as shown in the following screenshot.

[Console screenshot showing the in-application schema with a ticker column.]

You use application code with the TOP_K_ITEMS_TUMBLING function to create a windowed aggregation of the data, and then insert the resulting data into another in-application stream, as shown in the following screenshot:

[Console screenshot showing the resulting data in an in-application stream.]

In the following procedure, you create a Kinesis Data Analytics application that retrieves the most frequently occurring values in the input stream.

Step 1: Create a Kinesis Data Stream

Create an Amazon Kinesis data stream and populate it with records as follows:

  1. Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis.

  2. Choose Data Streams in the navigation pane.

  3. Choose Create Kinesis stream, and then create a stream named ExampleInputStream with one shard. The Python script in the next step writes to this stream name. For more information, see Create a Stream in the Amazon Kinesis Data Streams Developer Guide.

  4. In a production environment, we recommend writing records to a Kinesis data stream with either the Kinesis Producer Library (KPL) or the Kinesis Data Streams API. For simplicity, this example uses the following Python script, which continuously writes a random ticker record to the stream. The script assumes that boto3 is installed and that AWS credentials and a default Region are configured. Run the script and leave it running so that you can discover the application schema in a later step.

     
    import json
    import random
    
    import boto3
    
    kinesis = boto3.client('kinesis')
    
    
    def get_ticker_record():
        # Each record has only a TICKER field, matching the sample records shown
        # earlier and the one-column schema that discovery infers in Step 2.
        return {'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])}
    
    
    # Write one random ticker record after another until you stop the script.
    while True:
        data = json.dumps(get_ticker_record())
        print(data)
        kinesis.put_record(
            StreamName="ExampleInputStream",
            Data=data,
            PartitionKey="partitionkey")
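
To confirm that the producer script is writing records before you move on to schema discovery, you could read a few records back from the stream. The following is a minimal sketch, assuming the single-shard ExampleInputStream used by the script and a boto3 Kinesis client that the caller passes in. `read_sample` and `decode_records` are hypothetical helper names, not part of boto3. The boto3 calls they make (`list_shards`, `get_shard_iterator`, `get_records`) are standard Kinesis client methods.

```python
import json
import time


def decode_records(response):
    """Parse the JSON payload of every record in a GetRecords response."""
    # Data arrives as bytes; json.loads accepts UTF-8 encoded bytes directly.
    return [json.loads(record["Data"]) for record in response["Records"]]


def read_sample(kinesis, stream_name="ExampleInputStream", limit=10, max_calls=5):
    """Read up to `limit` records from the first shard of `stream_name`.

    `kinesis` is a boto3 Kinesis client (or any object with the same methods).
    """
    shards = kinesis.list_shards(StreamName=stream_name)["Shards"]
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shards[0]["ShardId"],      # the example stream has a single shard
        ShardIteratorType="TRIM_HORIZON",  # start at the oldest record still retained
    )["ShardIterator"]

    records = []
    for _ in range(max_calls):
        response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
        records.extend(decode_records(response))
        iterator = response.get("NextShardIterator")
        # GetRecords can return an empty batch even when data exists, so keep
        # following the iterator until something arrives or the shard closes.
        if records or iterator is None:
            break
        time.sleep(0.2)  # stay under 5 GetRecords calls per second per shard
    return records[:limit]
```

For example, while the producer script is running, `print(read_sample(boto3.client("kinesis")))` from a second terminal should print a list of decoded records, each a dictionary with a TICKER key. Passing the client in, rather than creating it inside the function, keeps the helper easy to exercise with a stub object.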
    

Step 2: Create the Kinesis Data Analytics Application

Create a Kinesis Data Analytics application as follows:

  1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.

  2. Choose Create application, type an application name, and choose Create application.

  3. On the application details page, choose Connect streaming data to connect to the source.

  4. On the Connect to source page, do the following:

    1. Choose the stream that you created in the preceding section.

    2. Choose Discover schema. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has one column.

    3. Choose Save schema and update stream samples. After the console saves the schema, choose Exit.

    4. Choose Save and continue.

  5. On the application details page, choose Go to SQL editor. To start the application, choose Yes, start application in the dialog box that appears.

  6. In the SQL editor, write the application code, and verify the results as follows:

    1. Copy the following application code and paste it into the editor:

      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4),
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM *
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',   -- name of the column, in single quotes
                  5,          -- number of most frequently occurring values to return
                  60          -- tumbling window size in seconds
                  )
              );
      
    2. Choose Save and run SQL.

      On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data.