Skip to content

Commit

Permalink
The Process extension method will hang when called with EnsureOrder…
Browse files Browse the repository at this point in the history
…edAcknowledgment set to true, a shared subscription and MaxDegreeOfParallelism above 1.

It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is set to true and with a shared subscription type.
  • Loading branch information
blankensteiner committed Jun 17, 2024
1 parent d0d8e45 commit ed51103
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 4 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Fixed

- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` dispose causing an unintended
`DivideByZeroException`. It now correctly throws a `ProducerClosedException`
- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` dispose causing an unintended `DivideByZeroException`.
It now throws a `ProducerClosedException`.
- The `Process` extension method will hang when called with EnsureOrderedAcknowledgment set to true, a shared subscription and MaxDegreeOfParallelism above 1.
It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is set to true and with a shared subscription type.

## [3.3.0] - 2024-06-10

Expand Down
5 changes: 5 additions & 0 deletions src/DotPulsar/Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public interface IConsumer : IGetLastMessageIds, ISeek, IState<ConsumerState>, I
/// </summary>
public string SubscriptionName { get; }

/// <summary>
/// The consumer's subscription type.
/// </summary>
public SubscriptionType SubscriptionType { get; }

/// <summary>
/// The consumer's topic.
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions src/DotPulsar/Exceptions/ProcessingException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Exceptions;

/// <summary>
/// There was an issue while setting up for processing
/// </summary>
public sealed class ProcessingException : DotPulsarException
{
public ProcessingException(string message) : base(message) { }
}
4 changes: 3 additions & 1 deletion src/DotPulsar/Internal/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>

public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
public SubscriptionType SubscriptionType { get; }
public string Topic { get; }

public Consumer(
Expand All @@ -55,6 +56,7 @@ public Consumer(
_state = CreateStateManager();
ServiceUrl = serviceUrl;
SubscriptionName = consumerOptions.SubscriptionName;
SubscriptionType = consumerOptions.SubscriptionType;
Topic = consumerOptions.Topic;
_receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
_cts = new CancellationTokenSource();
Expand Down Expand Up @@ -411,7 +413,7 @@ private SubConsumer<TMessage> CreateSubConsumer(string topic)
var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, _consumerOptions.SubscriptionType, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
var process = new ConsumerProcess(correlationId, stateManager, subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
Expand Down
11 changes: 10 additions & 1 deletion src/DotPulsar/Internal/MessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace DotPulsar.Internal;

using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Extensions;
using Microsoft.Extensions.ObjectPool;
using System.Collections.Concurrent;
Expand All @@ -39,8 +40,16 @@ public sealed class MessageProcessor<TMessage> : IDisposable
private readonly int _maxMessagesPerTask;
private readonly TaskScheduler _taskScheduler;

public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
public MessageProcessor(
IConsumer<TMessage> consumer,
Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
ProcessingOptions options)
{
if (options.EnsureOrderedAcknowledgment &&
(consumer.SubscriptionType == SubscriptionType.Shared ||
consumer.SubscriptionType == SubscriptionType.KeyShared))
throw new ProcessingException("Ordered acknowledgment can not be ensuring with shared subscription types");

const string operation = "process";
_operationName = $"{consumer.Topic} {operation}";

Expand Down
3 changes: 3 additions & 0 deletions src/DotPulsar/Internal/SubConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public sealed class SubConsumer<TMessage> : IConsumer<TMessage>, IContainsChanne

public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
public SubscriptionType SubscriptionType { get; }
public string Topic { get; }

public SubConsumer(
Guid correlationId,
Uri serviceUrl,
string subscriptionName,
SubscriptionType subscriptionType,
string topic,
IRegisterEvent eventRegister,
IConsumerChannel<TMessage> initialChannel,
Expand All @@ -52,6 +54,7 @@ public SubConsumer(
_correlationId = correlationId;
ServiceUrl = serviceUrl;
SubscriptionName = subscriptionName;
SubscriptionType = subscriptionType;
Topic = topic;
_eventRegister = eventRegister;
_channel = initialChannel;
Expand Down

0 comments on commit ed51103

Please sign in to comment.