From 1f2a65be442efec9cfc092410b68135105dc3513 Mon Sep 17 00:00:00 2001
From: Anton Troshin
Date: Fri, 30 Aug 2024 20:10:05 -0500
Subject: [PATCH 1/3] Add Kafka escapeHeaders documentation

Signed-off-by: Anton Troshin
---
 .../supported-bindings/kafka.md               |  3 +
 .../supported-pubsub/setup-apache-kafka.md    | 62 +++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
index addfba98a8c..937d666819e 100644
--- a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
@@ -63,6 +63,8 @@ spec:
     value: true
   - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
     value: 5m
+  - name: escapeHeaders # Optional.
+    value: false
 ```
 
 ## Spec metadata fields
@@ -99,6 +101,7 @@ spec:
 | `consumerFetchDefault` | N | Input/Output | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | `heartbeatInterval` | N | Input | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
 | `sessionTimeout` | N | Input | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
+| `escapeHeaders` | N | Input/Output | Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
 
 #### Note
 The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index cafcee537fe..945833b3c1c 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -63,6 +63,8 @@ spec:
     value: true
   - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
     value: 5m
+  - name: escapeHeaders # Optional.
+    value: false
 ```
 
 
@@ -112,6 +114,7 @@ spec:
 | consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to "3s". | `"5s"` |
 | sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
+| escapeHeaders | N | Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
 
 The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.
 
