Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
95e8b88
Remove backoff from pubsub
shivamkm07 Dec 2, 2022
291574a
Update daprdocs/content/en/reference/components-reference/supported-p…
shivamkm07 Dec 11, 2022
7b96abc
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Dec 14, 2022
3375770
Merge branch 'v1.9' into fix-mqttsetup-1.9
hhunter-ms Dec 14, 2022
948f8d3
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Dec 16, 2022
ad68e44
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Dec 16, 2022
629a267
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Dec 20, 2022
eada02a
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Dec 28, 2022
08f079e
Merge branch 'v1.9' into fix-mqttsetup-1.9
hhunter-ms Jan 3, 2023
e466609
Merge branch 'v1.9' into fix-mqttsetup-1.9
hhunter-ms Jan 3, 2023
933250e
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Jan 4, 2023
d7e71a6
Adding retry messages
Jan 14, 2023
78923a8
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Jan 14, 2023
bd089d7
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Jan 17, 2023
79a79fe
Merge branch 'v1.9' into fix-mqttsetup-1.9
hhunter-ms Jan 17, 2023
2351868
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Jan 19, 2023
784387d
Update daprdocs/content/en/reference/components-reference/supported-p…
msfussell Jan 20, 2023
3ccce5b
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Jan 24, 2023
01b7887
Apply suggestions from code review
shivamkm07 Jan 24, 2023
2a45d79
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Jan 26, 2023
937ad7a
Merge branch 'v1.9' into fix-mqttsetup-1.9
shivamkm07 Jan 30, 2023
d78a255
Fixing broken link
shivamkm07 Feb 1, 2023
11c4ee4
Merge branch 'v1.9' into fix-mqttsetup-1.9
hhunter-ms Feb 1, 2023
38e36c2
Merge branch 'v1.9' into fix-mqttsetup-1.9
msfussell Feb 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ spec:
value: "false"
- name: cleanSession
value: "false"
- name: backOffMaxRetries
value: "0"
```

{{% alert title="Warning" color="warning" %}}
Expand All @@ -49,6 +47,19 @@ The above example uses secrets as plain strings. It is recommended to use a secr
| caCert | Required for using TLS | Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. | `"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"`
| clientCert | Required for using TLS | TLS client certificate in PEM format. Must be used with `clientKey`. | `"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"`
| clientKey | Required for using TLS | TLS client key in PEM format. Must be used with `clientCert`. Can be `secretKeyRef` to use a secret reference. | `"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"`

### Enabling message delivery retries

The MQTT pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. If the service marks the message as not processed, the message won't be acknowledged back to the broker. Only if broker resends the message, would it would be retried.

To make Dapr use more spohisticated retry policies, you can apply a [retry resiliency policy]({{< ref "policies.md#retries" >}}) to the MQTT pub/sub component.

There is a crucial difference between the two ways of retries:

1. Re-delivery of unacknowledged messages is completely dependent on the broker. Dapr does not guarantee it. Some brokers like [emqx](https://www.emqx.io/), [vernemq](https://vernemq.com/) etc. support it but it not a part of [MQTT3 spec](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718103).

2. Using a [retry resiliency policy]({{< ref "policies.md#retries" >}}) makes the same Dapr sidecar retry redelivering the messages. So it is the same Dapr sidecar and the same app receiving the same message.

### Communication using TLS

To configure communication using TLS, ensure that the MQTT broker (e.g. mosquitto) is configured to support certificates and provide the `caCert`, `clientCert`, `clientKey` metadata in the component configuration. For example:
Expand All @@ -70,8 +81,6 @@ spec:
value: "false"
- name: cleanSession
value: "false"
- name: backoffMaxRetries
value: "0"
- name: caCert
value: ${{ myLoadedCACert }}
- name: clientCert
Expand Down Expand Up @@ -109,8 +118,6 @@ spec:
value: "false"
- name: cleanSession
value: "true"
- name: backoffMaxRetries
value: "0"
```

