Commit

Feature/add-kafka-pubsub-schema-registry (#3946)
* Added doc for kafka pubsub Avro schema registry support

---------

Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com>
Co-authored-by: Mark Fussell <markfussell@gmail.com>
3 people committed Jan 12, 2024
1 parent cc3d110 commit ae4e137
Showing 2 changed files with 127 additions and 0 deletions.
@@ -49,6 +49,16 @@ spec:
    value: "2.0.0"
  - name: direction
    value: "input, output"
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: "true"
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available.
    value: 5m
```

## Spec metadata fields
@@ -75,6 +85,11 @@ spec:
| `version` | N | Input/Output | Kafka cluster version. Defaults to `2.0.0`. Note that this must be set to `1.0.0` when using Azure Event Hubs with Kafka. | `"1.0.0"` |
| `direction` | N | Input/Output | The direction of the binding. | `"input"`, `"output"`, `"input, output"` |
| `oidcExtensions` | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
| `schemaRegistryURL` | N | Input/Output | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| `schemaRegistryAPIKey` | N | Input/Output | When using Schema Registry Avro serialization/deserialization, the Schema Registry API key. | `XYAXXAZ` |
| `schemaRegistryAPISecret` | N | Input/Output | When using Schema Registry Avro serialization/deserialization, the Schema Registry API secret. | `ABCDEFGMEADFF` |
| `schemaCachingEnabled` | N | Input/Output | When using Schema Registry Avro serialization/deserialization, enables caching for schemas. Default is `true`. | `true` |
| `schemaLatestVersionCacheTTL` | N | Input/Output | When using Schema Registry Avro serialization/deserialization, the TTL for schema caching when publishing a message with the latest schema available. Default is 5 minutes. | `5m` |

#### Note
The metadata `version` must be set to `1.0.0` when using Azure Event Hubs with Kafka.
@@ -49,6 +49,17 @@ spec:
    value: 2.0.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: "true"
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available.
    value: 5m
```

> For details on using `secretKeyRef`, see the guide on [how to reference secrets in components]({{< ref component-secrets.md >}}).
@@ -81,6 +92,11 @@ spec:
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider: Required when `authType` is set to `oidc` | `"KeFg23!"` |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` |
| oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization, the Schema Registry API key. | `XYAXXAZ` |
| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization, the Schema Registry API secret. | `ABCDEFGMEADFF` |
| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization, enables caching for schemas. Default is `true`. | `true` |
| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization, the TTL for schema caching when publishing a message with the latest schema available. Default is 5 minutes. | `5m` |

The `secretKeyRef` above references a [Kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the TLS information. See [how to configure a secret store component]({{< ref setup-secret-store.md >}}) to learn more.
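
As a sketch (the secret name and key below are hypothetical), the Schema Registry credentials can likewise be pulled from a secret store instead of being set in plain text:

```yaml
  - name: schemaRegistryAPISecret
    secretKeyRef:
      name: kafka-secrets            # hypothetical Kubernetes secret
      key: schemaRegistryAPISecret   # hypothetical key within that secret
auth:
  secretStore: <SECRET_STORE_NAME>
```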

@@ -348,6 +364,102 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}'
```

## Avro Schema Registry serialization/deserialization
You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging a [schema registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (for example, Confluent Schema Registry or [Apicurio](https://www.apicur.io/registry/)).

### Configuration

{{% alert title="Important" color="warning" %}}
Currently, only message value serialization/deserialization is supported. Since CloudEvents are not supported, the `rawPayload=true` metadata must be passed.
{{% /alert %}}

When configuring the Kafka pub/sub component metadata, you must define (see the sketch after this list):
- The schema registry URL
- The API key/secret, if applicable
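
A minimal sketch of the relevant metadata entries (the values are placeholders):

```yaml
  - name: schemaRegistryURL
    value: http://localhost:8081
  - name: schemaRegistryAPIKey    # only if the registry requires authentication
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # only if the registry requires authentication
    value: "ABCDEFGMEADFF"
```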

Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named `my-topic`, the schema subject is `my-topic-value`.
Within the service, the message payload is handled as JSON; it is transparently serialized to and deserialized from Avro by the Dapr component.
Date/Datetime fields must be passed as their [Epoch Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) equivalent (rather than the typical ISO 8601 string). For example:
- `2024-01-10T04:36:05.986Z` should be passed as `1704861365986` (the number of milliseconds since Jan 1st, 1970)
- `2024-01-10` should be passed as `19732` (the number of days since Jan 1st, 1970)
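
A quick sketch of computing these values in Python:

```python
from datetime import date, datetime, timezone

# Milliseconds since the Unix epoch for a datetime field
dt = datetime(2024, 1, 10, 4, 36, 5, 986000, tzinfo=timezone.utc)
print(int(dt.timestamp() * 1000))                    # 1704861365986

# Days since the Unix epoch for a date field
print((date(2024, 1, 10) - date(1970, 1, 1)).days)   # 19732
```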

### Publishing Avro messages
To indicate to the Kafka pub/sub component that a message should be serialized using Avro, the `valueSchemaType` metadata must be set to `Avro`.

{{< tabs curl "Python SDK">}}

{{% codetab %}}
```bash
curl -X "POST" "http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro" -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
```
{{% /codetab %}}

{{% codetab %}}
```python
import json

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order_number': '345',
        'created_date': 1704861365986
    }
    # Publish the message as a raw payload, requesting Avro serialization
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='my-topic',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
    )
    # Print the request
    print(req_data, flush=True)
```
{{% /codetab %}}

{{< /tabs >}}


### Subscribing to Avro topics
To indicate to the Kafka pub/sub component that messages should be deserialized using Avro, the `valueSchemaType` metadata must be set to `Avro` in the subscription metadata.

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
from fastapi import APIRouter, Body, FastAPI, Response, status

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    # Declare the subscription, requesting raw payloads deserialized from Avro
    subscriptions = [{
        'pubsubname': 'pubsub',
        'topic': 'my-topic',
        'route': 'my_topic_subscriber',
        'metadata': {
            'rawPayload': 'true',
            'valueSchemaType': 'Avro',
        }
    }]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
    print(event_data, flush=True)
    return Response(status_code=status.HTTP_200_OK)


app.include_router(router)
```

{{% /codetab %}}

{{< /tabs >}}



## Create a Kafka instance

{{< tabs "Self-Hosted" "Kubernetes">}}