# Queries on streaming data with Managed Apache Flink


In [1]:
%flink.pyflink
import os
import json

from datetime import datetime
from pyflink.common import Row
from pyflink.table.expressions import col, lit
from pyflink.table import (EnvironmentSettings, StreamTableEnvironment, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table.window import Slide
from pyflink.table.udf import udf


First, set up the environment for executing table programs in streaming mode with the following lines:

In [3]:
%flink.pyflink
# Streaming-mode settings built through the explicit builder API; the resulting
# TableEnvironment is the one every later cell registers tables and runs SQL on.
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .build()
)
table_env = TableEnvironment.create(env_settings)

As an example, each record that you will ingest looks like this:

```json
{
    "order_id": "1f4da8b2-73d0-49d5-9762-3e2e0a3cf004",
    "order_timestamp": "2024-04-04T15:32:03",
    "order_date": "2024-04-04",
    "customer_number": 198,
    "customer_visit_number": 1,
    "customer_city": "Makati City",
    "customer_country": "Philippines",
    "customer_credit_limit": 60237,
    "device_type": "desktop",
    "browser": "Opera/9.32.(Windows 98; tig-ER) Presto/2.9.179 Version/10.00",
    "operating_system": "Android",
    "product_code": "S32_1268",
    "product_line": "Trucks and Buses",
    "product_unitary_price": 96.31,
    "quantity": 10,
    "total_price": 963.1,
    "traffic_source": "www.hardin-green.com"
}
```


In [6]:
%flink.pyflink

# Flink table that reads order events from the input Kinesis stream.
input_table_name = "source_stream"
# DROP ... IF EXISTS keeps this cell re-runnable without "table already exists" errors.
table_env.execute_sql(f"DROP TABLE IF EXISTS {input_table_name};")

input_stream_name = "kinesis-input-stream"

region = "us-east-1"
# Schema mirrors the sample JSON record shown above.
# - Prices are DECIMAL(10, 2): a bare NUMERIC is DECIMAL(10, 0) in Flink, which
#   would round values such as 96.31 to 96 and drop the cents from every total.
# - The watermark trails the largest order_timestamp seen by 5 minutes, so
#   events arriving up to that late are still assigned to their window.
# - 'scan.stream.initpos' = 'TRIM_HORIZON' starts reading at the oldest record
#   still retained in the stream.
# - 'json.timestamp-format.standard' = 'ISO-8601' parses timestamps written as
#   "2024-04-04T15:32:03".
# NOTE(review): PARTITIONED BY presumably only matters to the Kinesis connector
# when writing; kept as-is for this source table.
source_table_ddl = """
  CREATE TABLE {0} (
    order_id STRING,
    order_timestamp TIMESTAMP(0),
    order_date STRING,
    customer_number INT,
    customer_visit_number INT,
    customer_city STRING,
    customer_country STRING,
    customer_credit_limit INT,
    device_type STRING,
    browser STRING,
    operating_system STRING,
    product_code STRING,
    product_line STRING,
    product_unitary_price DECIMAL(10, 2),
    quantity INT,
    total_price DECIMAL(10, 2),
    traffic_source STRING,
    WATERMARK FOR order_timestamp AS order_timestamp - INTERVAL '5' MINUTE)
    PARTITIONED BY (order_id)
    WITH (
    'connector' = 'kinesis',
    'stream' = '{1}',
    'aws.region' = '{2}',
    'format' = 'json',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'json.timestamp-format.standard' = 'ISO-8601'
    ) """.format(input_table_name, input_stream_name, region)
table_env.execute_sql(source_table_ddl)

Now, let's create a user-defined function (UDF) that converts timestamps into strings. This is a workaround that lets you write timestamps to Amazon Kinesis Data Streams in a readable format.


In [9]:
%flink.pyflink
@udf(input_types=[DataTypes.TIMESTAMP(3)], result_type=DataTypes.STRING())
def to_string(i):
    """Render a TIMESTAMP(3) value as a string using Python's ``str``.

    Args:
        i: the timestamp value passed in by Flink (presumably a
            ``datetime.datetime``), or None for SQL NULL.

    Returns:
        ``str(i)``, or None when the input is NULL.
    """
    # str(None) would write the literal text "None" to the sink instead of NULL.
    if i is None:
        return None
    return str(i)


