Skip to content

Commit

Permalink
feat: exactly once delivery (feature complete)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh-V committed Aug 17, 2022
1 parent a750fad commit e93c4d3
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 71 deletions.

Large diffs are not rendered by default.

39 changes: 34 additions & 5 deletions apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/AckError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,28 @@ internal sealed class AckError
/// </summary>
internal IEnumerable<KeyValuePair<string, string>> PermanentFailures { get; }

/// <summary>
/// Gets the AckIds/ExtendIds/NackIds that are successful.
/// This collection will be populated with AckIds/ExtendIds/NackIds only if they are successful,
/// else this collection will be empty (but not null).
/// </summary>
internal IEnumerable<string> SuccessfulIds => MessageIds.Except(PermanentFailureIds).Except(TemporaryFailureIds);

/// <summary>
/// Gets all the AckIds/ExtendIds/NackIds for which the response encountered a <see cref="RpcException"/>.
/// This collection is always populated with AckIds/ExtendIds/NackIds and is never null.
/// </summary>
internal IEnumerable<string> MessageIds { get; }

/// <summary>
/// Initializes a new instance of the <see cref="AckError"/> class.
/// </summary>
/// <param name="messageIds">The collection of AckId/NackId/ExtendId for which response is being processed.</param>
/// <param name="temporaryFailures">The collection of temporary failed AckId/NackId/ExtendId as key and their corresponding failure reason as value.</param>
/// <param name="permanentFailures">The collection of permanent failed AckId/NackId/ExtendId as key and their corresponding failure reason as value.</param>
internal AckError(IEnumerable<KeyValuePair<string, string>> temporaryFailures, IEnumerable<KeyValuePair<string, string>> permanentFailures)
internal AckError(IEnumerable<string> messageIds, IEnumerable<KeyValuePair<string, string>> temporaryFailures, IEnumerable<KeyValuePair<string, string>> permanentFailures)
{
MessageIds = messageIds;
TemporaryFailures = temporaryFailures;
PermanentFailures = permanentFailures;
}
Expand All @@ -81,9 +96,23 @@ internal AckError(IEnumerable<KeyValuePair<string, string>> temporaryFailures, I
/// <returns>The sequence of <see cref="AcknowledgementException"/> if there are permanent failures, empty otherwise.</returns>
internal IEnumerable<AcknowledgementException> GetPermanentExceptions() =>
HasPermanentFailures
? PermanentFailures.Select(AcknowledgementException.FromIdErrorPair).ToList()
? PermanentFailures.Select(AcknowledgementException.FromIdPermanentErrorPair).ToList()
: Enumerable.Empty<AcknowledgementException>();

/// <summary>
/// Gets the sequence of <see cref="AckNackResponse"/>
/// from the current instance of <seealso cref="AckError"/>.
/// It returns the AckNackResponse of only the successful or permanently failed message IDs.
/// </summary>
/// <remarks>
/// This method will always return a sequence of <see cref="AckNackResponse"/>. This sequence will be empty (but not null) if there are no successful responses or permanent failures
/// in the current instance of <seealso cref="AckError"/>.
/// </remarks>
/// <returns>The sequence of <see cref="AckNackResponse"/> if there are successful responses or permanent failures, empty otherwise.</returns>
internal IEnumerable<AckNackResponse> GetAckNackResponses() =>
PermanentFailures.Select(AckNackResponse.FromIdPermanentErrorPair)
.Concat(SuccessfulIds.Select(AckNackResponse.FromSuccessfulId));

/// <summary>
/// Gets the <see cref="AckError"/> from the <see cref="RpcException"/> while calling Acknowledge or ModifyAckDeadline RPC.
/// </summary>
Expand All @@ -100,7 +129,7 @@ internal static AckError ForRpcException(RpcException rpcException, IEnumerable<
if (retryAll)
{
// Return all ids in temporary failures and empty permanent failures, so that the caller can retry the whole temporary batch.
return new AckError(ids.ToDictionary(key => key, value => rpcException.Status.StatusCode.ToString()), Enumerable.Empty<KeyValuePair<string, string>>());
return new AckError(ids, ids.ToDictionary(key => key, value => rpcException.Status.StatusCode.ToString()), Enumerable.Empty<KeyValuePair<string, string>>());
}

// We can have complete or partial failure. We classify the error as temporary or permanent
Expand All @@ -121,13 +150,13 @@ internal static AckError ForRpcException(RpcException rpcException, IEnumerable<
if (temporaryErrors.Any() || permanentErrors.Any())
{
// If temporary or permanent error messages are present, it means the entire request hasn't failed with a permanent error.
return new AckError(temporaryErrors, permanentErrors);
return new AckError(ids, temporaryErrors, permanentErrors);
}
else
{
// Now, if temporary or permanent error messages are *not* present, it means the entire request has failed with a permanent error.
// It may be due to a gRPC error like a failed precondition or a permission denied error or other permanent error.
return new AckError(Enumerable.Empty<KeyValuePair<string, string>>(), ids.ToDictionary(key => key, value => rpcException.Status.StatusCode.ToString()));
return new AckError(ids, temporaryErrors, ids.ToDictionary(key => key, value => rpcException.Status.StatusCode.ToString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License").
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;

namespace Google.Cloud.PubSub.V1;

/// <summary>
/// A public representation of a response to an ACKnowledge (ACK) or Not ACKnowledge (NACK) request.
/// </summary>
public sealed class AckNackResponse
{
/// <summary>
/// The ID of the message being ACK-ed or NACK-ed.
/// </summary>
public string MessageId { get; }

/// <summary>
/// The status of the ACK/NACK response.
/// </summary>
public AcknowledgementStatus Status { get; }

/// <summary>
/// The failure message, if any. This will be null if the response is successful.
/// </summary>
public string FailureMessage { get; }

/// <summary>
/// Initializes a new instance of the <see cref="AckNackResponse"/> class.
/// </summary>
/// <param name="messageId">The ID of the message being ACK-ed/NACK-ed.</param>
/// <param name="status">The status of ACK/NACK response.</param>
/// <param name="failureMessage">The failure message. This can be null if ACK/NACK is successful.</param>
public AckNackResponse(string messageId, AcknowledgementStatus status, string failureMessage)
{
MessageId = messageId;
Status = status;
FailureMessage = failureMessage;
}

/// <summary>
/// Initializes a new instance of the <see cref="AckNackResponse"/> class from the pair of message ID and its corresponding permanent failure.
/// </summary>
/// <param name="idPermanentErrorPair">The <see cref="KeyValuePair{TKey, TValue}"/> of message ID as key and permanent error message as value.</param>
/// <returns>A new instance of the <see cref="AckNackResponse"/>.</returns>
internal static AckNackResponse FromIdPermanentErrorPair(KeyValuePair<string, string> idPermanentErrorPair)
{
var acknowledgementException = AcknowledgementException.FromIdPermanentErrorPair(idPermanentErrorPair);
return new AckNackResponse(idPermanentErrorPair.Key, acknowledgementException.Status, acknowledgementException.Message);
}

/// <summary>
/// Initializes a new instance of the <see cref="AckNackResponse"/> class from the message ID that is successfully ACK-ed/NACK-ed.
/// </summary>
/// <param name="successfulId">The successful message ID.</param>
/// <returns>A new instance of the <see cref="AckNackResponse"/>.</returns>
internal static AckNackResponse FromSuccessfulId(string successfulId) =>
new AckNackResponse(successfulId, AcknowledgementStatus.Success, default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Grpc.Core;
using System;
using System.Collections.Generic;

Expand All @@ -36,26 +37,34 @@ public sealed class AcknowledgementException : Exception
/// </summary>
public AcknowledgementStatus Status { get; }

/// <summary>
/// Gets the message ID associated with this instance of <see cref="AcknowledgementException"/>.
/// </summary>
public string MessageId { get; }

/// <summary>
/// Initializes a new instance of the <see cref="AcknowledgementException"/> class.
/// </summary>
/// <param name="status">The <see cref="AcknowledgementStatus"/> for the exception.</param>
/// <param name="message">A descriptive message about the exception.</param>
public AcknowledgementException(AcknowledgementStatus status, string message) : base($"{status}: {message}: {GetErrorMessageFromStatus(status)}") =>
/// <param name="messageId">The message ID associated with this instance of exception.</param>
public AcknowledgementException(AcknowledgementStatus status, string messageId) : base($"{status}: {messageId}: {GetErrorMessageFromStatus(status)}")
{
Status = status;
MessageId = messageId;
}

/// <summary>
/// Gets the <see cref="AcknowledgementException"/> from the ackId and permanent error message pair.
/// Gets the <see cref="AcknowledgementException"/> from the ID and permanent error message pair.
/// </summary>
/// <param name="idErrorPair">The <see cref="KeyValuePair{TKey, TValue}"/> of ackId as key and permanent error message as value.</param>
/// <param name="idPermanentErrorPair">The <see cref="KeyValuePair{TKey, TValue}"/> of ID as key and permanent error message as value.</param>
/// <returns>The <see cref="AcknowledgementException"/>.</returns>
internal static AcknowledgementException FromIdErrorPair(KeyValuePair<string, string> idErrorPair) => idErrorPair.Value switch
internal static AcknowledgementException FromIdPermanentErrorPair(KeyValuePair<string, string> idPermanentErrorPair) => idPermanentErrorPair.Value switch
{
// TODO: We have an enum and string in switch. Check if we can improve it.
"PERMANENT_FAILURE_INVALID_ACK_ID" => new AcknowledgementException(AcknowledgementStatus.InvalidAckId, idErrorPair.Key),
nameof(AcknowledgementStatus.FailedPrecondition) => new AcknowledgementException(AcknowledgementStatus.FailedPrecondition, idErrorPair.Key),
nameof(AcknowledgementStatus.PermissionDenied) => new AcknowledgementException(AcknowledgementStatus.PermissionDenied, idErrorPair.Key),
_ => new AcknowledgementException(AcknowledgementStatus.Other, idErrorPair.Key),
"PERMANENT_FAILURE_INVALID_ACK_ID" => new AcknowledgementException(AcknowledgementStatus.InvalidAckId, idPermanentErrorPair.Key),
nameof(StatusCode.FailedPrecondition) => new AcknowledgementException(AcknowledgementStatus.FailedPrecondition, idPermanentErrorPair.Key),
nameof(StatusCode.PermissionDenied) => new AcknowledgementException(AcknowledgementStatus.PermissionDenied, idPermanentErrorPair.Key),
_ => new AcknowledgementException(AcknowledgementStatus.Other, idPermanentErrorPair.Key),
};

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License").
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

namespace Google.Cloud.PubSub.V1;

/// <summary>
/// This class is a simple implementation of <see cref="SubscriptionHandler"/> using a delegate.
/// This is present to provide compatibility with the older <see cref="SubscriberClient.StartAsync(Func{PubsubMessage, CancellationToken, Task{Reply}})"/> implementation.
/// </summary>
internal sealed class SimpleSubscriptionHandler : SubscriptionHandler
{
private readonly Func<PubsubMessage, CancellationToken, Task<Reply>> _handler;

/// <summary>
/// Initializes a new instance of the <see cref="SimpleSubscriptionHandler"/> class.
/// </summary>
/// <param name="handler">The handler delegate that is passed all the received messages.</param>
internal SimpleSubscriptionHandler(Func<PubsubMessage, CancellationToken, Task<Reply>> handler) =>
_handler = handler;

/// <inheritdoc/>
public override Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken) =>
_handler(message, cancellationToken);
}
Loading

0 comments on commit e93c4d3

Please sign in to comment.