In this demo we will be ingesting IoT data into a Kafka topic from which it will be analysed using Stream Analytics. To make it a bit more realistic, the data is not directly sent to Kafka from the IoT devices (vehicles) but first sent through an MQTT broker (IoT gateway).
The following diagram shows the setup of the data flow which will be implemented step by step. Of course we will not be using real-life data, but have a program simulating trucks and their driving behaviour.
We will see various technologies in action, such as Kafka, MQTT, Kafka Connect, Kafka Streams and ksqlDB.
The platform on which the demos can be run has been generated using the platys
toolset using the platys-modern-data-platform
stack.
The generated artefacts are available in the ./docker
folder.
The prerequisites for running the platform are Docker and Docker Compose.
The environment is completely based on docker containers. In order to easily start the multiple containers, we are going to use Docker Compose. You need to have at least 12 GB of RAM available; 16 GB is recommended.
First, create the following two environment variables, which export the Public IP address (if a cloud environment) and the Docker Engine (Docker Host) IP address:
export DOCKER_HOST_IP=<docker-host-ip>
export PUBLIC_IP=<public-host-ip>
You can either add them to /etc/environment
(without export) to make them persistent or use an .env
file inside the docker
folder with the two variables.
It is very important that these two are set, otherwise the platform will not run properly.
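As an example, a minimal .env file inside the docker folder could look like the following sketch (the two IP addresses are placeholders; use the values of your environment):
DOCKER_HOST_IP=10.0.1.15
PUBLIC_IP=203.0.113.10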
Now navigate into the docker
folder and start docker-compose
.
cd ./docker
docker-compose up -d
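To verify that the containers have started, you can list them with the following command (a quick sanity check, not strictly required):
docker-compose ps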
To show all logs of all containers use
docker-compose logs -f
To show only the logs for some of the containers, for example kafka-connect-1
and kafka-connect-2
, use
docker-compose logs -f kafka-connect-1 kafka-connect-2
If you want to stop the platform (all containers), run
docker-compose down
Some services in the docker-compose.yml
are optional and can be removed, if you don't have enough resources to start them.
As a final step, add dataplatform
as an alias to the /etc/hosts
file so that the links used in this document work.
<public-host-ip> dataplatform
If you do not have the rights to do that, you have to use your IP address instead of dataplatform
in all the URLs.
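If you do have sudo rights, one possible way to add the entry is the following sketch, assuming PUBLIC_IP is set as shown above:
echo "${PUBLIC_IP} dataplatform" | sudo tee -a /etc/hosts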
For a list of available services, navigate to http://dataplatform:80/services.
When a terminal is needed, you can use the Web Terminal available at http://dataplatform:3001/.
The Kafka cluster is configured with auto.create.topics.enable
set to false
. Therefore we first have to create all the necessary topics, using the kafka-topics
command line utility of Apache Kafka.
From a terminal window, use the kafka-topics
CLI inside the kafka-1
docker container to create the topics vehicle_tracking_sysA
and vehicle_tracking_sysB
.
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic vehicle_tracking_sysA --partitions 8 --replication-factor 3
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic vehicle_tracking_sysB --partitions 8 --replication-factor 3
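To double-check that the topics have been created, you can list all topics of the cluster:
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --list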
If you prefer not to work with the CLI, you can also create the Kafka topics using the graphical user interfaces Cluster Manager for Kafka (CMAK) or Apache Kafka HQ (AKHQ).
The necessary tables are created automatically when running the stack using Docker Compose. Use the following command in a terminal window, to show the content of the driver
table:
docker exec -ti postgresql psql -d demodb -U demo -c "SELECT * FROM logistics_db.driver"
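If you also want to inspect the structure of the table, a possible sketch using the psql \d meta-command (assuming the same database, schema and user as above) is:
docker exec -ti postgresql psql -d demodb -U demo -c "\d logistics_db.driver"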
Create the MySQL table with shipment information:
docker exec -it mysql bash -c 'mysql -u root -pmanager'
CREATE USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';
CREATE USER 'replicator'@'%' IDENTIFIED BY 'replpass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
GRANT SELECT, INSERT, UPDATE, DELETE ON sample.* TO sample;
USE sample;
DROP TABLE IF EXISTS shipment;
CREATE TABLE shipment (
id INT PRIMARY KEY,
vehicle_id INT,
target_wkt VARCHAR(2000),
create_ts timestamp DEFAULT CURRENT_TIMESTAMP,
update_ts timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (1,11, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (2, 42, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (3, 12, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (4, 13, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (5, 14, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (6, 15, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (7, 32, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (8, 48, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
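To verify the inserts from outside the interactive session, you could run a quick query using the mysql client's -e option, assuming the same credentials as above:
docker exec -it mysql mysql -u root -pmanager -e "SELECT id, vehicle_id FROM sample.shipment;"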
For simulating vehicle tracking data, we are going to use a Java program (adapted from Hortonworks) and maintained here.
The simulator can produce data to various targets, such as Kafka, MQTT or Files. Two of these options (MQTT and Kafka) are shown below.
Now let's produce the truck events to the MQTT broker running on port 1883. In a terminal window run the following command:
docker run --network host --rm trivadis/iot-truck-simulator '-s' 'MQTT' '-h' $DOCKER_HOST_IP '-p' '1883' '-f' 'JSON' '-vf' '1-49'
Leave this running in the terminal window.
Note: You can shortcut this workshop here by skipping step 1 and publishing directly to Kafka instead of MQTT. In that case use the following command instead of the MQTT command above: docker run trivadis/iot-truck-simulator '-s' 'KAFKA' '-h' $DOCKER_HOST_IP '-p' '9092' '-f' 'JSON' '-vf' '1-49'
and skip to Step 2.
In this part we will show how we can consume the data from the MQTT broker and send it to a Kafka topic. We will be using Kafka Connect for that.
Let's first check that we receive vehicle tracking messages in the MQTT topic hierarchy truck/+/position
.
There are two options for consuming from MQTT
- use dockerized MQTT client in the terminal
- use browser-based HiveMQ Web UI
To start consuming through the command line, run the following docker command:
docker run -it --rm --name mqttclient efrecon/mqtt-client sub -h "$DOCKER_HOST_IP" -p 1883 -t "truck/+/position" -v
The consumed messages will show up in the terminal.
To start consuming using the MQTT UI (HiveMQ Web UI), navigate to http://dataplatform:28136 and connect using dataplatform
for the Host field, 9101
for the Port field and then click on Connect:
When successfully connected, click on Add New Topic Subscription and enter truck/+/position
into Topic field and click Subscribe:
As soon as messages are produced to MQTT, you should see them either on the CLI or in the MQTT UI (Hive MQ) as shown below.
Alternatively you can also use the MQTT.fx, MQTT Explorer or MQTTLens applications to browse for the messages on the MQTT broker. They are all available for installation on Mac or Windows.
In order to get the messages from MQTT into Kafka, we will be using the Kafka Connect framework. Kafka Connect is part of the Apache Kafka project and can run various connectors, either as Source or as a Sink Connector, as shown in the following diagram:
Connectors are available from Confluent as well as from other, 3rd-party organisations. A good source for connectors is the Confluent Hub, although it is not complete; some connectors can also be found in GitHub projects.
There are multiple Kafka Connectors available for MQTT. We can either use the one provided by Confluent Inc. (in preview and available as evaluation license on Confluent Hub) or the one provided as part of the Landoop Stream-Reactor Project available on GitHub. Here we will use the one provided by Landoop.
You can check which connectors are pre-installed by calling the REST API of the Kafka Connect cluster:
curl -XGET http://dataplatform:8083/connector-plugins | jq
We can see that there is no MQTT connector available yet.
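If you want to filter the rather long plugin list for MQTT connectors only, a possible sketch using jq is shown below; at this point it should return nothing:
curl -s http://dataplatform:8083/connector-plugins | jq '.[] | select(.class | contains("Mqtt"))'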
There are two instances of the Kafka Connect service running as part of the Modern Data Platform, kafka-connect-1
and kafka-connect-2
.
To add the connector implementations, without having to copy them into the docker container (or even create a dedicated docker image holding the jar), both connect services are configured to use additional connector implementations from the local folder /etc/kafka-connect/custom-plugins
inside the docker container. This folder is mapped as a volume to the plugins/kafka-connect
folder on the docker host, outside of the container.
So we have to copy the artefacts of the Kafka connectors we want to use into this folder.
Navigate into the plugins/kafka-connect
folder (which is a sub-folder of the docker
folder with the docker-compose.yml
file).
cd $DATAPLATFORM_HOME/plugins/kafka-connect/connectors
and download the 4.2.0/kafka-connect-mqtt-4.2.0.zip
file from the Landoop Stream-Reactor Project project.
sudo wget https://github.com/lensesio/stream-reactor/releases/download/4.2.0/kafka-connect-mqtt-4.2.0.zip
Once it is successfully downloaded, uncompress it using the unzip
command and remove the archive file.
sudo unzip kafka-connect-mqtt-4.2.0.zip
sudo rm kafka-connect-mqtt-4.2.0.zip
Now let's restart Kafka connect in order to pick up the new connector (Make sure to navigate back to the docker folder first, either using cd $DATAPLATFORM_HOME
or cd ../..
)
cd $DATAPLATFORM_HOME
docker-compose restart kafka-connect-1 kafka-connect-2
The connector should now be added to the Kafka cluster. You can confirm that by watching the log file of the two containers
docker-compose logs -f kafka-connect-1 kafka-connect-2
After a while you should see an output similar to the one below with a message that the MQTT connector was added and later that the connector finished starting ...
...
kafka-connect-2 | [2019-06-08 18:01:02,590] INFO Registered loader: PluginClassLoader{pluginLocation=file:/etc/kafka-connect/custom-plugins/kafka-connect-mqtt-1.2.1-2.1.0-all/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2 | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2 | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2 | [2019-06-08 18:01:02,592] INFO Added plugin 'com.datamountaineer.streamreactor.connect.converters.source.JsonResilientConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2 | [2019-06-08 18:01:02,592] INFO Added plugin 'com.landoop.connect.sql.Transformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
...
kafka-connect-2 | [2019-06-08 18:01:11,520] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
kafka-connect-2 | [2019-06-08 18:01:11,520] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
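Instead of scanning the logs, you could also query the connector-plugins endpoint again and filter for MQTT, for example with a sketch like this:
curl -s http://dataplatform:8083/connector-plugins | jq '.[].class' | grep -i mqtt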
Before we start the connector, let's use a Kafka console listener to consume from the target topic vehicle_tracking_sysA
. We can use the kcat
utility (formerly kafkacat
), which you can either install locally or use the one provided with the Compose stack:
docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_sysA -f "%k - %s\n" -q
Now let's start the connector:
curl -X "POST" "$DOCKER_HOST_IP:8083/connectors" \
-H "Content-Type: application/json" \
--data '{
"name": "mqtt-vehicle-position-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": "1",
"mqtt.server.uri": "tcp://mosquitto-1:1883",
"mqtt.topics": "truck/+/position",
"mqtt.clean.session.enabled":"true",
"mqtt.connect.timeout.seconds":"30",
"mqtt.keepalive.interval.seconds":"60",
"mqtt.qos":"0",
"kafka.topic":"vehicle_tracking_sysA",
"confluent.topic.bootstrap.servers": "kafka-1:19092,kafka-2:19093",
"confluent.topic.replication.factor": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}'
The truck position messages are sent to the vehicle_tracking_sysA
topic and should show up on the kcat consumer immediately.
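To check that the connector and its task are running, you can query the status endpoint of the Kafka Connect REST API:
curl -s http://dataplatform:8083/connectors/mqtt-vehicle-position-source/status | jq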
If you want to stop the connector, you can again use the REST API:
curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/mqtt-vehicle-position-source"
Navigate to the Kafka Connect UI to view the connector in a graphical window.
In this part we will refine the data and place it in a new topic. The idea here is to have one normalised topic in Avro format, where all the tracking data from both system A and B will be placed, so that further processing can take it from there. For the refinement we will be using ksqlDB.
ksqlDB is an event streaming database purpose-built to help developers create stream processing applications on top of Apache Kafka.
Let's connect to the ksqlDB shell
docker exec -it ksqldb-cli ksql http://ksqldb-server-1:8088
Show the available Kafka topics
show topics;
Let's consume the data from the vehicle_tracking_sysA
topic, assuming the truck simulator and the MQTT connector are still running.
print 'vehicle_tracking_sysA';
You can also add the keyword from beginning
to start consuming at the beginning of the topic.
print 'vehicle_tracking_sysA' from beginning;
You can also use the show commands to list the other ksqlDB objects (which we will create next):
show streams;
show tables;
show queries;
First drop the stream if it already exists:
DROP STREAM IF EXISTS vehicle_tracking_sysA_s;
Now let's create the ksqlDB Stream
CREATE STREAM IF NOT EXISTS vehicle_tracking_sysA_s
(mqttTopic VARCHAR KEY,
timestamp VARCHAR,
truckId VARCHAR,
driverId BIGINT,
routeId BIGINT,
eventType VARCHAR,
latitude DOUBLE,
longitude DOUBLE,
correlationId VARCHAR)
WITH (kafka_topic='vehicle_tracking_sysA',
value_format='JSON');
We are using the JSON
value format, as our stream is a JSON-formatted string.
Let's see the live data by using a SELECT
on the Stream with the EMIT CHANGES
clause:
SELECT * FROM vehicle_tracking_sysA_s EMIT CHANGES;
This is a so-called Push Query (declared by the EMIT CHANGES
clause). A push query is a form of query issued by a client that subscribes to a result as it changes in real-time.
You should see a continuous stream of events as a result of the SELECT statement, similar as shown below:
ksql> SELECT * from vehicle_tracking_sysA_s EMIT CHANGES;
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|MQTTTOPIC |TIMESTAMP |TRUCKID |DRIVERID |ROUTEID |EVENTTYPE |LATITUDE |LONGITUDE |CORRELATIONID |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|truck/11/position |1599398981285 |11 |17 |1594289134 |Normal |38.99 |-93.45 |-8240058917944842967 |
|truck/42/position |1599398981846 |42 |22 |1325562373 |Normal |37.15 |-97.32 |-8240058917944842967 |
|truck/10/position |1599398982135 |10 |10 |1962261785 |Normal |38.09 |-91.44 |-8240058917944842967 |
|truck/34/position |1599398982454 |34 |16 |1198242881 |Normal |39.01 |-93.85 |-8240058917944842967 |
We have submitted our first simple KSQL statement. Let's now add some analytics to this base statement.
Get info on the stream using the DESCRIBE
command
DESCRIBE vehicle_tracking_sysA_s;
or with the additional EXTENDED
option
DESCRIBE EXTENDED vehicle_tracking_sysA_s;
First drop the stream if it already exists:
DROP STREAM IF EXISTS vehicle_tracking_refined_s;
And now create the refined ksqlDB Stream with a CREATE STREAM ... AS SELECT ...
statement. We include an additional column source
, which holds the system the data is coming from.
CREATE STREAM IF NOT EXISTS vehicle_tracking_refined_s
WITH (kafka_topic='vehicle_tracking_refined',
value_format='AVRO',
VALUE_AVRO_SCHEMA_FULL_NAME='com.trivadis.avro.VehicleTrackingRefined')
AS SELECT truckId AS ROWKEY
, 'Tracking_SysA' AS source
, timestamp
, AS_VALUE(truckId) AS vehicleId
, driverId
, routeId
, eventType
, latitude
, longitude
, correlationId
FROM vehicle_tracking_sysA_s
PARTITION BY truckId
EMIT CHANGES;
To check that the refined topic does in fact hold Avro-formatted data, let's just do a normal kcat on the vehicle_tracking_refined
topic
docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_refined
We can see that it is serialised as Avro:
Normal�����Q�B@�ףp=
WX��$343671958179690963
���1598125263176��88�6�������
Normal���Q���C@��p=
דW��$343671958179690963
% Reached end of topic truck_position_refined [0] at offset 367
���1598125263336��71����ߩ��2Unsafe following distance��Q����B@����(\?W��$343671958179690963
% Reached end of topic truck_position_refined [5] at offset 353
% Reached end of topic truck_position_refined [2] at offset 324
���1598125263526��101���������
Normal�=
ףp�E@�R�����V��$343671958179690963
% Reached end of topic truck_position_refined [7] at offset 355
We can use the -s
and -r
options to specify the Avro serde and the URL of the schema registry, and the output becomes readable:
docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_refined -s value=avro -r http://schema-registry-1:8081
You can use the Schema Registry UI on http://dataplatform:28102 to view the Avro Schema created by ksqlDB.
In this part we will show how we can integrate the data from the 2nd vehicle tracking system (System B), where the only integration point available is a set of log files. We can tail these log files and thereby get the information as soon as it arrives, turning the file source into a streaming data source. We will be using StreamSets Data Collector for the tail operation, as in real life this data collector would have to run on the Vehicle Tracking System itself or at least on a machine next to it, because it needs to be able to access the actual, active file while it is being written to by the application. StreamSets even has an Edge option, which is a down-sized version of the full product and is capable of running on a Raspberry Pi.
Let's again start a simulator, but this time simulating the file where the tracking data is appended to:
docker run -v "${PWD}/data-transfer/logs:/out" --rm trivadis/iot-truck-simulator "-s" "FILE" "-f" "CSV" "-d" "1000" "-vf" "50-100" "-es" "2"
Create a StreamSets data flow to tail File into Kafka topic vehicle_tracking_sysB
.
You can import that data flow from ./streamsets/File_to_Kafka.json
if you don't want to create it from scratch.
Now let's listen on the new topic
docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_sysB -f "%k - %s\n" -q
and then start the flow in StreamSets. You should see the data from the file arriving as a stream of vehicle tracking data.
Field[STRING:97] - {"text":"SystemB,1599556302227,97,21,1325712174,Normal,37.7:-92.61,4331001611104251967"}
Field[STRING:97] - {"text":"SystemB,1599556302994,97,21,1325712174,Normal,37.6:-92.74,4331001611104251967"}
Field[STRING:97] - {"text":"SystemB,1599556303791,97,21,1325712174,Normal,37.51:-92.89,4331001611104251967"}
The first part (before the dash) shows the content of the Kafka key, generated in the Expression Evaluator
component in StreamSets. The second part represents the Kafka value. Compared to the data from System A, this system delivers its data in CSV format. Additionally, the system name is included, and there is only one field for latitude/longitude: it is sent as a string, with the two values delimited by a colon character (:).
In this part we will do the refinement on the raw data from System B and place it into the same topic vehicle_tracking_refined
as used in step 2.
First let's create the stream on the raw data topic:
DROP STREAM IF EXISTS vehicle_tracking_sysB_s;
CREATE STREAM IF NOT EXISTS vehicle_tracking_sysB_s
(ROWKEY VARCHAR KEY,
system VARCHAR,
timestamp VARCHAR,
vehicleId VARCHAR,
driverId BIGINT,
routeId BIGINT,
eventType VARCHAR,
latLong VARCHAR,
correlationId VARCHAR)
WITH (kafka_topic='vehicle_tracking_sysB',
value_format='DELIMITED');
System B delivers the latitude and longitude in one field as a string, with the two values delimited by a colon character.
DESCRIBE vehicle_tracking_sysB_s;
DESCRIBE vehicle_tracking_refined_s;
Now we can use the INSERT
statement to write the data into the vehicle_tracking_refined_s
stream we have created in step 2. We have to make sure that the structure matches (the refinement we perform), which in this case means providing the right value for the source
column as well as splitting the latLong
value into a latitude
and longitude
value:
INSERT INTO vehicle_tracking_refined_s
SELECT ROWKEY
, 'Tracking_SysB' AS source
, timestamp
, vehicleId
, driverId
, routeId
, eventType
, CAST(split(latLong,':')[1] as DOUBLE) as latitude
, CAST(split(latLong,':')[2] AS DOUBLE) as longitude
, correlationId
FROM vehicle_tracking_sysB_s
EMIT CHANGES;
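To verify that messages from System B now also arrive in the refined topic, you could consume it and filter for the source value; the grep at the end is just a convenience filter (a sketch, not required for the workshop):
docker exec kcat kcat -b kafka-1 -t vehicle_tracking_refined -s value=avro -r http://schema-registry-1:8081 -o end -q | grep Tracking_SysB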
So with the vehicle position data from both source systems normalized into the vehicle_tracking_refined
topic and available in ksqlDB through the vehicle_tracking_refined_s
stream object, it is possible to query for the latest position of a given vehicle.
In ksqlDB such queries are called pull queries, in contrast to the streaming queries we have seen so far, known as push queries (using the EMIT CHANGES
clause). A pull query is a form of query issued by a client that retrieves a result as of "now", like a query against a traditional RDBMS.
So let's do a SELECT
on the stream, restricting on the vehicleId
without an EMIT CHANGES
SELECT * FROM vehicle_tracking_refined_s WHERE vehicleId = '42';
This query will return all the messages collected so far for vehicle 42
.
ksql> SELECT * FROM vehicle_tracking_refined_s WHERE vehicleId = '42'
>;
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|ROWKEY |SOURCE |TIMESTAMP|VEHICLEID|DRIVERID |ROUTEID |EVENTTYPE|LATITUDE |LONGITUDE|CORRELATI|
| | | | | | | | | |ONID |
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|42 |Tracking_|166508600|42 |11 |196226178|Normal |38.65 |-90.2 |-60501605|
| |SysA |2589 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508600|42 |11 |196226178|Normal |39.1 |-89.74 |-60501605|
| |SysA |5857 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508600|42 |11 |196226178|Normal |39.84 |-89.63 |-60501605|
| |SysA |9608 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508601|42 |11 |196226178|Normal |40.38 |-89.17 |-60501605|
| |SysA |3558 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508601|42 |11 |196226178|Normal |40.76 |-88.77 |-60501605|
| |SysA |6637 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508601|42 |11 |196226178|Normal |41.11 |-88.42 |-60501605|
| |SysA |9778 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508602|42 |11 |196226178|Normal |41.48 |-88.07 |-60501605|
| |SysA |2997 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508602|42 |11 |196226178|Normal |41.87 |-87.67 |-60501605|
| |SysA |6737 | | |5 | | | |965346141|
| | | | | | | | | |45 |
|42 |Tracking_|166508603|42 |11 |196226178|Normal |41.87 |-87.67 |-60501605|
| |SysA |0128 | | |5 | | | |965346141|
| | | | | | | | | |45 |
Query Completed
Query terminated
Note that the query terminates because it is a pull query (similar to a database query, it ends when the result set is exhausted).
Could we also use a query to just return the latest data point per vehicle?
For that we can use a GROUP BY
on vehicleId
and use latest_by_offset
on each field.
DROP TABLE IF EXISTS vehicle_tracking_refined_t DELETE TOPIC;
CREATE TABLE IF NOT EXISTS vehicle_tracking_refined_t
WITH (kafka_topic = 'vehicle_tracking_refined_t')
AS
SELECT vehicleId
, latest_by_offset(driverId) driverId
, latest_by_offset(source) source
, latest_by_offset(eventType) eventType
, latest_by_offset(latitude) latitude
, latest_by_offset(longitude) longitude
FROM vehicle_tracking_refined_s
GROUP BY vehicleId
EMIT CHANGES;
This table uses the vehicleId
as the primary key (due to the GROUP BY) and materialises all values as the latest one from the aggregation.
DESCRIBE vehicle_tracking_refined_t;
A describe on the table shows that this primary key is of type STRING
:
ksql> DESCRIBE vehicle_tracking_refined_t;
Name : VEHICLE_TRACKING_REFINED_T
Field | Type
--------------------------------------------
VEHICLEID | VARCHAR(STRING) (primary key)
DRIVERID | BIGINT
SOURCE | VARCHAR(STRING)
EVENTTYPE | VARCHAR(STRING)
LATITUDE | DOUBLE
LONGITUDE | DOUBLE
--------------------------------------------
So to test the pull query, we have to pass the vehicleId as a string, otherwise an error is shown:
SELECT * FROM vehicle_tracking_refined_t WHERE vehicleId = '42';
But we could also change the CREATE TABLE
statement to CAST the vehicleId
into a BIGINT
:
DROP TABLE IF EXISTS vehicle_tracking_refined_t DELETE TOPIC;
CREATE TABLE IF NOT EXISTS vehicle_tracking_refined_t
WITH (kafka_topic = 'vehicle_tracking_refined_t')
AS
SELECT CAST(vehicleId AS BIGINT) vehicleId
, latest_by_offset(driverId) driverId
, latest_by_offset(source) source
, latest_by_offset(eventType) eventType
, latest_by_offset(latitude) latitude
, latest_by_offset(longitude) longitude
FROM vehicle_tracking_refined_s
GROUP BY CAST(vehicleId AS BIGINT)
EMIT CHANGES;
Now we can use it with an integer:
SELECT * FROM vehicle_tracking_refined_t WHERE vehicleId = 42;
For transporting messages from Kafka to Redis, in this workshop we will be using Kafka Connect. We could also use StreamSets or Apache NiFi to achieve the same result.
Luckily, there are multiple Kafka Sink Connectors available for writing to Redis. We can either use the one provided by Confluent Inc. (which is part of Confluent Enterprise but available as evaluation license on Confluent Hub) or the one provided by Redis Inc., the free-to-use Redis Kafka Connector (Source and Sink) available on GitHub. We will be using the latter one here. It is already installed as part of the platform.
For creating an instance of the connector over the API, you can either use a REST client or the Linux curl
command line utility, which should be available on the Docker host. Curl is what we are going to use here.
Create a folder scripts (if it does not yet exist) and navigate into the folder.
mkdir scripts
cd scripts
In the scripts
folder, create a file start-redis.sh
and add the code below.
#!/bin/bash
echo "removing Redis Sink Connector"
curl -X "DELETE" "http://dataplatform:8083/connectors/redis-sink"
echo "creating Redis Sink Connector"
curl -X PUT \
http://${DOCKER_HOST_IP}:8083/connectors/redis-sink/config \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
"connector.class": "com.redis.kafka.connect.RedisSinkConnector",
"tasks.max": "1",
"redis.uri": "redis://redis-1:6379",
"redis.insecure": "true",
"redis.password": "abc123!",
"redis.command": "HSET",
"topics": "vehicle_tracking_refined",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-1:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}'
The script first removes the Redis connector, if it already exists, and then creates it from scratch.
Also create a separate script stop-redis.sh
for stopping the connector with the following script:
#!/bin/bash
echo "removing Redis Sink Connector"
curl -X "DELETE" "http://${DOCKER_HOST_IP}:8083/connectors/redis-sink"
Make sure that both scripts are executable:
sudo chmod +x start-redis.sh
sudo chmod +x stop-redis.sh
Finally let's start the connector by running the start-redis
script.
./scripts/start-redis.sh
The connector will receive the messages from the Kafka topic and write them to Redis, using the Kafka key as the key in Redis. We have opted for using the Hash data structure for the value ("redis.command": "HSET"
).
Let's see if we have values in Redis by using the Redis CLI
docker exec -ti redis-1 redis-cli
Let's authenticate and then use the KEYS *
command to get all keys:
AUTH abc123!
KEYS *
We can see a key for each vehicle.
127.0.0.1:6379> KEYS *
1) "vehicle_tracking_refined:\"46\""
2) "vehicle_tracking_refined:\"18\""
3) "vehicle_tracking_refined:\"25\""
4) "vehicle_tracking_refined:80"
5) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.3"
6) "vehicle_tracking_refined:\"11\""
7) "vehicle_tracking_refined:\"16\""
8) "vehicle_tracking_refined:\"29\""
9) "vehicle_tracking_refined:\"45\""
10) "vehicle_tracking_refined:\"38\""
11) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.7"
12) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.0"
13) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.4"
14) "vehicle_tracking_refined:97"
15) "vehicle_tracking_refined:69"
16) "vehicle_tracking_refined:74"
17) "vehicle_tracking_refined:\"30\""
18) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.1"
19) "vehicle_tracking_refined:\"40\""
20) "vehicle_tracking_refined:51"
21) "vehicle_tracking_refined:\"14\""
22) "vehicle_tracking_refined:\"43\""
23) "vehicle_tracking_refined:90"
24) "vehicle_tracking_refined:\"20\""
25) "vehicle_tracking_refined:\"26\""
26) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.6"
27) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.5"
28) "vehicle_tracking_refined:\"33\""
29) "vehicle_tracking_refined:85"
30) "vehicle_tracking_refined:\"10\""
31) "vehicle_tracking_refined:60"
32) "vehicle_tracking_refined:\"35\""
33) "vehicle_tracking_refined:53"
Let's see the content for vehicle 53
using the HGETALL
command
127.0.0.1:6379> HGETALL "vehicle_tracking_refined:53"
1) "SOURCE"
2) "Tracking_SysB"
3) "TIMESTAMP"
4) "1688321889262"
5) "VEHICLEID"
6) "53"
7) "DRIVERID"
8) "20"
9) "ROUTEID"
10) "1090292248"
11) "EVENTTYPE"
12) "Unsafe following distance"
13) "LATITUDE"
14) "40.7"
15) "LONGITUDE"
16) "-89.52"
17) "CORRELATIONID"
18) "5823429444287523"
The Kafka Connect connector for working with S3-compliant object storage is the Confluent Kafka Connect S3 Sink connector. It is part of the data platform.
curl -X "GET" "$DOCKER_HOST_IP:8083/connector-plugins" | jq
So all we have to do is create another script with the REST call to set up the connector.
In the scripts
folder, create a file start-s3.sh
and add the code below.
#!/bin/bash
echo "removing S3 Sink Connector"
curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/s3-sink"
echo "creating Confluent S3 Sink Connector"
curl -X PUT \
http://${DOCKER_HOST_IP}:8083/connectors/s3-sink/config \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"partition.duration.ms": "3600000",
"flush.size": "2000",
"topics": "vehicle_tracking_refined",
"tasks.max": "1",
"timezone": "Europe/Zurich",
"locale": "en",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"timestamp.extractor":"RecordField",
"timestamp.field":"TIMESTAMP",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"s3.region": "us-east-1",
"s3.bucket.name": "logistics-bucket",
"s3.part.size": "5242880",
"store.url": "http://minio-1:9000",
"topics.dir": "refined",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-1:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}'
We configure the connector to read the topic vehicle_tracking_refined
and write messages to the bucket named logistics-bucket
.
Also create a separate script stop-s3.sh
for just stopping the connector and add the following code:
#!/bin/bash
echo "removing S3 Sink Connector"
curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/s3-sink"
Make sure that both scripts are executable:
sudo chmod +x start-s3.sh
sudo chmod +x stop-s3.sh
Before we can start the script, we have to make sure that the bucket logistics-bucket
exists in Object Storage.
docker exec -ti minio-mc mc mb minio-1/logistics-bucket
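You can verify that the bucket exists by listing the buckets known under the minio-1 alias:
docker exec -ti minio-mc mc ls minio-1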
Finally let's start the connector by running the start-s3
script.
./scripts/start-s3.sh
You have to make sure that either the ingestion into vehicle_tracking_refined
is still working or that you have existing messages in the topic vehicle_tracking_refined
.
As soon as the connector picks up messages, they should start to appear in the logistics-bucket
bucket in MinIO.
You should see a new folder refined
(the topics.dir
we configured) with a sub-folder vehicle_tracking_refined
representing the topic, and inside this folder there is another folder per partition.
In each folder you will find multiple objects, all with some messages from Kafka.
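To list all objects the connector has written so far from the command line, a recursive listing such as the following sketch can be used:
docker exec -ti minio-mc mc ls -r minio-1/logistics-bucket/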
Let's see the content of one of the objects. We cannot do that directly from the MinIO UI; we first have to download it and then use a local editor. To download an object, select the object and then click on the Download action to the right.
The content of the object should be similar to the one shown below
Downloads % cat vehicle_tracking_refined+0+0000000000.avro
Objavro.schema�
{"type":"record","name":"VehicleTrackingRefined","namespace":"com.trivadis.avro","fields":[{"name":"SOURCE","type":["null","string"],"default":null},{"name":"TIMESTAMP","type":["null","long"],"default":null},{"name":"VEHICLEID","type":["null","long"],"default":null},{"name":"DRIVERID","type":["null","long"],"default":null},{"name":"ROUTEID","type":["null","long"],"default":null},{"name":"EVENTTYPE","type":["null","string"],"default":null},{"name":"LATITUDE","type":["null","double"],"default":null},{"name":"LONGITUDE","type":["null","double"],"default":null},{"name":"CORRELATIONID","type":["null","string"],"default":null}],"connect.version":2,"connect.name":"com.trivadis.avro.VehicleTrackingRefined"}avro.codenull�O;O��f,Φ/[+���|Tracking_SysA����bP*����
Normal�G�z�D@{�G�V�&6906439778495426077Tracking_SysA����bP*����
Normal�z�GaD@�z�G1V�&6906439778495426077Tracking_SysA����bP*����
Normalq=
ף0D@{�G�JV�&6906439778495426077Tracking_SysA����bP*����
Normal�Q���C@���QhV�&6906439778495426077Tracking_SysA����bP*����
Normal����̌C@���(\oV�&6906439778495426077Tracking_SysA����bP*����
Normal33333SC@����̌V�&6906439778495426077Tracking_SysA���bP*����
ףp�D@���G�z�V��&6906439778495426077��Tracking_SysA������b�P�*�������
...
In this part we will be using ksqlDB and, as an alternative solution, Kafka Streams or Faust to analyse the data in the refined topic vehicle_tracking_refined
.
Now with the data from both systems integrated, let's work with it!
In this new stream we are only interested in the messages where the eventType
is not 'Normal'. First let's create a SELECT statement which produces the desired result, using the ksqlDB CLI:
SELECT * FROM vehicle_tracking_refined_s
WHERE eventType != 'Normal'
EMIT CHANGES;
Now let's create a new stream with that information.
DROP STREAM IF EXISTS problematic_driving_s;
CREATE STREAM IF NOT EXISTS problematic_driving_s
WITH (kafka_topic='problematic_driving',
value_format='AVRO',
partitions=8)
AS
SELECT *
FROM vehicle_tracking_refined_s
WHERE eventtype != 'Normal';
We can see that the stream now only contains the messages filtered down to the relevant ones:
SELECT * FROM problematic_driving_s
EMIT CHANGES;
We can also see the same information by directly getting the data from the underlying Kafka topic problematic_driving
:
docker exec -ti kcat kcat -b kafka-1 -t problematic_driving -s value=avro -r http://schema-registry-1:8081
The same logic can also be implemented using Kafka Streams. In the folder java
you will find the Kafka Streams project kafka-streams-vehicle-tracking
with the implementation. The value we consume from the vehicle_tracking_refined
topic is serialized as Avro. Therefore we configure Kafka Streams to use the SpecificAvroSerde
.
package com.trivadis.kafkastreams;
import com.trivadis.avro.VehicleTrackingRefined;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class DetectProblematicDriving {
static final String VEHICLE_TRACKING_REFINED_STREAM = "vehicle_tracking_refined";
static final String PROBLEMATIC_DRIVING_STREAM = "problematic_driving-kstreams";
public static void main(final String[] args) {
final String applicationId = "test";
final String clientId = "test";
final String bootstrapServer = "dataplatform:9092";
final String schemaRegistryUrl = "http://dataplatform:8081";
final boolean cleanup = false;
final String stateDirPath = "C:\\tmp\\kafka-streams";
final KafkaStreams streams = buildFeed(applicationId, clientId, bootstrapServer, schemaRegistryUrl, stateDirPath);
if (cleanup) {
streams.cleanUp();
}
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
}
}));
}
private static KafkaStreams buildFeed(final String applicationId, final String clientId, final String bootstrapServers, final String schemaRegistryUrl,
final String stateDir) {
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the
// Kafka cluster
// against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Where to find the Confluent schema registry instance(s)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
// Specify default (de)serializers for record keys and for record values.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// If Confluent monitoring interceptors are on the classpath,
// then the producer and consumer interceptors are added to the
// streams application.
// MonitoringInterceptorUtils.maybeConfigureInterceptorsStreams(streamsConfiguration);
final StreamsBuilder builder = new StreamsBuilder();
// read the source stream (keyed by objectId)
final KStream<String, VehicleTrackingRefined> vehicleTracking = builder.stream(VEHICLE_TRACKING_REFINED_STREAM);
vehicleTracking.peek((k,v) -> System.out.println("vehicleTracking.peek(...) : " + k + " : " + v));
// filter out all events where eventType equals "Normal"
final KStream<String, VehicleTrackingRefined> vehicleTrackingFiltered = vehicleTracking.filterNot((k,v) -> "Normal".equals (v.getEVENTTYPE().toString()));
// Send the Matches to the Kafka Topic
vehicleTrackingFiltered.to(PROBLEMATIC_DRIVING_STREAM);
// read the driver
//final KTable<String, Driver> driver = builder.table(DRIVER_STREAM);
// Left Join Positions Mecomo Raw with Barge to get the barge id
//KStream<String, PositionMecomo> positionsMecomo = positionsMecomoRaw.leftJoin(barge,
// (leftValue, rightValue) -> createFrom(leftValue, (rightValue != null ? rightValue.getId() : -1) ),
// Joined.<String, PositionMecomoRaw, Barge>keySerde(Serdes.String())
//);
return new KafkaStreams(builder.build(), streamsConfiguration);
}
}
To avoid any conflicts with the ksqlDB version, the Kafka Streams implementation publishes to its own topic problematic_driving-kstreams
. So let's create that first
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic problematic_driving-kstreams --partitions 8 --replication-factor 3
Now you can run the Kafka Streams application and you should see the problematic driving in the problematic_driving-kstreams
topic
docker exec -ti kcat kcat -b kafka-1 -t problematic_driving-kstreams -s value=avro -r http://schema-registry-1:8081 -o end -q
The same logic can also be implemented using Faust. In the folder python
you will find the Faust project faust-vehicle-tracking
with the implementation. The value we have available from the vehicle_tracking_refined
topic is serialized as Avro. Avro is supported by Faust, but the current implementation works on JSON. Therefore we first convert the Avro messages into JSON using ksqlDB.
CREATE STREAM vehicle_tracking_refined_json_s
WITH (kafka_topic='vehicle_tracking_refined_json',
value_format='JSON',
partitions=8, replicas=3)
AS
SELECT *
FROM vehicle_tracking_refined_s
EMIT CHANGES;
You can install Faust either via the Python Package Index (PyPI) or from source.
pip install -U faust
Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature. Find out more about it here.
In your home directory, create a folder faust-vehicle-tracking
and navigate into the folder
cd
mkdir -p faust-vehicle-tracking/src
cd faust-vehicle-tracking/src
Create a file __main__.py
and add the following code
from src.app import app
app.main()
Create a file app.py
and add the following code
import faust
kafka_brokers = ['dataplatform:29092']
# convenience func for launching the app
def main() -> None:
app.main()
app = faust.App('vehicle-tracking-app', broker=kafka_brokers)
# VehiclePosition schema
class VehiclePosition(faust.Record, validation=True, serializer='json'):
TIMESTAMP: str
VEHICLEID: str
DRIVERID: int
ROUTEID: int
EVENTTYPE: str
LATITUDE: float
LONGITUDE: float
rawVehiclePositionTopic = app.topic('vehicle_tracking_refined_json', value_type= VehiclePosition)
problematicDrivingTopic = app.topic('problematic_driving_faust', value_type= VehiclePosition)
@app.agent(rawVehiclePositionTopic)
async def process(positions):
async for position in positions:
print(f'Position for {position.VEHICLEID}')
if position.EVENTTYPE != 'Normal':
await problematicDrivingTopic.send(value=position)
Create the new topic problematic_driving_faust
where the dangerous driving behaviour will be published:
docker exec -ti kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic problematic_driving_faust --partitions 8 --replication-factor 3
Now you can run the Faust application. Navigate back to the faust-vehicle-tracking
folder (one level above src) and run
cd ..
python3 -m src worker -l info
and you should see the problematic driving in the problematic_driving_faust
topic
docker exec -ti kcat kcat -b kafka-1 -t problematic_driving_faust -o end -q
In this part of the workshop, we are integrating the driver
information from the Dispatching system into a Kafka topic, so it is available for enrichments of data streams.
We will use the Kafka Connect JDBC Source Connector for periodically retrieving the data from the database table and publish it to the Kafka topic logisticsdb_driver
. The connector is pre-installed as part of the dataplatform.
Instead of configuring the connector through the REST API, as we have seen before with the MQTT connector, we will use the ksqlDB integration with the CREATE CONNECTOR command.
First let's create the Kafka topic logisticsdb_driver
.
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic logisticsdb_driver --partitions 8 --replication-factor 3 --config cleanup.policy=compact --config segment.ms=100 --config delete.retention.ms=100 --config min.cleanable.dirty.ratio=0.001
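To confirm that the topic has been created with the compaction settings, you can describe it:
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --describe --topic logisticsdb_driver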
Now in ksqlDB create the connector (CREATE CONNECTOR):
DROP CONNECTOR jdbc_logistics_sc;
CREATE SOURCE CONNECTOR jdbc_logistics_sc WITH (
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"tasks.max" = '1',
"connection.url" = 'jdbc:postgresql://postgresql/demodb?user=demo&password=abc123!',
"mode" = 'timestamp',
"timestamp.column.name" = 'last_update',
"schema.pattern" = 'dispatching',
"table.whitelist" = 'driver',
"validate.non.null" = 'false',
"topic.prefix" = 'logisticsdb_',
"poll.interval.ms" = '10000',
"key.converter" = 'org.apache.kafka.connect.converters.LongConverter',
"key.converter.schemas.enable" = 'false',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable" = 'false',
"transforms" = 'createKey,extractInt',
"transforms.createKey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createKey.fields" = 'id',
"transforms.extractInt.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractInt.field" = 'id'
);
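ksqlDB registers the connector in the Kafka Connect cluster; assuming it keeps the name jdbc_logistics_sc, you could check its status over the Connect REST API with a call like this:
curl -s http://dataplatform:8083/connectors/jdbc_logistics_sc/status | jq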
We can see that all the drivers from the driver
table have been produced into the logisticsdb_driver
topic by using kcat
:
docker exec -ti kcat kcat -b kafka-1 -t logisticsdb_driver -o beginning
You can also use the PRINT
command from ksqlDB instead
PRINT logisticsdb_driver FROM BEGINNING;
Back in the ksqlDB console, create a ksqlDB table on the topic:
DROP TABLE IF EXISTS driver_t;
CREATE TABLE IF NOT EXISTS driver_t (id BIGINT PRIMARY KEY,
first_name VARCHAR,
last_name VARCHAR,
available VARCHAR,
birthdate VARCHAR)
WITH (kafka_topic='logisticsdb_driver',
value_format='JSON');
SELECT * FROM driver_t EMIT CHANGES;
Now perform an update on one of the drivers in the PostgreSQL database (original source):
docker exec -ti postgresql psql -d demodb -U demo -c "UPDATE logistics_db.driver SET available = 'N', last_update = CURRENT_TIMESTAMP WHERE id = 11"
In this part of the workshop, we are joining the driver
ksqlDB table with the problematic_driving_s
ksqlDB stream to enrich it with valuable information.
Now with the ksqlDB table in place, let's join it with the problematic_driving_s
ksqlDB stream to enrich it with driver information available in the driver_t
table (first_name, last_name and availability):
SELECT pd.driverId, d.first_name, d.last_name, d.available, pd.vehicleId, pd.routeId, pd.eventType
FROM problematic_driving_s pd
LEFT JOIN driver_t d
ON pd.driverId = d.id
EMIT CHANGES;
We can see that the join looks as if it had been taken from an RDBMS-based system. The enriched stream can be seen appearing live on the ksqlDB CLI.
How can we make that enriched dataset (data stream) available in a more permanent fashion? We do that by creating a new Stream based on the SELECT statement just issued. Stop the query by entering CTRL-C
and execute the following statement:
DROP STREAM IF EXISTS problematic_driving_and_driver_s;
CREATE STREAM IF NOT EXISTS problematic_driving_and_driver_s \
WITH (kafka_topic='problematic_driving_and_driver', \
value_format='AVRO', \
partitions=8) \
AS
SELECT pd.driverId, d.first_name, d.last_name, d.available, pd.vehicleId, pd.routeId, pd.eventType
FROM problematic_driving_s pd
LEFT JOIN driver_t d
ON pd.driverId = d.id;
We can use kcat
on the newly created Kafka topic problematic_driving_and_driver
to show the enrichment in action:
docker exec -ti kcat kcat -b kafka-1 -t problematic_driving_and_driver -s value=avro -r http://schema-registry-1:8081
In this part of the workshop, we are using the aggregate operator count
to perform aggregations over time windows.
The first one is a tumbling window of 1 hour
DROP TABLE IF EXISTS event_type_by_1hour_tumbl_t DELETE TOPIC;
CREATE TABLE event_type_by_1hour_tumbl_t AS
SELECT windowstart AS winstart
, windowend AS winend
, eventType
, count(*) AS nof
FROM problematic_driving_s
WINDOW TUMBLING (SIZE 60 minutes)
GROUP BY eventType
EMIT CHANGES;
The second one is a hopping window of 1 hour with an advance of 30 minutes.
CREATE TABLE event_type_by_1hour_hopp_t AS
SELECT windowstart AS winstart
, windowend AS winend
, eventType
, count(*) AS nof
FROM problematic_driving_s
WINDOW HOPPING (SIZE 60 minutes, ADVANCE BY 30 minutes)
GROUP BY eventType;
If you are doing a select on the table, you can format the time elements of the time window as shown below
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','CET') wsf
, TIMESTAMPTOSTRING(WINDOWEND,'yyyy-MM-dd HH:mm:ss','CET') wef
, WINDOWSTART
, WINDOWEND
, eventType
, nof
FROM event_type_by_1hour_tumbl_t
EMIT CHANGES;
In this part of the workshop we are integrating the shipment
information from the Shipment system into a Kafka topic, so it is available for analytics.
We will use the Kafka Connect Debezium MySQL CDC Source Connector to monitor and record all row-level changes on the shipment
database table and publish it to the Kafka topic sample.sample.shipment
(implementation of the log-based change data capture). The connector is pre-installed as part of the dataplatform.
We are again using the CREATE CONNECTOR command for configuring the connector instead of the REST API.
First let's create the new Kafka topic
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic sample.sample.shipment --partitions 8 --replication-factor 3
Alternatively it could also be created as a compacted log topic:
docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic sample.sample.shipment --partitions 8 --replication-factor 3 --config cleanup.policy=compact --config segment.ms=100 --config delete.retention.ms=100 --config min.cleanable.dirty.ratio=0.001
Now we can create the connector
DROP CONNECTOR debz_shipment_sc;
CREATE SOURCE CONNECTOR debz_shipment_sc WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'debezium',
'database.password' = 'dbz',
'database.server.id' = '42',
'database.server.name' = 'sample',
'table.whitelist' = 'sample.shipment',
'database.history.kafka.bootstrap.servers' = 'kafka-1:19092',
'database.history.kafka.topic' = 'dbhistory.sample' ,
'schema_only_recovery' = 'true',
'include.schema.changes' = 'false',
'transforms'= 'unwrap, extractkey',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry-1:8081'
);
Now let's create the corresponding ksqlDB table
DROP TABLE IF EXISTS shipment_t;
CREATE TABLE IF NOT EXISTS shipment_t (id VARCHAR PRIMARY KEY,
vehicle_id INTEGER,
target_wkt VARCHAR)
WITH (kafka_topic='sample.sample.shipment',
value_format='AVRO');
To check that CDC works, connect to MySQL:
docker exec -it mysql bash -c 'mysql -u root -pmanager'
And in the ksqlDB CLI use a SELECT to verify that changes arrive:
SELECT * FROM shipment_t EMIT CHANGES;
If you perform an INSERT in MySQL, you should see a change immediately
USE sample;
INSERT INTO shipment (id, vehicle_id, target_wkt) VALUES (9, 49, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');
In this part of the workshop we are using the shipment
information to detect when a vehicle is near the destination of the shipment.
For that we have implemented some additional user-defined functions (UDFs), which can be used in the same way as the built-in functions of ksqlDB.
show functions;
DROP TABLE IF EXISTS shipment_by_vehicle_t;
CREATE TABLE shipment_by_vehicle_t
AS SELECT vehicle_id, collect_list(target_wkt) AS target_wkts
FROM shipment_t
GROUP BY vehicle_id;
SELECT vtr.vehicleId
,array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),1) AS status_before
,array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),0) AS status_now
FROM vehicle_tracking_refined_s vtr
LEFT JOIN shipment_by_vehicle_t sbv
ON CAST (vtr.vehicleId AS INTEGER) = sbv.vehicle_id
WHERE sbv.target_wkts IS NOT NULL
GROUP BY vehicleId
EMIT CHANGES;
CREATE TABLE geo_fence_status_t AS
SELECT vtr.vehicleId
, geo_fence (array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),1) ,
array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),0)
) AS status
FROM vehicle_tracking_refined_s vtr
LEFT JOIN shipment_by_vehicle_t sbv
ON CAST (vtr.vehicleId AS INTEGER) = sbv.vehicle_id
WHERE sbv.target_wkts IS NOT NULL
GROUP BY vehicleId
EMIT CHANGES;
SELECT vehicleId
, geo_fence(array_lag(collect_list(geo_fence(latitude, longitude, 'POLYGON ((-90.626220703125 38.80118939192329, -90.62347412109375 38.460041065720446, -90.06866455078125 38.436379603, -90.04669189453125 38.792626957868904, -90.626220703125 38.80118939192329))')),1),
            array_lag(collect_list(geo_fence(latitude, longitude, 'POLYGON ((-90.626220703125 38.80118939192329, -90.62347412109375 38.460041065720446, -90.06866455078125 38.436379603, -90.04669189453125 38.792626957868904, -90.626220703125 38.80118939192329))')),0)) status
FROM vehicle_tracking_refined_s
GROUP BY vehicleId
EMIT CHANGES;
First let's create a stream backed by the dashboard
topic, which will be the channel to the Tipboard dashboard solution.
DROP STREAM IF EXISTS dashboard_s;
CREATE STREAM IF NOT EXISTS dashboard_s
(ROWKEY BIGINT KEY,
tile VARCHAR,
key VARCHAR,
data VARCHAR)
WITH (kafka_topic='dashboard'
, partitions=1
, value_format='JSON');
Now import the StreamSets connector between this new stream and the Tipboard dashboard.
SELECT first_name, last_name, eventType
FROM problematic_driving_and_driver_s
EMIT CHANGES;
CREATE STREAM tipboard_text_s
WITH (value_format = 'JSON', kafka_topic = 'tipboard_text_s', partitions=1)
AS
SELECT driverId AS ROWKEY
, 'text' AS tile
, 'tweet' AS key
, tipboard_text(concat(first_name, ' ', last_name, ' ', eventType)) AS data
FROM problematic_driving_and_driver_s
EMIT CHANGES;
DROP STREAM geo_fence_status_s;
CREATE STREAM geo_fence_status_s (vehicleId STRING KEY
, status STRING)
WITH (kafka_topic='GEO_FENCE_STATUS_T'
, partitions=8
, value_format='AVRO');
INSERT INTO dashboard_s
SELECT CAST (vehicleId AS BIGINT) AS ROWKEY
, 'text' AS tile
, 'tweet' AS key
, tipboard_text(concat('Vehicle ', vehicleId, ' is near its destination')) AS data
FROM geo_fence_status_s
WHERE status = 'ENTERING'
PARTITION BY CAST (vehicleId AS BIGINT)
EMIT CHANGES;
DROP STREAM event_type_by_1hour_tumbl_s;
CREATE STREAM event_type_by_1hour_tumbl_s (eventType STRING KEY
, winstart BIGINT
, winend BIGINT
, nof BIGINT)
WITH (kafka_topic='event_type_by_1hour_tumbl_t'
, partitions=8
, value_format='AVRO'
, window_type='Tumbling'
, window_size='60 minutes');
SELECT winstart
, collect_list(eventType)
, collect_list(nof)
FROM event_type_by_1hour_tumbl_s
GROUP BY winstart
EMIT CHANGES;
SELECT winstart, as_map(collect_list(eventType), collect_list(nof) ) as counts
FROM event_type_by_1hour_tumbl_s
GROUP BY winstart
EMIT CHANGES;
DROP TABLE tipboard_pie_t DELETE TOPIC;
CREATE TABLE tipboard_pie_t
WITH (value_format = 'JSON', kafka_topic = 'tipboard_pie_t', partitions=1)
AS
SELECT winstart
, 'pie_chart' AS tile
, 'pie' AS key
, tipboard_pie_chart('Last Hour', as_map(collect_list(eventType), collect_list(nof) )) as data
FROM event_type_by_1hour_tumbl_s
GROUP BY winstart
EMIT CHANGES;