{{% alert title="Warning" color="warning" %}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ spec:
value: "default"
- name: persistent
value: "true"
- name: backOffPolicy
value: "constant"
- name: backOffMaxRetries
value: "-1"
- name: disableBatching
value: "false"
```
Expand All @@ -49,20 +45,16 @@ spec:
| token | N | Enable Authentication. | [How to create pulsar token](https://pulsar.apache.org/docs/en/security-jwt/#generate-tokens)|
| tenant | N | The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters. Default: `"public"` | `"public"` |
| namespace | N | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: `"default"` | `"default"`
| persistent | N | Pulsar supports two kind of topics: [persistent](https://pulsar.apache.org/docs/en/concepts-architecture-overview#persistent-storage) and [non-persistent](https://pulsar.apache.org/docs/en/concepts-messaging/#non-persistent-topics). With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks. Note: the default retry behavior is to retry until it succeeds, so when you use a non-persistent theme, you can reduce or prohibit retries by defining `backOffMaxRetries` to `0`. Default: `"true"` | `"true"`, `"false"`
| backOffPolicy | N | Retry policy, `"constant"` is a backoff policy that always returns the same backoff delay. `"exponential"` is a backoff policy that increases the backoff period for each retry attempt using a randomization function that grows exponentially. Defaults to `"constant"`. | `constant`、`exponential` |
| backOffDuration | N | The fixed interval only takes effect when the `backOffPolicy` is `"constant"`. There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Defaults to `"5s"`. | `"5s"`、`"5000"` |
| backOffInitialInterval | N | The backoff initial interval on retry. Only takes effect when the `backOffPolicy` is `"exponential"`. There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Defaults to `"500"` | `"50"` |
| backOffMaxInterval | N | The backoff initial interval on retry. Only takes effect when the `backOffPolicy` is `"exponential"`. There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Defaults to `"60s"` | `"60000"` |
| backOffMaxRetries | N | The maximum number of retries to process the message before returning an error. Defaults to `"0"` which means the component will not retry processing the message. `"-1"` will retry indefinitely until the message is processed or the application is shutdown. Any positive number is treated as the maximum retry count. | `"3"` |
| backOffRandomizationFactor | N | Randomization factor, between 1 and 0, including 0 but not 1. Randomized interval = RetryInterval * (1 ± backOffRandomizationFactor). Defaults to `"0.5"`. | `"0.5"` |
| backOffMultiplier | N | Backoff multiplier for the policy. Increments the interval by multiplying it with the multiplier. Defaults to `"1.5"` | `"1.5"` |
| backOffMaxElapsedTime | N | After MaxElapsedTime the ExponentialBackOff returns Stop. There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Defaults to `"15m"` | `"15m"` |
| persistent | N | Pulsar supports two kinds of topics: [persistent](https://pulsar.apache.org/docs/en/concepts-architecture-overview#persistent-storage) and [non-persistent](https://pulsar.apache.org/docs/en/concepts-messaging/#non-persistent-topics). With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks.
| disableBatching | N | disable batching.When batching enabled default batch delay is set to 10 ms and default batch size is 1000 messages,Setting `disableBatching: true` will make the producer to send messages individually. Default: `"false"` | `"true"`, `"false"`|
| batchingMaxPublishDelay | N | batchingMaxPublishDelay set the time period within which the messages sent will be batched,if batch messages are enabled. If set to a non zero value, messages will be queued until this time interval or batchingMaxMessages (see below) or batchingMaxSize (see below). There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default: `"10ms"` | `"10ms"`, `"10"`|
| batchingMaxMessages | N | batchingMaxMessages set the maximum number of messages permitted in a batch.If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxSize (see below) has been reached or the batch interval has elapsed. Default: `"1000"` | `"1000"`|
| batchingMaxSize | N | batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: `"128KB"` | `"131072"`|

### Enabling message delivery retries

The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more spohisticated retry policies, you can apply a [retry resiliency policy]({{< ref "policies.md#retries" >}}) to the MQTT pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.

### Delay queue

When invoking the Pulsar pub/sub, it's possible to provide an optional delay queue by using the `metadata` query parameters in the request url.
Expand Down