This repository contains two transformations which can be of help when dealing with the transfer of JSON records into Splunk.
The intention of this transformer is to mimic internal Splunk transformations and configuration parameters: SOURCE_KEY, DEST_KEY, FORMAT and REGEX. This allows the processing of log messages to be offloaded to Kafka Connect workers. The transformer was designed to work in conjunction with the Kafka Connect Sink for Splunk, in which you can enable the splunk.header.support configuration parameter to transfer Kafka record header fields to Splunk metadata (e.g. index, source, sourcetype, host).
| Name | Description | Default Value |
|---|---|---|
| `source.key` | Name of the field (either the field itself or its value) to which the changes are applied. Nested fields are supported using the dotted form, e.g. `config.app.id`. If the `source.key` parameter contains a dot (`.`), it is automatically treated as nested. | |
| `source.preserve` | Preserves the original `source.key` field in the Kafka record body when the `dest.key` field is specified. The `source.key` field is then left unchanged in the Kafka record body. | `false` |
| `dest.key` | If `dest.key` is specified, the transformation renames the `source.key` field to `dest.key`. `dest.key` cannot point to the same field as `source.key`. | |
| `dest.toHeader` | Set to `true` to move the final field (`source.key`, or `dest.key` if specified) into the Kafka record headers. The field is then removed from the Kafka record body. | `false` |
| `regex.pattern` | A regex applied to the value of `source.key`. The `regex.format` option must also be specified. Capture groups are supported. | |
| `regex.format` | The final formatting applied to the `source.key` value. Capture groups from the regex can be referenced with dollar syntax, e.g. `$1`. | |
| `regex.defaultValue` | A default value for the target field, used when the `source.key` value does not match the regex pattern. `regex.pattern` and `regex.format` must be specified. | |
- If the `source.key` parameter contains a dot (`.`) character, it is automatically treated as nested.
- If the `source.key` is the only field nested in the parent object and is renamed via `dest.key` (with `source.preserve` at its default of `false`), the key is moved to the root of the JSON message. The parent object of the `source.key` field is left empty; it is not removed.
  - Example: `{"nested": {"renameMe": "value"}}` => `{"nested": {}, "renamedKey": "value"}`
- Although `dest.key` may contain dots (`.`), it is in NO way treated as a nested field; its key is always processed as a whole without creating any nested structures.
  - Example: `{"nested": {"renameMe": "value"}}` => `{"nested": {}, "renamed.key": "value"}`
- The `source.key` parameter cannot point to an object. It must be a final element (field). If it points to an object, the Kafka record is returned unchanged.
  - Example: `{"nested": {"renameMe": "value"}}` with `source.key="nested"` => unchanged Kafka record
- `regex.pattern`, `regex.format` and `regex.defaultValue` are applied in place if `dest.key` is not specified.
  - Example: `{"nested": {"key": "value"}}` => `{"nested": {"key": "applied format or default value"}}`
- The `java.util.regex` package is used to work with regular expressions.
- If `regex.pattern` is specified but there is no match on the value of the `source.key` field, the `regex.defaultValue` is returned if it is specified. Otherwise, the Kafka record is returned unchanged (without any other transformations).
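The regex semantics described above can be sketched with plain `java.util.regex` (a minimal illustration of the pattern/format/default behavior, not the transformer's actual implementation):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexFormatDemo {
    // Mimics regex.pattern + regex.format + regex.defaultValue semantics:
    // if the value matches the pattern, apply the format (with $1-style
    // capture-group references); otherwise fall back to the default value,
    // or leave the value unchanged if no default is given.
    static String applyRegex(String value, String pattern, String format, String defaultValue) {
        Matcher m = Pattern.compile(pattern).matcher(value);
        if (m.matches()) {
            return m.replaceAll(format);
        }
        return defaultValue != null ? defaultValue : value;
    }

    public static void main(String[] args) {
        // Matching value: capture group $1 is substituted into the format
        System.out.println(applyRegex("myIndex", "^(.*)$", "my_custom_$1_format", "my default value"));
        // -> my_custom_myIndex_format

        // Non-matching value: the default value is used instead
        System.out.println(applyRegex("no-digits", "^(\\d+)$", "$1", "my default value"));
        // -> my default value
    }
}
```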
The intention of this transformer is to add filtering capabilities similar to those available in Kafka Connect versions 2.6 and above via the org.apache.kafka.connect.transforms.Filter transformer. Earlier versions do not include KIP-585 (Filter and Conditional SMTs), so the org.apache.kafka.connect.transforms.Filter SMT in conjunction with the Predicate interface cannot be used there.
| Name | Description | Default Value |
|---|---|---|
| `headerKey` | Name of the header key. If a header with this key exists in the Kafka record, the whole message is discarded, unless `isNegate` is set to `true` to reverse this condition. | |
| `isNegate` | Set to `true` to negate the filtering of messages with the specified `headerKey`. | `false` |
Here is an example configuration for the Splunk and Filter transformers discussed above. The `transforms` field contains an ordered list of the transformers you want to apply:
```json
{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics": "LogDNATopic",
"splunk.header.support": "true",
"splunk.hec.uri": "<SPLUNK_HEC_URIs>",
"splunk.hec.token": "<SPLUNK_TOKEN>",
"transforms": "my_custom_transform_1,my_custom_transform_2,discard_if_no_index_in_header",
"transforms.my_custom_transform_1.type": "com.ibm.garage.kafka.connect.transforms.Splunk",
"transforms.my_custom_transform_1.source.key": "source_key1",
"transforms.my_custom_transform_1.source.preserve": true,
"transforms.my_custom_transform_1.dest.key": "splunk.header.index",
"transforms.my_custom_transform_1.dest.toHeader": true,
"transforms.my_custom_transform_1.regex.pattern": "^(.*)$",
"transforms.my_custom_transform_1.regex.format": "my_custom_$1_format",
"transforms.my_custom_transform_1.regex.defaultValue": "my default value",
"transforms.my_custom_transform_2.type": "com.ibm.garage.kafka.connect.transforms.Splunk",
"transforms.my_custom_transform_2.source.key": "source_key1",
"transforms.my_custom_transform_2.dest.key": "source_key1_renamed",
"transforms.discard_if_no_index_in_header.type": "com.ibm.garage.kafka.connect.transforms.Filter",
"transforms.discard_if_no_index_in_header.headerKey": "splunk.header.index",
"transforms.discard_if_no_index_in_header.isNegate": true
}
```

- The configuration includes three transformations, ordered in the pipeline (see the `transforms` field).
- The first transformation (labeled `my_custom_transform_1`):
  - It takes the value of the `source_key1` field if it is found.
  - If the `regex.pattern` matches, it applies the `regex.format`, which results in `my_custom_<value of source_key1 field>_format`.
  - If the `regex.pattern` does not match, the `regex.defaultValue` is used as the value instead.
  - Since `dest.key` is specified, a new field with the `dest.key` key (`splunk.header.index`) is created. Its value is taken from the previous points.
  - `source.preserve` is set to `true`, so the original `source.key` field is not removed from the Kafka record. It is left intact and available to the next transformation.
  - `dest.toHeader` is set to `true`, so the newly created `splunk.header.index` field and its new value are moved to the Kafka record headers. The field is then removed from the Kafka record body.
- The second transformation (labeled `my_custom_transform_2`):
  - It creates a new `source_key1_renamed` field, taking its value from the original source key `source_key1`.
  - Since `source.preserve` defaults to `false`, the original `source_key1` field is removed from the Kafka record.
- The third transformation (labeled `discard_if_no_index_in_header`):
  - If a header with key `splunk.header.index` exists in the Kafka record, the message is not discarded (because the condition is negated by `isNegate` set to `true`). If a header with key `splunk.header.index` does not exist in the Kafka record, the message is discarded.
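The effect of this example pipeline on a record can be sketched with plain Java maps standing in for the Kafka record body and headers (an illustration of the documented behavior only; the field names and logic mirror the example configuration above, not the transformers' actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class PipelineDemo {
    // Applies the three example transforms to maps that stand in for a Kafka
    // record body and its headers; returns true if the record is kept.
    static boolean runPipeline(Map<String, Object> body, Map<String, Object> headers) {
        // my_custom_transform_1: format source_key1 via the regex, write the
        // result to the splunk.header.index header (dest.toHeader=true),
        // and keep the original field (source.preserve=true)
        Object raw = body.get("source_key1");
        if (raw instanceof String) {
            String value = (String) raw;
            String formatted = value.matches("^(.*)$")
                    ? value.replaceAll("^(.*)$", "my_custom_$1_format")
                    : "my default value";
            headers.put("splunk.header.index", formatted);
        }

        // my_custom_transform_2: rename source_key1 -> source_key1_renamed
        // (source.preserve defaults to false, so the original field goes away)
        if (body.containsKey("source_key1")) {
            body.put("source_key1_renamed", body.remove("source_key1"));
        }

        // discard_if_no_index_in_header: with isNegate=true, the record is
        // kept only when the splunk.header.index header exists
        return headers.containsKey("splunk.header.index");
    }

    public static void main(String[] args) {
        Map<String, Object> body = new HashMap<>();
        body.put("source_key1", "myIndex");
        Map<String, Object> headers = new HashMap<>();

        boolean kept = runPipeline(body, headers);
        System.out.println(kept);    // true
        System.out.println(headers); // {splunk.header.index=my_custom_myIndex_format}
        System.out.println(body);    // {source_key1_renamed=myIndex}
    }
}
```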
Please use Gradle version 7.0+ to build the jar:

```shell
gradle jar
```

To run the tests:

```shell
gradle clean test
```
These transformers rely on the message being in JSON format and on the Connect worker being configured to use the JSON key and value converters. To do this, the Connect worker needs the following properties set:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
```

Unless you are specifying a schema, you will also need to set the following properties:

```properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```
To make the transformers available to your Kafka Connect worker, place the built jar on a supported plugin path in your worker environment.