Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: #46 support nack delay and ack timeouts #83

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/DotPulsar/Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace DotPulsar.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Generic;
using System.Threading;
Expand Down Expand Up @@ -59,9 +60,20 @@ public interface IConsumer : IGetLastMessageId, ISeek, IState<ConsumerState>, IA
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);

/// <summary>
/// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageIdData> messageIds, CancellationToken cancellationToken);

/// <summary>
/// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);

/// <summary>
/// Acknowledge the failure to consume a single message using the MessageId.
/// When a message is "negatively acked" it will be marked for redelivery after some fixed delay.
/// </summary>
void NegativeAcknowledge(MessageId messageId);
}
}
13 changes: 13 additions & 0 deletions src/DotPulsar/Abstractions/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace DotPulsar.Abstractions
{
using System;

/// <summary>
/// A consumer building abstraction.
/// </summary>
Expand Down Expand Up @@ -59,6 +61,17 @@ public interface IConsumerBuilder<TMessage>
/// </summary>
IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);

/// <summary>
/// Timeout of unacked messages
/// </summary>
IConsumerBuilder<TMessage> AcknowledgementTimeout(TimeSpan timeout);

/// <summary>
/// Delay to wait before redelivering messages that failed to be processed.
/// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
/// </summary>
IConsumerBuilder<TMessage> NegativeAcknowledgementRedeliveryDelay(TimeSpan timeout);

/// <summary>
/// Set the topic for this consumer. This is required.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/DotPulsar/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace DotPulsar
{
using DotPulsar.Abstractions;
using System;

/// <summary>
/// The consumer building options.
Expand Down Expand Up @@ -110,5 +111,16 @@ public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage>
/// Set the topic for this consumer. This is required.
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Delay to wait before redelivering messages that failed to be processed.
/// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
/// </summary>
public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; }

/// <summary>
/// Timeout of unacked messages
/// </summary>
public TimeSpan AcknowledgementTimeout { get; set; }
}
}
22 changes: 22 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IAsyncQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using System;

public interface IAsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
{
}
}
28 changes: 28 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IBatchHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Abstractions;
using PulsarApi;
using System.Buffers;

public interface IBatchHandler<TMessage>
{
IMessage<TMessage> Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data);
IMessage<TMessage>? GetNext();
void Clear();
MessageIdData? Acknowledge(MessageIdData messageId);
}
}
2 changes: 2 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace DotPulsar.Internal.Abstractions
using System.Threading;
using System.Threading.Tasks;


public interface IConsumerChannel<TMessage> : IAsyncDisposable
{
Task Send(CommandAck command, CancellationToken cancellationToken);
Expand All @@ -29,5 +30,6 @@ public interface IConsumerChannel<TMessage> : IAsyncDisposable
Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken);
ValueTask ClosedByClient(CancellationToken cancellationToken);
void NegativeAcknowledge(MessageIdData messageIdData);
}
}
26 changes: 26 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System;

public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
{
void Acknowledge(MessageIdData messageId);

void NegativeAcknowledge(MessageIdData messageId);
}
}
33 changes: 33 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Abstractions;
using PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;

public interface IMessageTracker : IDisposable
{
Task Start(IConsumer consumer, CancellationToken cancellationToken = default);

void Track(MessageIdData messageId);

void Acknowledge(MessageIdData messageId);

void NegativeAcknowledge(MessageIdData messageId);
}
}
26 changes: 26 additions & 0 deletions src/DotPulsar/Internal/Abstractions/INegativeackedMessageState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System.Collections.Generic;

public interface INegativeackedMessageState
{
void Add(MessageIdData messageId);

IEnumerable<MessageIdData> GetMessagesForRedelivery();
}
}
30 changes: 30 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IUnackedMessageState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System.Collections.Generic;

public interface IUnackedMessageState
{
void Add(MessageIdData messageId);

void Remove(MessageIdData messageId);

void Acknowledge(MessageIdData messageId);

IEnumerable<MessageIdData> CheckUnackedMessages();
}
}
3 changes: 1 addition & 2 deletions src/DotPulsar/Internal/AsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ namespace DotPulsar.Internal
{
using Abstractions;
using Exceptions;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
public sealed class AsyncQueue<T> : IAsyncQueue<T>
{
private readonly object _lock;
private readonly Queue<T> _queue;
Expand Down
35 changes: 35 additions & 0 deletions src/DotPulsar/Internal/AwaitingAck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
{
using PulsarApi;
using System;
using System.Diagnostics;

public readonly struct AwaitingAck
{
public MessageIdData MessageId { get; }
public long Timestamp { get; }

public AwaitingAck(MessageIdData messageId)
{
MessageId = messageId;
Timestamp = Stopwatch.GetTimestamp();
}

public TimeSpan Elapsed => TimeSpan.FromTicks(
(long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
}
}
2 changes: 1 addition & 1 deletion src/DotPulsar/Internal/BatchHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace DotPulsar.Internal
using System.Collections;
using System.Collections.Generic;

public sealed class BatchHandler<TMessage>
public sealed class BatchHandler<TMessage> : IBatchHandler<TMessage>
{
private readonly object _lock;
private readonly bool _trackBatches;
Expand Down
13 changes: 10 additions & 3 deletions src/DotPulsar/Internal/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,19 @@ public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancel
public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);

public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)

public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageIdData> messageIds, CancellationToken cancellationToken)
{
ThrowIfDisposed();

var command = new CommandRedeliverUnacknowledgedMessages();
command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData()));
command.MessageIds.AddRange(messageIds);
await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
=> await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.ToMessageIdData()), cancellationToken).ConfigureAwait(false);

public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
=> await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);

Expand All @@ -126,8 +130,11 @@ public async ValueTask Unsubscribe(CancellationToken cancellationToken)
await _executor.Execute(() => Unsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public void NegativeAcknowledge(MessageId messageId) =>
_channel.NegativeAcknowledge(messageId.ToMessageIdData());

private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
=>await _channel.Send(command, cancellationToken).ConfigureAwait(false);
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);

public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
Expand Down