Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Kafka message headers #1718

Merged
merged 2 commits into from
May 6, 2024
Merged

Support for Kafka message headers #1718

merged 2 commits into from
May 6, 2024

Conversation

ryzhyk
Copy link
Contributor

@ryzhyk ryzhyk commented May 6, 2024

Added headers field to Kafka output connector config to specify a list
of headers to be added to each message.

Kafka headers are a list of key-value pairs. Keys are strings. The
same key can occur multiple times, therefore the ordering is important.
The value component of the header is an optional byte array. For
convenience, we allow specifying it as an actual array of bytes in JSON
([1,2,3,4,5]) or as a UTF8-encoded string ("foobar"). Here is an
example Kafka connector config with headers that I added to the SecOps
demo:

"config": {
    "format": {
        "name": "avro",
        "config": {
            "schema": schema,
            "registry_urls": [pipeline_to_schema_registry],
        }
    },
    "transport": {
        "name": "kafka_output",
        "config": {
            "bootstrap.servers": pipeline_to_redpanda_server,
            "topic": "secops_vulnerability_stats_avro",
            "headers": [{"key": "header1", "value": "this is a string"},
                        {"key": "header2", "value": list(b'byte array')}]
        }
    }
}

Is this a user-visible change (yes/no): yes

@ryzhyk ryzhyk added the adapters Issues related to the adapters crate label May 6, 2024
@ryzhyk ryzhyk requested a review from lalithsuresh May 6, 2024 16:55
Leonid Ryzhyk added 2 commits May 6, 2024 12:06
Th `optimized` profile creates artifacts in `target/optimized`, not
`target/release`.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
Added `headers` field to Kafka output connector config to specify a list
of headers to be added to each message.

Kafka headers are a list of key-value pairs.  Keys are strings.  The
same key can occur multiple times, therefore the ordering is important.
The value component of the header is an optional byte array.  For
convenience, we allow specifying it as an actual array of bytes in JSON
(`[1,2,3,4,5]`) or as a UTF8-encoded string ("foobar").  Here is an
example Kafka connector config with headers that I added to the SecOps
demo:

```
"config": {
    "format": {
        "name": "avro",
        "config": {
            "schema": schema,
            "registry_urls": [pipeline_to_schema_registry],
        }
    },
    "transport": {
        "name": "kafka_output",
        "config": {
            "bootstrap.servers": pipeline_to_redpanda_server,
            "topic": "secops_vulnerability_stats_avro",
            "headers": [{"key": "header1", "value": "this is a string"},
                        {"key": "header2", "value": list(b'byte array')}]
        }
    }
}
```

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
@ryzhyk ryzhyk marked this pull request as ready for review May 6, 2024 19:06
@ryzhyk ryzhyk merged commit b90668f into main May 6, 2024
5 checks passed
@ryzhyk ryzhyk deleted the kafka_headers branch May 6, 2024 19:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
adapters Issues related to the adapters crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants