Skip to content

Commit

Permalink
Upgrade Resiliency: Refactors GatewayAddressCache to Mark TransportAd…
Browse files Browse the repository at this point in the history
…dresses to Unhealthy when Connection Reset Event Occurs (#3768)

* Code changes to mark the transport uri to unhealthy for which a connection reset event occures through the connection state listener.

* Code changes to bump up the direct version.

* Code changes to clean up Gateway Address Cache.

* Code changes to fix pipeline build.

* Code changes to fix serilization test failures.

* Code changes to add force refresh to the gateway function callback delegate.
  • Loading branch information
kundadebdatta committed Mar 23, 2023
1 parent d0c0578 commit 4ab6293
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.32.2</ClientOfficialVersion>
<ClientPreviewVersion>3.32.2</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.30.2</DirectVersion>
<DirectVersion>3.30.4</DirectVersion>
<EncryptionOfficialVersion>2.0.1</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.1</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
61 changes: 33 additions & 28 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Expand Up @@ -387,15 +387,21 @@ public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHan
}
}

public void TryRemoveAddresses(
/// <summary>
/// Marks the <see cref="TransportAddressUri"/> to Unhealthy that matches with the faulted
/// server key.
/// </summary>
/// <param name="serverKey">An instance of <see cref="ServerKey"/> that contains the host and
/// port of the backend replica.</param>
public async Task MarkAddressesToUnhealthyAsync(
ServerKey serverKey)
{
if (serverKey == null)
{
throw new ArgumentNullException(nameof(serverKey));
}

if (this.serverPartitionAddressToPkRangeIdMap.TryRemove(serverKey, out HashSet<PartitionKeyRangeIdentity> pkRangeIds))
if (this.serverPartitionAddressToPkRangeIdMap.TryGetValue(serverKey, out HashSet<PartitionKeyRangeIdentity> pkRangeIds))
{
PartitionKeyRangeIdentity[] pkRangeIdsCopy;
lock (pkRangeIds)
Expand All @@ -405,36 +411,35 @@ public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHan

foreach (PartitionKeyRangeIdentity pkRangeId in pkRangeIdsCopy)
{
DefaultTrace.TraceInformation("Remove addresses for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}",
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
this.serviceEndpoint);

this.serverPartitionAddressCache.TryRemove(pkRangeId);
}
}
}

public async Task<PartitionAddressInformation> UpdateAsync(
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
CancellationToken cancellationToken)
{
if (partitionKeyRangeIdentity == null)
{
throw new ArgumentNullException(nameof(partitionKeyRangeIdentity));
}

cancellationToken.ThrowIfCancellationRequested();

return await this.serverPartitionAddressCache.GetAsync(
key: partitionKeyRangeIdentity,
// The forceRefresh flag is set to true for the callback delegate is because, if the GetAsync() from the async
// non-blocking cache fails to look up the pkRangeId, then there are some inconsistency present in the cache, and it is
// more safe to do a force refresh to fetch the addresses from the gateway, instead of fetching it from the cache itself.
// Please note that, the chances of encountering such scenario is highly unlikely.
PartitionAddressInformation addressInfo = await this.serverPartitionAddressCache.GetAsync(
key: pkRangeId,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
null,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
forceRefresh: true),
forceRefresh: (_) => true);
forceRefresh: (_) => false);

IReadOnlyList<TransportAddressUri> transportAddresses = addressInfo.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;
foreach (TransportAddressUri address in from TransportAddressUri transportAddress in transportAddresses
where serverKey.Equals(transportAddress.ReplicaServerKey)
select transportAddress)
{
DefaultTrace.TraceInformation("Marking a backend replica to Unhealthy for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}, transportAddress: {3}",
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
this.serviceEndpoint,
address.ToString());

address.SetUnhealthy();
}
}
}
}

private async Task<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> ResolveMasterAsync(DocumentServiceRequest request, bool forceRefresh)
Expand Down
25 changes: 4 additions & 21 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Expand Up @@ -228,33 +228,16 @@ public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHan
}

public async Task UpdateAsync(
IReadOnlyList<AddressCacheToken> addressCacheTokens,
CancellationToken cancellationToken)
{
List<Task> tasks = new List<Task>();

foreach (AddressCacheToken cacheToken in addressCacheTokens)
{
if (this.addressCacheByEndpoint.TryGetValue(cacheToken.ServiceEndpoint, out EndpointCache endpointCache))
{
tasks.Add(endpointCache.AddressCache.UpdateAsync(cacheToken.PartitionKeyRangeIdentity, cancellationToken));
}
}

await Task.WhenAll(tasks);
}

