denisw/kafka-connect-jmespath

Kafka Connect JMESPath Plugin

A Kafka Connect plugin that makes the JMESPath query language available to connectors. It currently provides the MatchesJMESPath predicates, documented below.

Installation

You can install or download the latest version of the plugin from Confluent Hub.

MatchesJMESPath Predicates

The de.denisw.kafka.connect.jmespath.MatchesJMESPath$Key and de.denisw.kafka.connect.jmespath.MatchesJMESPath$Value transformation predicates apply a JMESPath query to the key or value of each record, respectively. A predicate matches if the query yields a "true value" according to the JMESPath specification, that is, any value except the following:

  • null
  • false
  • empty string
  • empty array ([])
  • empty object ({})
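The rule above can be sketched in a few lines of Python. This is only an illustration of the truthiness rule as listed here, not the plugin's actual implementation; the function name `is_jmespath_true` is hypothetical, and Python's `None`, `list` and `dict` stand in for JSON null, arrays and objects.

```python
def is_jmespath_true(value):
    """Return True unless value is one of the five false-like values listed above."""
    # None stands in for JSON null, False for JSON false.
    if value is None or value is False:
        return False
    # Empty string, empty array and empty object are false-like as well.
    if isinstance(value, (str, list, dict)) and len(value) == 0:
        return False
    # Everything else counts as true, including the number 0.
    return True
```

Note that, unlike in Python itself, the number 0 is treated as a true value, because it is not in the list of exceptions.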

The MatchesJMESPath predicates are designed to be used with Kafka Connect's built-in Filter transformation to drop all records matching a JMESPath query (see the examples below). However, they can more generally be used to apply any Single Message Transformation conditionally based on the data in the record key or value.

Note that unlike the filter operation known from many programming languages and libraries (including Kafka Streams), the Filter transformation drops records matching the predicate rather than keeping them. The JMESPath query should thus match records that should NOT be kept. Alternatively, you can enable the negate option of the Filter transformation to reverse the predicate.
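The drop-on-match semantics are easy to get backwards, so here is a small Python model of them. It is a sketch of the behavior described in the paragraph above, not Kafka Connect code; `apply_filter`, the sample records and the `is_hawking` predicate are all made up for illustration.

```python
def apply_filter(records, predicate, negate=False):
    """Model of Filter: drop every record for which the predicate holds.

    With negate=True the predicate result is reversed first, so only
    records that satisfy the original predicate are kept.
    """
    kept = []
    for record in records:
        matched = predicate(record)
        if negate:
            matched = not matched
        if not matched:
            kept.append(record)
    return kept


# Hypothetical sample data, standing in for record values.
books = [{"author": "Stephen Hawking"}, {"author": "Carl Sagan"}]


def is_hawking(record):
    return record["author"] == "Stephen Hawking"


without_hawking = apply_filter(books, is_hawking)             # matching records are dropped
only_hawking = apply_filter(books, is_hawking, negate=True)   # matching records are kept
```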

Configuration Examples

Skip records whose nested publishDate.year field is below 2000:

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "Before2000",

"predicates": "Before2000",
"predicates.Before2000.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Value",
"predicates.Before2000.query": "publishDate.year < `2000`"

Process only records whose author equals "Stephen Hawking", using Filter's negate option to reverse the predicate:

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "Author",
"transforms.Filter.negate": "true",

"predicates": "Author",
"predicates.Author.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Value",
"predicates.Author.query": "author == 'Stephen Hawking'"

Drop records with null keys, using JMESPath's @ syntax to match the whole key:

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "NullKey",

"predicates": "NullKey",
"predicates.NullKey.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Key",
"predicates.NullKey.query": "@ == `null`"
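All three examples share the same shape, so the properties can also be generated rather than written by hand. The following Python sketch builds them as a dictionary; the helper name `filter_properties` and its parameters are hypothetical, and it assumes the transformation is always aliased as "Filter" as in the examples above. The resulting properties would still need to be merged into the rest of a connector configuration.

```python
import json

# Predicate class prefix taken from the examples above.
PREDICATE_CLASS = "de.denisw.kafka.connect.jmespath.MatchesJMESPath"


def filter_properties(name, query, target="Value", negate=False):
    """Build the Filter + MatchesJMESPath properties for one predicate.

    name   -- predicate alias, e.g. "Before2000"
    query  -- JMESPath query string
    target -- "Key" or "Value", selecting the nested predicate class
    negate -- if True, set Filter's negate option to reverse the predicate
    """
    if target not in ("Key", "Value"):
        raise ValueError("target must be 'Key' or 'Value'")
    props = {
        "transforms": "Filter",
        "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
        "transforms.Filter.predicate": name,
        "predicates": name,
        f"predicates.{name}.type": f"{PREDICATE_CLASS}${target}",
        f"predicates.{name}.query": query,
    }
    if negate:
        props["transforms.Filter.negate"] = "true"
    return props


props = filter_properties("Before2000", "publishDate.year < `2000`")
print(json.dumps(props, indent=2))
```

Calling `filter_properties("NullKey", "@ == `null`", target="Key")` would reproduce the null-key example in the same way.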

Configuration Reference

query

The JMESPath query to apply to the key or value data. See the JMESPath tutorial, examples and specification to learn about the supported syntax.

Demo

See the demo subfolder for a Docker-based setup to test the Kafka Connect JMESPath plugin locally.

License

This codebase is licensed under the Apache License 2.0. See the LICENSE file for more details.

The JMESPath logo (images/jmespath-logo.png) is taken from the JMESPath website codebase and is licensed under the Creative Commons license (CC BY 4.0).
