Skip to content

Commit

Permalink
Update docs of CustomHeadersBuilder option.
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Apr 22, 2024
1 parent 27ae79a commit 4c2feeb
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 52 deletions.
13 changes: 6 additions & 7 deletions docs/content/user-guide/en/transport/azure-service-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ The AzureServiceBus configuration options provided directly by the CAP:
| MaxAutoLockRenewalDuration | The maximum duration within which the lock will be renewed automatically. This value should be greater than the longest message lock duration. | TimeSpan | 5 minutes |
| ManagementTokenProvider | Token provider | ITokenProvider | null |
| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | true |
| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func<Message, List<KeyValuePair<string, string>>>?` | null |
| CustomHeadersBuilder | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func<Message, IServiceProvider, List<KeyValuePair<string, string>>>?` | null |
| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null |
| DefaultCorrelationHeaders | Adds additional correlation properties to all [correlation filters](https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters). | IDictionary<string, string> | Dictionary<string, string>.Empty |
| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List<KeyValuePair<string, string>> | null |
Expand Down Expand Up @@ -84,12 +84,11 @@ Sometimes you might want to listen to a message that was published by an externa
c.UseAzureServiceBus(asb =>
{
asb.ConnectionString = ...
asb.CustomHeaders = message => new List<KeyValuePair<string, string>>()
{
new(DotNetCore.CAP.Messages.Headers.MessageId,
SnowflakeId.Default().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label)
};
asb.CustomHeadersBuilder = (msg, sp) =>
[
new(DotNetCore.CAP.Messages.Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, msg.RoutingKey)
];
});
```

Expand Down
6 changes: 3 additions & 3 deletions docs/content/user-guide/en/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Servers | Broker server address | string |
ConnectionPoolSize | connection pool size | int | 10
CustomHeaders | Custom subscribe headers | Func<> | N/A
CustomHeadersBuilder | Custom subscribe headers | Func<> | N/A

#### CustomHeaders Options
#### CustomHeadersBuilder Options

When the message sent from a heterogeneous system, because of the CAP needs to define additional headers, so an exception will occur at this time. By providing this parameter to set the custom headersn to make the subscriber works.

Expand All @@ -57,7 +57,7 @@ x.UseKafka(opt =>
{
//...
opt.CustomHeaders = kafkaResult => new List<KeyValuePair<string, string>>
opt.CustomHeadersBuilder = (kafkaResult,sp) => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("my.kafka.offset", kafkaResult.Offset.ToString()),
new KeyValuePair<string, string>("my.kafka.partition", kafkaResult.Partition.ToString())
Expand Down
14 changes: 7 additions & 7 deletions docs/content/user-guide/en/transport/rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Port | Port | int | -1
ExchangeName | Default exchange name | string | cap.default.topic
QueueArguments | Extra queue `x-arguments` | QueueArgumentsOptions | N/A
ConnectionFactoryOptions | RabbitMQClient native connection options | ConnectionFactory | N/A
CustomHeaders | Custom subscribe headers | See the blow | N/A
CustomHeadersBuilder | Custom subscribe headers | See the blow | N/A
PublishConfirms | Enable [publish confirms](https://www.rabbitmq.com/confirms.html#publisher-confirms) | bool | false
BasicQosOptions | Specify [Qos](https://www.rabbitmq.com/consumer-prefetch.html) of message prefetch | BasicQos | N/A

Expand All @@ -74,7 +74,7 @@ services.AddCap(x =>

```

#### CustomHeaders Option
#### CustomHeadersBuilder Option

When the message sent from the RabbitMQ management console or a heterogeneous system, because of the CAP needs to define additional headers, so an exception will occur at this time. By providing this parameter to set the custom headersn to make the subscriber works.

Expand All @@ -85,11 +85,11 @@ Example:
```cs
x.UseRabbitMQ(aa =>
{
aa.CustomHeaders = e => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
};
aa.CustomHeadersBuilder = (msg, sp) =>
[
new(DotNetCore.CAP.Messages.Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, msg.RoutingKey)
];
});
```

Expand Down
13 changes: 6 additions & 7 deletions docs/content/user-guide/zh/transport/azure-service-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CAP 直接对外提供的 Azure Service Bus 配置参数如下:
| TopicPath | Topic entity path | string | cap |
| ManagementTokenProvider | Token provider | ITokenProvider | null |
| AutoCompleteMessages | 获取一个值,该值指示处理器是否应在消息处理程序完成处理后自动完成消息 | bool | false |
| CustomHeaders | 为来自异构系统的传入消息添加自定义头 | `Func<Message, List<KeyValuePair<string, string>>>?` | null |
| CustomHeadersBuilder | 为来自异构系统的传入消息添加自定义头 | `Func<Message, List<KeyValuePair<string, string>>>?` | null |
| Namespace | Servicebus 的命名空间,与 TokenCredential 属性一起使用时需要设置 | string | null |
| SQLFilters | 根据名称和表达式自定义 SQL 过滤器 | List<KeyValuePair<string, string>> | null |

