Skip to content

Commit

Permalink
Fix race conditions on SlidingExpiration + AbsoluteExpiration (#80)
Browse files Browse the repository at this point in the history
* Handling race conditions

* tests

* release prep
  • Loading branch information
ealsur committed Mar 26, 2024
1 parent 6fea577 commit 7c53dcc
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 3 deletions.
7 changes: 6 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

## <a name="1.6.1"/> 1.6.1 - 2024-03-27

### Fixed

- [#80](https://github.com/Azure/Microsoft.Extensions.Caching.Cosmos/pull/80) Fixed race conditions on SlidingExpiration + AbsoluteExpiration

## <a name="1.6.0"/> 1.6.0 - 2023-11-01

### Added

- [#72](https://github.com/Azure/Microsoft.Extensions.Caching.Cosmos/pull/72) Increased SDK dependency version for critical fixes


## <a name="1.5.0"/> 1.5.0 - 2023-06-22

### Added
Expand Down
35 changes: 34 additions & 1 deletion src/CosmosCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public async Task<byte[]> GetAsync(string key, CancellationToken token = default
DateTimeOffset absoluteExpiration = DateTimeOffset.FromUnixTimeSeconds(cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault());
if (absoluteExpiration < DateTimeOffset.UtcNow)
{
cosmosCacheSessionResponse.Resource.TimeToLive = 0;
// At this point the cache item we just read expired, in which case, we should treat it as not found.
// The TTL will clean it up on the container.
return null;
}
else
{
Expand All @@ -148,6 +150,12 @@ public async Task<byte[]> GetAsync(string key, CancellationToken token = default

this.options.DiagnosticsHandler?.Invoke(replaceCacheSessionResponse.Diagnostics);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// The cache item has expired in-between the read and replace operations.
this.options.DiagnosticsHandler?.Invoke(ex.Diagnostics);
return null;
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.PreconditionFailed)
{
this.options.DiagnosticsHandler?.Invoke(cosmosException.Diagnostics);
Expand Down Expand Up @@ -203,6 +211,26 @@ public async Task RefreshAsync(string key, CancellationToken token = default(Can
{
try
{
if (cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault() > 0)
{
long ttl = cosmosCacheSessionResponse.Resource.TimeToLive.Value;
DateTimeOffset absoluteExpiration = DateTimeOffset.FromUnixTimeSeconds(cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault());
if (absoluteExpiration < DateTimeOffset.UtcNow)
{
// At this point the cache item we just read expired, in which case, we should treat it as not found.
// The TTL will clean it up on the container.
return;
}
else
{
double pendingSeconds = (absoluteExpiration - DateTimeOffset.UtcNow).TotalSeconds;
if (pendingSeconds < ttl)
{
cosmosCacheSessionResponse.Resource.TimeToLive = (long)pendingSeconds;
}
}
}

cosmosCacheSessionResponse.Resource.PartitionKeyAttribute = this.options.ContainerPartitionKeyAttribute;
ItemResponse<CosmosCacheSession> replaceCacheSessionResponse = await this.cosmosContainer.ReplaceItemAsync(
partitionKey: new PartitionKey(key),
Expand All @@ -217,6 +245,11 @@ public async Task RefreshAsync(string key, CancellationToken token = default(Can

this.options.DiagnosticsHandler?.Invoke(replaceCacheSessionResponse.Diagnostics);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// The cache item has expired in-between the read and replace operations.
this.options.DiagnosticsHandler?.Invoke(ex.Diagnostics);
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.PreconditionFailed)
{
// Race condition on replace, we need do not need to refresh it
Expand Down
2 changes: 1 addition & 1 deletion src/CosmosDistributedCache.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<CurrentDate>$([System.DateTime]::Now.ToString(yyyyMMdd))</CurrentDate>
<NeutralLanguage>en-US</NeutralLanguage>
<ClientVersion>1.6.0</ClientVersion>
<ClientVersion>1.6.1</ClientVersion>
<VersionSuffix Condition=" '$(IsPreview)' == 'true' ">preview</VersionSuffix>
<Version Condition=" '$(VersionSuffix)' == '' ">$(ClientVersion)</Version>
<Version Condition=" '$(VersionSuffix)' != '' ">$(ClientVersion)-$(VersionSuffix)</Version>
Expand Down
80 changes: 80 additions & 0 deletions tests/unit/CosmosCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,86 @@ public async Task ValidatesNoExpirationUsesNullTtl()
mockedContainer.Verify(c => c.UpsertItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.SessionKey == existingSession.SessionKey && item.TimeToLive == ttl), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
}

[Fact]
public async Task SlidingExpirationWithAbsoluteExpirationOnExpiredRead()
{
const int ttlSliding = 20;
const int ttlAbsolute = 50;
string etag = "etag";
CosmosCacheSession existingSession = new CosmosCacheSession();
existingSession.SessionKey = "key";
existingSession.Content = new byte[0];
existingSession.IsSlidingExpiration = true;
existingSession.TimeToLive = ttlSliding;
existingSession.AbsoluteSlidingExpiration = DateTimeOffset.UtcNow.AddSeconds(-ttlAbsolute).ToUnixTimeSeconds();
Mock<ItemResponse<CosmosCacheSession>> mockedItemResponse = new Mock<ItemResponse<CosmosCacheSession>>();
Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
Mock<Database> mockedDatabase = new Mock<Database>();
Mock<ContainerResponse> mockedResponse = new Mock<ContainerResponse>();
mockedItemResponse.Setup(c => c.Resource).Returns(existingSession);
mockedItemResponse.Setup(c => c.ETag).Returns(etag);
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(c => c.GetDatabase(It.IsAny<string>())).Returns(mockedDatabase.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
CosmosCache cache = new CosmosCache(Options.Create(new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "something",
CreateIfNotExists = true,
CosmosClient = mockedClient.Object
}));

Assert.Null(await cache.GetAsync("key"));
// Checks for Db existence due to CreateIfNotExists
mockedClient.Verify(c => c.CreateDatabaseIfNotExistsAsync(It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<RequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.TimeToLive == ttlSliding), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task SlidingExpirationWithAbsoluteExpirationOnReplaceNotFound()
{
const int ttlSliding = 20;
const int ttlAbsolute = 50;
string etag = "etag";
CosmosCacheSession existingSession = new CosmosCacheSession();
existingSession.SessionKey = "key";
existingSession.Content = new byte[0];
existingSession.IsSlidingExpiration = true;
existingSession.TimeToLive = ttlSliding;
existingSession.AbsoluteSlidingExpiration = DateTimeOffset.UtcNow.AddSeconds(ttlAbsolute).ToUnixTimeSeconds();
Mock<ItemResponse<CosmosCacheSession>> mockedItemResponse = new Mock<ItemResponse<CosmosCacheSession>>();
Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
Mock<Database> mockedDatabase = new Mock<Database>();
Mock<ContainerResponse> mockedResponse = new Mock<ContainerResponse>();
mockedItemResponse.Setup(c => c.Resource).Returns(existingSession);
mockedItemResponse.Setup(c => c.ETag).Returns(etag);
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
MockedException exception = new MockedException("test", HttpStatusCode.NotFound, 0, "", 0);
mockedContainer.Setup(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item == existingSession), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(exception);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(c => c.GetDatabase(It.IsAny<string>())).Returns(mockedDatabase.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
CosmosCache cache = new CosmosCache(Options.Create(new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "something",
CreateIfNotExists = true,
CosmosClient = mockedClient.Object
}));

Assert.Null(await cache.GetAsync("key"));
// Checks for Db existence due to CreateIfNotExists
mockedClient.Verify(c => c.CreateDatabaseIfNotExistsAsync(It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<RequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.TimeToLive == ttlSliding), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
}

private class DiagnosticsSink
{
private List<CosmosDiagnostics> capturedDiagnostics = new List<CosmosDiagnostics>();
Expand Down

0 comments on commit 7c53dcc

Please sign in to comment.