# Remove any previous registration first so re-running this cell does not fail
# because "to_string" already exists; the drop returns False when nothing is registered.
table_env.drop_temporary_system_function("to_string")
table_env.create_temporary_system_function("to_string", to_string)


 
Now, you will create a sliding window table from your `source_stream`. Here you use a sliding window query to compute the total sales amount (the sum of `total_price`) in each window.
In this case, set the window size to 6 minutes and the window slide to 3 minutes, so consecutive windows overlap by 3 minutes.

In [11]:
%flink.pyflink
# Sliding window query: total sales amount (SUM of total_price) per window.
WINDOW_SIZE_MINUTES = 6   # length of each window
WINDOW_SLIDE_MINUTES = 3  # interval between window starts; windows overlap

input_table = table_env.from_path(input_table_name)
sliding_window_table = (
    input_table.window(
        Slide.over(lit(WINDOW_SIZE_MINUTES).minutes)
        .every(lit(WINDOW_SLIDE_MINUTES).minutes)
        .on(col("order_timestamp"))  # event time; needs the watermark on the source table
        .alias("sliding_window")
    )
    .group_by(col("sliding_window"))
    .select(
        # Window end rendered by the to_string UDF so the sink gets a STRING column.
        to_string(col("sliding_window").end).alias("event_time"),
        col("total_price").sum.alias("total_sales"),
    )
)

Create a temporary view in your table environment based on the sliding window table that you created.


In [13]:
%flink.pyflink
# Drop any earlier registration so re-running this cell does not fail with
# "view already exists". This matches the DROP TABLE IF EXISTS used for the
# source and sink tables. drop_temporary_view returns False when no view exists.
table_env.drop_temporary_view("sliding_window_table")
table_env.create_temporary_view("sliding_window_table", sliding_window_table)

 
In the next cell, define the schema of your sink table, which is backed by one of the output Amazon Kinesis data streams.

In [15]:
%flink.pyflink

# Flink table that writes the windowed totals to the output Kinesis stream.
table_name = "output_sliding_sales_stream"
# DROP ... IF EXISTS keeps this cell re-runnable without "table already exists" errors.
table_env.execute_sql(f"DROP TABLE IF EXISTS {table_name};")

stream_name = "kinesis-total-sales-slide-output-stream"

region = "us-east-1"
# Columns match the select list of the sliding window query:
# - event_time is the window end rendered as a string by the to_string UDF.
# - total_sales is SUM(total_price). DECIMAL(38, 2) is the type Flink gives SUM
#   over a DECIMAL(10, 2) column. A bare NUMERIC is DECIMAL(10, 0): it would
#   drop the cents and could overflow once a total exceeds 10 digits.
sink_table_ddl = """
 CREATE TABLE {0} (
    event_time STRING,
    total_sales DECIMAL(38, 2))
    WITH (
    'connector' = 'kinesis',
    'stream' = '{1}',
    'aws.region' = '{2}',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    ) """.format(table_name, stream_name, region)
table_env.execute_sql(sink_table_ddl)

 
After you have defined the schema in your output table, insert the data from the sliding window table into your sink:

In [17]:
%flink.pyflink
# Stream every row of the sliding-window view into the Kinesis-backed sink.
# For an INSERT statement, Flink submits a job and hands back a TableResult;
# presumably the job keeps running in the background after this cell returns.
sink_table = "output_sliding_sales_stream"
source_view = "sliding_window_table"
table_result = table_env.execute_sql(f"INSERT INTO {sink_table} SELECT * FROM {source_view}")


 
Run the following command to start the consumer script:

```bash
python3 scripts/consumer/src/consumer.py kinesis-total-sales-slide-output-stream
```

You should start seeing processed records that follow the sink schema you just defined. They should look similar to this output:

```json
{'event_time': '2024-06-01 01:00:00', 'total_sales': 25385}
```