Skip to content

Commit

Permalink
NCBC-3393: OperationTaskException may continue after rebalance completes
Browse files Browse the repository at this point in the history
Motivation
----------
Fixes a bug where the SDK incorrectly assumes it has the latest config
if there is a failure processing the cluster map. Note that if another
config was received and successfully processed, things would progress
successfully.

Modifications
-------------
 - Ensure OperationTaskException's are turned into UnambiguousTimeoutExceptions
if they never make it onto the wire. This can happen if a circuit breaker is
tripped.
 - Ensure that exceptions that happen in the ClusterContext are thrown and
handled by the calling code within the CouchbaseBucket
 - Fix unit tests

Change-Id: I245411d924e46c94f4e17c2c79a662354e891bd5
Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/191786
Reviewed-by: Richard Ponton <richard.ponton@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
jeffrymorris committed Jun 1, 2023
1 parent 57b3400 commit 7f3bf0f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/Couchbase/Core/ClusterContext.cs
Expand Up @@ -813,6 +813,7 @@ public async Task ProcessClusterMapAsync(BucketBase bucket, BucketConfig config)
catch (Exception e)
{
_logger.LogError(LoggingEvents.ConfigEvent, "ERROR:{e}", e);
throw;
}
finally
{
Expand Down
14 changes: 9 additions & 5 deletions src/Couchbase/Core/IO/Operations/OperationBase.cs
Expand Up @@ -39,7 +39,6 @@ internal abstract class OperationBase : IOperation, IValueTaskSource<ResponseSta
private bool _isOrphaned;
private volatile string? _lastDispatchedFrom;
private volatile string? _lastDispatchedTo;
private long _totalExpiredTime;

protected OperationBase()
{
Expand Down Expand Up @@ -203,7 +202,11 @@ public CancellationToken Token
/// </summary>
public ObjectPool<OperationBuilder> OperationBuilderPool { get; set; } = null!; // Assumes we always initialize with OperationConfigurator

public TimeSpan Elapsed => TimeSpan.FromMilliseconds(_totalExpiredTime);
public TimeSpan Elapsed
{
get;
private set;
} = TimeSpan.Zero;

#endregion

Expand Down Expand Up @@ -688,9 +691,6 @@ public void HandleOperationCompleted(in SlicedMemoryOwner<byte> data)
{
//for measuring latency using an LoggingMeter or similar.
StopRecording();

//Since an operation may be retried, we want to add to the total elapsed time.
_totalExpiredTime = Interlocked.Add(ref _totalExpiredTime, _stopwatch.ElapsedMilliseconds);
}
}

Expand All @@ -707,6 +707,7 @@ public bool WasNmvb()
public string? LastDispatchedTo => _lastDispatchedTo;

public string? LastErrorMessage { get; set; }

public virtual bool CanStream => false;

public bool IsCompleted => _isCompleted == 1;
Expand All @@ -719,6 +720,9 @@ public bool WasNmvb()
public void StopRecording()
{
_stopwatch.Stop();

//Since an operation may be retried, we want to add to the total elapsed time.
Elapsed = Elapsed.Add(_stopwatch.Elapsed);
MetricTracker.KeyValue.TrackOperation(OpCode, _stopwatch.Elapsed);
}
#endregion
Expand Down
5 changes: 5 additions & 0 deletions src/Couchbase/Core/Retry/IRequest.cs
Expand Up @@ -14,6 +14,11 @@ public interface IRequest
List<RetryReason> RetryReasons { get; set; }
IRetryStrategy RetryStrategy { get; set; }
TimeSpan Timeout { get; set; }

/// <summary>
/// The total time expired at the time the operation is called. If another retry happens,
/// it will be updated once the response is received.
/// </summary>
TimeSpan Elapsed { get; }
CancellationToken Token
{
Expand Down
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Couchbase.Core;
using Couchbase.Core.Bootstrapping;
Expand Down Expand Up @@ -512,11 +513,24 @@ public async Task Test_HasConfigChanges()

CouchbaseBucket CreateBucket(BucketConfig bootstrapConfig)
{
var clusterNodeFactory = new Mock<IClusterNodeFactory>();
var node = new Mock<IClusterNode>();
node.Setup(x => x.KeyEndPoints).Returns(new List<HostEndpointWithPort>
{ new("127.0.0.1", 11210) });

clusterNodeFactory.Setup(x => x.CreateAndConnectAsync(
It.IsAny<HostEndpointWithPort>(), It.IsAny<NodeAdapter>(),
It.IsAny<CancellationToken>())).
Returns(Task.FromResult(node.Object));

var options = new ClusterOptions().AddClusterService(clusterNodeFactory.Object);
var clusterCtx = new ClusterContext(new CancellationTokenSource(), options)
{
SupportsCollections = true
};

var bucket = new CouchbaseBucket("default",
new ClusterContext
{
SupportsCollections = true
},
clusterCtx,
new Mock<IScopeFactory>().Object,
new Mock<IRetryOrchestrator>().Object,
new Mock<IVBucketKeyMapperFactory>().Object,
Expand All @@ -528,6 +542,9 @@ CouchbaseBucket CreateBucket(BucketConfig bootstrapConfig)
new BestEffortRetryStrategy(),
bootstrapConfig);

node.Setup(x => x.Owner).Returns(bucket);


return bucket;
}
}
Expand Up @@ -40,6 +40,7 @@ public CancellationToken Token
public CancellationTokenPair TokenPair { get; set; }
public string? ClientContextId { get; set; }
public string? Statement { get; set; }

public bool PreserveTtl { get; }
public OpCode OpCode { get; }
public string? BucketName { get; }
Expand Down
1 change: 1 addition & 0 deletions tests/Couchbase.UnitTests/Utils/FakeOperation.cs
Expand Up @@ -64,6 +64,7 @@ public CancellationToken Token
public CancellationTokenPair TokenPair { get; set; }
public string ClientContextId { get; set; }
public string Statement { get; set; }

public bool PreserveTtl { get; }
public OpCode OpCode { get; }
public string Key { get; }
Expand Down

0 comments on commit 7f3bf0f

Please sign in to comment.