@@ -597,6 +600,65 @@ To run Kafka on Kubernetes, you can use any Kafka operator, such as [Strimzi](ht
 
 {{< /tabs >}}
 
+## Sending headers with special characters
+The application may require to transfer a special characters in message metadata, it will be sent to the subscriber application via HTTP headers.
+HTTP header values must follow certain specifications and some characters are not allowed, see [here](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
+Using `escapeHeaders` settings will allow to attach metadata with the special characters, which will be safely URL Encoded, and passed to the subscriber.
+
+Setting `escapeHeaders` to `true`
+```yaml
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: kafka-pubsub-escape-headers
+spec:
+  type: pubsub.kafka
+  version: v1
+  metadata:
+  - name: brokers # Required. Kafka broker connection setting
+    value: "dapr-kafka.myapp.svc.cluster.local:9092"
+  - name: consumerGroup # Optional. Used for input bindings.
+    value: "group1"
+  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
+    value: "my-dapr-app-id"
+  - name: authType # Required.
+    value: "none"
+  - name: escapeHeaders
+    value: "true"
+```
+
+Example Go publisher:
+```Go
+package main
+
+import (
+	"context"
+	"log/slog"
+
+	dapr "github.com/dapr/go-sdk/client"
+)
+
+func main() {
+	client, err := dapr.NewClient()
+	if err != nil {
+		slog.Error("error creating dapr client", "error", err)
+		return
+	}
+
+	defer client.Close()
+
+	var metadata = map[string]string{"my-blog-header":"my/cool+blog&about,stuff"}
+
+	ctx := context.Background()
+	err = client.PublishEvent(ctx, "kafka-pubsub", "kafka-topic", []byte("blog message"), dapr.PublishEventWithMetadata(metadata))
+	if err != nil {
+		slog.Error("error publishing event")
+	}
+}
+
+```
+
+
 ## Related links
 - [Basic schema for a Dapr component]({{< ref component-schema >}})
 - Read [this guide]({{< ref "howto-publish-subscribe.md##step-1-setup-the-pubsub-component" >}}) for instructions on configuring pub/sub components

From 2df318f87f749d4814d9c61cf651b9d557633888 Mon Sep 17 00:00:00 2001
From: Anton Troshin
Date: Sat, 31 Aug 2024 02:09:03 -0500
Subject: [PATCH 2/3] update the escapeHeaders setting docs

Signed-off-by: Anton Troshin
---
 .../supported-bindings/kafka.md               |  2 +-
 .../supported-pubsub/setup-apache-kafka.md    | 90 +++++++------------
 2 files changed, 32 insertions(+), 60 deletions(-)

diff --git a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
index 937d666819e..851d0b41ae3 100644
--- a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
@@ -101,7 +101,7 @@ spec:
 | `consumerFetchDefault` | N | Input/Output | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | `heartbeatInterval` | N | Input | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
 | `sessionTimeout` | N | Input | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
-| `escapeHeaders` | N | Input/Output | Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
+| `escapeHeaders` | N | Input | Enables URL escaping of the message header values received by the consumer. It allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
 
 #### Note
 The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index 945833b3c1c..e86281d8ab4 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -114,7 +114,7 @@ spec:
 | consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to "3s". | `"5s"` |
 | sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
-| escapeHeaders | N | Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
+| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. It allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
 
 The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.
 
@@ -488,6 +488,36 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
       }'
 ```
 
+## Receiving message headers with special characters
+
+The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
+HTTP header values must follow specifications and some characters are not allowed, see [here](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
+In this case, you can enable `escapeHeaders` configuration setting which will use URL escaping to encode header values on the consumer side.
+
+**Note**: When using this setting, the received message headers will be URL escaped, and you will need to URL unescape it to get the original value.
+
+Setting `escapeHeaders` to `true`
+```yaml
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: kafka-pubsub-escape-headers
+spec:
+  type: pubsub.kafka
+  version: v1
+  metadata:
+  - name: brokers # Required. Kafka broker connection setting
+    value: "dapr-kafka.myapp.svc.cluster.local:9092"
+  - name: consumerGroup # Optional. Used for input bindings.
+    value: "group1"
+  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
+    value: "my-dapr-app-id"
+  - name: authType # Required.
+    value: "none"
+  - name: escapeHeaders
+    value: "true"
+```
+
 ## Avro Schema Registry serialization/deserialization
 
 You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging an [Apache Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (for example, [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/)).
@@ -600,64 +630,6 @@ To run Kafka on Kubernetes, you can use any Kafka operator, such as [Strimzi](ht
 
 {{< /tabs >}}
 
-## Sending headers with special characters
-The application may require to transfer a special characters in message metadata, it will be sent to the subscriber application via HTTP headers.
-HTTP header values must follow certain specifications and some characters are not allowed, see [here](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
-Using `escapeHeaders` settings will allow to attach metadata with the special characters, which will be safely URL Encoded, and passed to the subscriber.
-
-Setting `escapeHeaders` to `true`
-```yaml
-apiVersion: dapr.io/v1alpha1
-kind: Component
-metadata:
-  name: kafka-pubsub-escape-headers
-spec:
-  type: pubsub.kafka
-  version: v1
-  metadata:
-  - name: brokers # Required. Kafka broker connection setting
-    value: "dapr-kafka.myapp.svc.cluster.local:9092"
-  - name: consumerGroup # Optional. Used for input bindings.
-    value: "group1"
-  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
-    value: "my-dapr-app-id"
-  - name: authType # Required.
-    value: "none"
-  - name: escapeHeaders
-    value: "true"
-```
-
-Example Go publisher:
-```Go
-package main
-
-import (
-	"context"
-	"log/slog"
-
-	dapr "github.com/dapr/go-sdk/client"
-)
-
-func main() {
-	client, err := dapr.NewClient()
-	if err != nil {
-		slog.Error("error creating dapr client", "error", err)
-		return
-	}
-
-	defer client.Close()
-
-	var metadata = map[string]string{"my-blog-header":"my/cool+blog&about,stuff"}
-
-	ctx := context.Background()
-	err = client.PublishEvent(ctx, "kafka-pubsub", "kafka-topic", []byte("blog message"), dapr.PublishEventWithMetadata(metadata))
-	if err != nil {
-		slog.Error("error publishing event")
-	}
-}
-
-```
-
 
 ## Related links
 - [Basic schema for a Dapr component]({{< ref component-schema >}})

From 058f593b85c9e3d3592be5d9ad58ac13c2b8cc8e Mon Sep 17 00:00:00 2001
From: Anton Troshin
Date: Wed, 4 Sep 2024 16:46:32 -0500
Subject: [PATCH 3/3] review fixes

Signed-off-by: Anton Troshin
---
 .../supported-bindings/kafka.md               |  2 +-
 .../supported-pubsub/setup-apache-kafka.md    | 13 ++++++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
index 851d0b41ae3..413e1893fe6 100644
--- a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
@@ -101,7 +101,7 @@ spec:
 | `consumerFetchDefault` | N | Input/Output | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | `heartbeatInterval` | N | Input | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
 | `sessionTimeout` | N | Input | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
-| `escapeHeaders` | N | Input | Enables URL escaping of the message header values received by the consumer. It allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
+| `escapeHeaders` | N | Input | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |
 
 #### Note
 The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index e86281d8ab4..e6091d87e29 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -114,7 +114,7 @@ spec:
 | consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
 | heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to "3s". | `"5s"` |
 | sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
-| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. It allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false` | `true` |
+| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |
 
 The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.
 
@@ -491,12 +491,15 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
 ## Receiving message headers with special characters
 
 The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
-HTTP header values must follow specifications and some characters are not allowed, see [here](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
-In this case, you can enable `escapeHeaders` configuration setting which will use URL escaping to encode header values on the consumer side.
+HTTP header values must follow specifications, making some characters not allowed. [Learn more about the protocols](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
+In this case, you can enable `escapeHeaders` configuration setting, which uses URL escaping to encode header values on the consumer side.
 
-**Note**: When using this setting, the received message headers will be URL escaped, and you will need to URL unescape it to get the original value.
+{{% alert title="Note" color="primary" %}}
+When using this setting, the received message headers are URL escaped, and you need to URL "un-escape" it to get the original value.
+{{% /alert %}}
+
+Set `escapeHeaders` to `true` to URL escape.
 
-Setting `escapeHeaders` to `true`
 ```yaml
 apiVersion: dapr.io/v1alpha1
 kind: Component