In [1]:
# Load dependencies and set constants
from pyflink.datastream.functions import ProcessFunction
from pyflink.common.serialization import SimpleStringSchema, SerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from matplotlib import pyplot as plt

import time
import sys

# --- Kafka configuration ------------------------------------------------------

# Topic the job reads from.
KAFKA_TOPIC = "twitter-stream"

# Location of the Kafka connector JAR that is handed to Flink.
KAFKA_CONNECTOR_JAR = "file:///home/ubuntu/flink-sql-connector-kafka_2.11-1.12.2.jar"

# Client properties for the Kafka consumer (source side).
# A single broker is used in this setup.
consumer_props = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "twitter-consumers",
    "client.id": "client-1",
}

# Client properties for the Kafka producer (sink side); same single broker.
# NOTE(review): 'queue.buffering.max.messages' looks like a librdkafka setting;
# confirm that the Kafka client used by the Flink connector honours it.
producer_props = {
    "bootstrap.servers": "localhost:9092",
    "queue.buffering.max.messages": "1000000",
}

In [None]:
# custom process-function. Executed upon record arrival
class MyProcessFunction(ProcessFunction):
    """Emit the latency of every incoming record, in milliseconds, as a string.

    Latency is the local wall-clock time at processing minus the timestamp
    attached to the record (``ctx.timestamp()``).
    NOTE(review): the result is only meaningful if the clock that stamped the
    record and the clock of the machine running this function are in sync.
    """

    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        """Compute the latency of one record.

        :param value: the record payload; not used for the computation.
        :param ctx: processing context that provides the record timestamp.
        :return: generator yielding a single string, ``str(latency_ms)``;
                 yields nothing when the record carries no timestamp.
        """
        # Take the wall-clock reading first so that the context lookup below
        # is not counted towards the measured latency.
        now_ms = time.time() * 1000

        # Timestamp attached to the record, in milliseconds. Presumably the
        # Kafka LogAppendTime, depending on how the topic is configured.
        record_ts_ms = ctx.timestamp()
        if record_ts_ms is None:
            # No timestamp available (e.g. the job is not running with event
            # time): a latency cannot be computed, so skip the record instead
            # of failing the job with a TypeError on the subtraction.
            return

        yield str(now_ms - record_ts_ms)

# --- Build and run the latency-measurement job --------------------------------

# Set up the execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
# EventTime --> Flink works with the timestamp already attached to each record.
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Make the Kafka connector available to the job.
env.add_jars(KAFKA_CONNECTOR_JAR)
env.add_classpaths(KAFKA_CONNECTOR_JAR)

# Kafka source and sink. The source reads the topic configured in KAFKA_TOPIC;
# results go to a *different* topic so they are not fed back into the job.
kafka_consumer = FlinkKafkaConsumer(KAFKA_TOPIC, SimpleStringSchema(), consumer_props)
kafka_producer = FlinkKafkaProducer("twitter-stream-results", SimpleStringSchema(), producer_props)

# Wire the pipeline: Kafka source -> per-record latency -> Kafka sink.
stream = env.add_source(kafka_consumer)
stream.process(MyProcessFunction(), output_type=Types.STRING()) \
      .add_sink(kafka_producer)

# Submit the job.
# NOTE(review): with an unbounded Kafka source this call is expected to block
# the notebook cell until the job is cancelled; confirm.
env.execute()