public Task UpdateAsync(
ServerKey serverKey,
CancellationToken cancellationToken)
{
foreach (KeyValuePair<Uri, EndpointCache> addressCache in this.addressCacheByEndpoint)
{
// since we don't know which address cache contains the pkRanges mapped to this node, we do a tryRemove on all AddressCaches of all regions
addressCache.Value.AddressCache.TryRemoveAddresses(serverKey);
// since we don't know which address cache contains the pkRanges mapped to this node,
// we mark all transport uris that has the same server key to unhealthy status in the
// AddressCaches of all regions.
await addressCache.Value.AddressCache.MarkAddressesToUnhealthyAsync(serverKey);
}

return Task.CompletedTask;
}

/// <summary>
Expand Down
Expand Up @@ -18,6 +18,8 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Rntbd;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

Expand Down Expand Up @@ -108,8 +110,11 @@ public void TestGatewayAddressCacheAutoRefreshOnSuboptimalPartition()
public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()
{
FakeMessageHandler messageHandler = new FakeMessageHandler();
HttpClient httpClient = new HttpClient(messageHandler);
httpClient.Timeout = TimeSpan.FromSeconds(120);
HttpClient httpClient = new HttpClient(messageHandler)
{
Timeout = TimeSpan.FromSeconds(120)
};

GatewayAddressCache cache = new GatewayAddressCache(
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
Documents.Client.Protocol.Tcp,
Expand All @@ -129,8 +134,9 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()

Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla.com"));

// call updateAddress
cache.TryRemoveAddresses(new Documents.Rntbd.ServerKey(new Uri("https://blabla.com")));
// Mark transport addresses to Unhealthy depcting a connection reset event.
ServerKey faultyServerKey = new (new Uri("https://blabla2.com"));
await cache.MarkAddressesToUnhealthyAsync(faultyServerKey);

// check if the addresss is updated
addresses = await cache.TryGetAddressesAsync(
Expand All @@ -140,7 +146,15 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()
false,
CancellationToken.None);

Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla5.com"));
// Validate that the above transport uri with host blabla2.com has been marked Unhealthy.
IReadOnlyList<TransportAddressUri> transportAddressUris = addresses
.Get(Protocol.Tcp)?
.ReplicaTransportAddressUris;

TransportAddressUri transportAddressUri = transportAddressUris
.Single(x => x.ReplicaServerKey.Equals(faultyServerKey));

Assert.IsTrue(condition: transportAddressUri.GetCurrentHealthState().GetHealthStatus().Equals(TransportAddressHealthState.HealthStatus.Unhealthy));
}

[TestMethod]
Expand Down Expand Up @@ -1458,23 +1472,23 @@ public int GetMethodInvocationCount()
}

Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync(
IReadOnlyList<TransportAddressUri> addresses)
IEnumerable<TransportAddressUri> addresses)
{
this.totalReceivedAddressesCounter = addresses.Count;
for (int i = 0; i < addresses.Count; i++)
this.totalReceivedAddressesCounter = addresses.Count();
for (int i = 0; i < addresses.Count(); i++)
{
if (this.useAttemptBasedFailingIndexs)
{
if (this.failIndexesByAttempts.ContainsKey(i) && this.failIndexesByAttempts[i].Contains(this.methodInvocationCounter))
{
this.ExecuteFailureCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
else
{
this.ExecuteSuccessCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
}
Expand All @@ -1483,13 +1497,13 @@ public int GetMethodInvocationCount()
if (this.failingIndexes.Contains(i))
{
this.ExecuteFailureCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
else
{
this.ExecuteSuccessCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
}
Expand Down
Expand Up @@ -125,6 +125,8 @@ public void ValidateStoreResultSerialization()
storeResultProperties.Remove(nameof(storeResult.Target.Exception));
storeResultProperties.Add("transportRequestTimeline");
storeResultProperties.Remove(nameof(storeResult.Target.TransportRequestStats));
storeResultProperties.Add("ReplicaHealthStatuses");
storeResultProperties.Remove(nameof(storeResult.Target.ReplicaHealthStatuses));

foreach (string key in jsonPropertyNames)
{
Expand Down

0 comments on commit 4ab6293

Please sign in to comment.