
Getting started

The following example walks through setting up and using KafkaQuery. We will consume data from an example topic called person, which contains messages of the following format:

{
  "name": "John Smith",
  "age": 32,
  "height": "172cm"
}
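
If you do not have such a topic yet and want to follow along, you can produce a few test messages yourself, for example with the console producer script that ships with Kafka (the script name and the --bootstrap-server option below depend on your Kafka installation and version; the messages are just illustrations):

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic person
>{"name":"John Smith","age":32,"height":"172cm"}
>{"name":"Galen Evan","age":24,"height":"180cm"}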

Set up your environment

Download the latest KafkaQuery release here and extract its contents.

All KafkaQuery functionality is accessed through the kafkaquery script, which can be found in the bin folder.

Specify your ZooKeeper and Kafka addresses:

You can do this in one of two ways.

Either set environment variables for your ZooKeeper and Kafka addresses:

Property            Default value    Environment variable name (optional)
Kafka Address       localhost:9092   KAFKA_ADDR
ZooKeeper Address   localhost:2181   ZK_ADDR
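
For example, in a Unix shell (adjust the addresses to match your setup):

export KAFKA_ADDR=localhost:9092
export ZK_ADDR=localhost:2181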

Or specify the ZooKeeper and Kafka addresses explicitly for every execution by always appending the following options to your command:

--zookeeper <address> --kafka <address>
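
For example, the first query shown later in this guide would then be run as (the --query option is described in the help output below):

./kafkaquery --query "SELECT name, age FROM person" --zookeeper localhost:2181 --kafka localhost:9092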


For information on how to use the available commands, check the help output:

./kafkaquery --help
KafkaQuery CLI
Usage: kafkaquery [options]

  -q, --query <query>      Allows querying available data sources through Flink SQL. query - valid Flink SQL query. More information about Flink SQL can be found at: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#operations. 
  -o, --output:<sink>=<param>
                           Writes the output data of the query to the given sink.
  -t, --timeout <seconds>  Specifies a timeout in seconds. If no message is received for the duration of the timeout the program terminates.
  -s, --start <value>      Specifies the start strategy for retrieving records from Kafka.
  --schema <topic_name>    Output the specified topic's schema which entails the field names and types.
  --topics                 List all topic names for which a schema is available.
  --update-schema:<topic_name>=<avro_Schema_file>
                           Updates the schema for the specified topic with the given Avro schema (as a file).
  --infer-schema <topic_name>
                           Infers and registers an Avro schema for the specified topic.
  --kafka <Kafka_address>  Sets the Kafka address for the execution.
  --zookeeper <ZK_address>
                           Sets the ZooKeeper address for the execution.
  --udf <function_file1,function_file2...>
                           Registers the specified User defined functions for usage in queries.
  -h, --help
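
Most of these options come up in the rest of this guide. As a quick first check, you can list all topics that already have a schema registered (assuming your addresses are configured as described above):

./kafkaquery --topics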

Your first query

To perform queries on a topic, a corresponding schema is needed!

Infer the schema of the person topic (the topic must contain at least one message)

./kafkaquery --infer-schema person

Successfully generated schema for topic person

Examine the generated schema

./kafkaquery --schema person
{
  "type" : "record",
  "name" : "person",
  "namespace" : "infer",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "long"
  }, {
    "name" : "height",
    "type" : "string"
  } ]
}
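
If the inferred types are not what you want (for instance, if age should be an int rather than a long), you can register a hand-written Avro schema instead via the --update-schema option from the help output; person.avsc is a hypothetical file containing your adjusted schema:

./kafkaquery --update-schema:person=person.avsc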

Perform your first query

./kafkaquery --query "SELECT name, age FROM person"
John Smith,32
Galen Evan,24
Rowen Alexa,16
Celine Lita,26
Paula Bess,15
Noble Leanna,52
Tami Bethany,39
Jessye Joby,41
Ike Marlowe,12
Emmeline Vale,23

Congratulations on your first output! Note that the program is still running and fetching new data from the topic; stop it with Ctrl+C when you are done.
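
If you would rather have the program terminate on its own once no new messages arrive, add the --timeout option from the help output, for example with a 10 second timeout:

./kafkaquery --query "SELECT name, age FROM person" --timeout 10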

Next steps

Creating a schema for your topic

Check out the Flink SQL documentation

KafkaQuery commands

User-defined functions