### Configuring the Environments and the Source and Sink Tables

In [None]:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import TableConfig, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Schema, Json, Kafka, Rowtime
from pyflink.table.window import Tumble

# Streaming execution environment: parallelism 2, with event time as the
# stream time characteristic.
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(2)
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Table environment created on top of the streaming environment, using a
# default-constructed TableConfig.
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

# Register the 'clicks' source table, backed by the JSON-encoded Kafka topic
# 'clicks' and consumed in append mode.
(
    t_env.connect(
        # Connector: Kafka topic 'clicks' on broker kafka:9092.
        Kafka()
        .version("universal")
        .topic('clicks')
        .property("bootstrap.servers", "kafka:9092")
        .property("group.id", "joho!")
        .start_from_earliest()
    )
    .in_append_mode()
    .with_format(
        # Format: JSON objects with string fields 'url', 'user' and a
        # date-time formatted string field 'ctime'.
        Json().json_schema(
            "{"
            "'title': 'clicks', "
            "'type': 'object', "
            "'properties' : {"
            "  'url': {'type': 'string'}, "
            "  'user': {'type': 'string'}, "
            "  'ctime': {'type': 'string', 'format': 'date-time'} "
            "}}"
        )
    )
    .with_schema(
        # Table schema: 'clicktime' is declared as the rowtime attribute; its
        # timestamps come from the 'ctime' field of the format, with periodic
        # ascending watermarks.
        Schema()
        .field("url", "VARCHAR")
        .field("user", "VARCHAR")
        .field("clicktime", "TIMESTAMP")
        .rowtime(
            Rowtime()
            .timestamps_from_field('ctime')
            .watermarks_periodic_ascending()
        )
    )
    .register_table_source('clicks')
)

# Register the 'click-counts' sink table, written in append mode to the Kafka
# topic 'click-counts' using JSON encoding.
(
    t_env.connect(
        # Connector: Kafka topic 'click-counts' on broker kafka:9092.
        # NOTE(review): 'group.id' is normally a consumer setting; it is
        # probably not needed for a sink - confirm before removing.
        Kafka()
        .version("universal")
        .topic('click-counts')
        .property("bootstrap.servers", "kafka:9092")
        .property("group.id", "my-group")
    )
    .in_append_mode()
    .with_format(
        # Format: JSON, with the format schema derived rather than spelled out.
        Json().derive_schema()
    )
    .with_schema(
        # Table schema: one row per URL and window, with the window bounds.
        Schema()
        .field("url", "VARCHAR")
        .field("count", "BIGINT")
        .field("cntstart", "TIMESTAMP")
        .field("cntend", "TIMESTAMP")
    )
    .register_table_sink("click-counts")
)

### Defining and Running a Python Table API Query

In [None]:
# Query definition:
#   - read from the 'clicks' source table,
#   - group rows by URL and by 15-second tumbling windows on 'clicktime',
#   - emit the click count plus window start/end into the 'click-counts' sink.
(
    t_env.scan('clicks')
    .window(Tumble.over('15.seconds').on('clicktime').alias('w'))
    .group_by('url, w')
    .select('url, user.count as count, w.start as cntstart, w.end as cntend')
    .insert_into('click-counts')
)

# Submit the job under the name "Count Clicks".
print("Start query...")
t_env.execute("Count Clicks")