Expand Down Expand Up @@ -79,12 +79,11 @@ capBus.Publish(yourEventName, yourEvent, extraHeaders);
c.UseAzureServiceBus(asb =>
{
asb.ConnectionString = ...
asb.CustomHeaders = message => new List<KeyValuePair<string, string>>()
{
new(DotNetCore.CAP.Messages.Headers.MessageId,
SnowflakeId.Default().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label)
};
asb.CustomHeadersBuilder = (msg, sp) =>
[
new(DotNetCore.CAP.Messages.Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, msg.RoutingKey)
];
});
```

Expand Down
6 changes: 3 additions & 3 deletions docs/content/user-guide/zh/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Servers | Broker 地址 | string |
ConnectionPoolSize | 用户名 | int | 10
CustomHeaders | 设置自定义头 | Function |
CustomHeadersBuilder | 设置自定义头 | Function |

有关 `CustomHeaders` 的说明:
有关 `CustomHeadersBuilder` 的说明:

如果你想在消费消息的时候,通过从 `CapHeader` 获取 Kafka 中例如 Offset 或者 Partition 等信息,你可以通过自定义此函数来实现这一点。

Expand All @@ -54,7 +54,7 @@ x.UseKafka(opt =>
{
//...
opt.CustomHeaders = kafkaResult => new List<KeyValuePair<string, string>>
opt.CustomHeadersBuilder = (kafkaResult,sp) => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("my.kafka.offset", kafkaResult.Offset.ToString()),
new KeyValuePair<string, string>("my.kafka.partition", kafkaResult.Partition.ToString())
Expand Down
14 changes: 7 additions & 7 deletions docs/content/user-guide/zh/transport/rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Port | 端口号 | int | -1
ExchangeName | CAP默认Exchange名称 | string | cap.default.topic
QueueArguments | 队列额外参数 x-arguments | QueueArgumentsOptions | N/A
ConnectionFactoryOptions | RabbitMQClient原生参数 | ConnectionFactory | N/A
CustomHeaders | 订阅者自定义头信息 | 见下文 | N/A
CustomHeadersBuilder | 订阅者自定义头信息 | 见下文 | N/A
PublishConfirms | 是否启用[发布确认](https://www.rabbitmq.com/confirms.html#publisher-confirms) | bool | false
BasicQosOptions | 指定消费的[Qos](https://www.rabbitmq.com/consumer-prefetch.html) | BasicQos | N/A

Expand All @@ -75,7 +75,7 @@ services.AddCap(x =>

```

#### CustomHeaders Option
#### CustomHeadersBuilder Option

当需要从异构系统或者直接接收从RabbitMQ 控制台发送的消息时,由于 CAP 需要定义额外的头信息才能正常订阅,所以此时会出现异常。通过提供此参数来进行自定义头信息的设置来使订阅者正常工作。

Expand All @@ -86,11 +86,11 @@ services.AddCap(x =>
```cs
x.UseRabbitMQ(aa =>
{
aa.CustomHeaders = e => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
};
aa.CustomHeadersBuilder = (msg, sp) =>
[
new(DotNetCore.CAP.Messages.Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, msg.RoutingKey)
];
});
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,24 +265,17 @@ private TransportMessage ConvertMessage(ServiceBusReceivedMessage message)

headers.Add(Headers.Group, _subscriptionName);

List<KeyValuePair<string, string>>? customHeaders = null;
#pragma warning disable CS0618 // Type or member is obsolete
if (_asbOptions.CustomHeaders != null) customHeaders = _asbOptions.CustomHeaders(message).ToList();
#pragma warning restore CS0618 // Type or member is obsolete

if (_asbOptions.CustomHeadersBuilder != null)
customHeaders = _asbOptions.CustomHeadersBuilder(message, _serviceProvider).ToList();

if (customHeaders?.Any() == true)
{
var customHeaders = _asbOptions.CustomHeadersBuilder(message, _serviceProvider);
foreach (var customHeader in customHeaders)
{
var added = headers.TryAdd(customHeader.Key, customHeader.Value);

if (!added)
_logger.LogWarning(
"Not possible to add the custom header {Header}. A value with the same key already exists in the Message headers.",
customHeader.Key);
_logger.LogWarning("Not possible to add the custom header {Header}. A value with the same key already exists in the Message headers.",customHeader.Key);
}
}

return new TransportMessage(headers, message.Body);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ public class AzureServiceBusOptions
/// </summary>
public TokenCredential? TokenCredential { get; set; }

/// <summary>
/// Use this function to write additional headers from the original ASB Message or any Custom Header, i.e. to allow
/// compatibility with heterogeneous systems, into <see cref="CapHeader" />
/// </summary>
[Obsolete("Will be dropped in the next releases in favor of the CustomHeadersBuilder property")]
public Func<ServiceBusReceivedMessage, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }

/// <summary>
/// Use this function to write additional headers from the original ASB Message or any Custom Header, i.e. to allow
/// compatibility with heterogeneous systems, into <see cref="CapHeader" />
Expand Down

0 comments on commit 4c2feeb

Please sign in to comment.