Skip to content

Commit

Permalink
Adds support for IOptionMonitor (#58)
Browse files Browse the repository at this point in the history
* optionsmonitor

* version bump

* added implementation

* tests

* bumping dependency

* changelog

* Changes on SDK 3.22

* inline dispose

* builder

* nullable

* add note
  • Loading branch information
ealsur committed Oct 28, 2021
1 parent 8cda3b3 commit 891c991
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 27 deletions.
7 changes: 7 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

## <a name="1.1.1"/> 1.1.1 - 2021-10-28

### Added

- [#58](https://github.com/Azure/Microsoft.Extensions.Caching.Cosmos/pull/58) Added support for IOptionsMonitor


## <a name="1.0.1"/> 1.0.1 - 2021-08-24

### Fixed
Expand Down
84 changes: 67 additions & 17 deletions src/CosmosCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,57 @@ public class CosmosCache : IDistributedCache, IDisposable
private const string ContainerPartitionKeyPath = "/id";
private const int DefaultTimeToLive = -1;
private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly CosmosCacheOptions options;
private readonly IDisposable monitorListener;
private CosmosCacheOptions options;
private CosmosClient cosmosClient;
private Container cosmosContainer;
private bool initializedClient;
private bool isDisposed = false;

/// <summary>
/// Initializes a new instance of the <see cref="CosmosCache"/> class.
/// </summary>
/// <param name="optionsAccessor">Options accessor.</param>
public CosmosCache(IOptions<CosmosCacheOptions> optionsAccessor)
{
if (optionsAccessor == null)
{
throw new ArgumentNullException(nameof(optionsAccessor));
}

if (string.IsNullOrEmpty(optionsAccessor.Value.DatabaseName))
{
throw new ArgumentNullException(nameof(optionsAccessor.Value.DatabaseName));
}
this.Initialize(optionsAccessor);
}

if (string.IsNullOrEmpty(optionsAccessor.Value.ContainerName))
/// <summary>
/// Initializes a new instance of the <see cref="CosmosCache"/> class.
/// </summary>
/// <remarks>
/// Using the <see cref="IOptionsMonitor{T}"/> would make the internal client reference to be updated if any of the options change.
/// </remarks>
/// <param name="optionsMonitor">Options monitor.</param>
public CosmosCache(IOptionsMonitor<CosmosCacheOptions> optionsMonitor)
{
if (optionsMonitor == null)
{
throw new ArgumentNullException(nameof(optionsAccessor.Value.ContainerName));
throw new ArgumentNullException(nameof(optionsMonitor));
}

if (optionsAccessor.Value.ClientBuilder == null && optionsAccessor.Value.CosmosClient == null)
{
throw new ArgumentNullException("You need to specify either a CosmosConfiguration or an existing CosmosClient in the CosmosCacheOptions.");
}
this.Initialize(optionsMonitor.CurrentValue);

this.options = optionsAccessor.Value;
this.monitorListener = optionsMonitor.OnChange(this.OnOptionsChange);
}

/// <inheritdoc/>
public void Dispose()
{
if (this.isDisposed)
{
return;
}

if (this.initializedClient && this.cosmosClient != null)
{
this.cosmosClient.Dispose();
}

this.monitorListener?.Dispose();

this.isDisposed = true;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -367,6 +377,31 @@ private static CosmosCacheSession BuildCosmosCacheSession(string key, byte[] con
return absoluteExpiration;
}

private void Initialize(IOptions<CosmosCacheOptions> optionsAccessor)
{
if (optionsAccessor == null)
{
throw new ArgumentNullException(nameof(optionsAccessor));
}

if (string.IsNullOrEmpty(optionsAccessor.Value.DatabaseName))
{
throw new ArgumentNullException(nameof(optionsAccessor.Value.DatabaseName));
}

if (string.IsNullOrEmpty(optionsAccessor.Value.ContainerName))
{
throw new ArgumentNullException(nameof(optionsAccessor.Value.ContainerName));
}

if (optionsAccessor.Value.ClientBuilder == null && optionsAccessor.Value.CosmosClient == null)
{
throw new ArgumentNullException("You need to specify either a CosmosConfiguration or an existing CosmosClient in the CosmosCacheOptions.");
}

this.options = optionsAccessor.Value;
}

private async Task ConnectAsync(CancellationToken token = default(CancellationToken))
{
token.ThrowIfCancellationRequested();
Expand All @@ -390,6 +425,21 @@ private async Task ConnectAsync(CancellationToken token = default(CancellationTo
}
}

private void OnOptionsChange(CosmosCacheOptions options)
{
// Did we create our own internal client? If so, we need to dispose it.
if (this.initializedClient && this.cosmosClient != null)
{
// In case this becomes an issue with concurrent access to the client, we can see if ReaderWriterLockSlim can be leveraged.
this.cosmosClient.Dispose();
}

this.options = options;

// Force re-initialization on the next Connect
this.cosmosContainer = null;
}

private async Task<Container> CosmosContainerInitializeAsync()
{
this.initializedClient = this.options.CosmosClient == null;
Expand Down
13 changes: 12 additions & 1 deletion src/CosmosCacheServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Extensions.DependencyInjection
using System;
using Microsoft.Extensions.Caching.Cosmos;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Options;

/// <summary>
/// Extension methods for setting up Azure Cosmos DB distributed cache related services in an <see cref="IServiceCollection" />.
Expand Down Expand Up @@ -34,7 +35,17 @@ public static IServiceCollection AddCosmosCache(this IServiceCollection services

services.AddOptions();
services.Configure(setupAction);
services.Add(ServiceDescriptor.Singleton<IDistributedCache, CosmosCache>());
services.Add(ServiceDescriptor.Singleton<IDistributedCache, CosmosCache>((IServiceProvider provider) =>
{
IOptionsMonitor<CosmosCacheOptions> optionsMonitor = provider.GetService<IOptionsMonitor<CosmosCacheOptions>>();
if (optionsMonitor != null)
{
return new CosmosCache(optionsMonitor);
}
IOptions<CosmosCacheOptions> options = provider.GetRequiredService<IOptions<CosmosCacheOptions>>();
return new CosmosCache(options);
}));

return services;
}
Expand Down
4 changes: 2 additions & 2 deletions src/CosmosDistributedCache.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<CurrentDate>$([System.DateTime]::Now.ToString(yyyyMMdd))</CurrentDate>
<NeutralLanguage>en-US</NeutralLanguage>
<ClientVersion>1.0.1</ClientVersion>
<ClientVersion>1.1.0</ClientVersion>
<VersionSuffix Condition=" '$(IsPreview)' == 'true' ">preview</VersionSuffix>
<Version Condition=" '$(VersionSuffix)' == '' ">$(ClientVersion)</Version>
<Version Condition=" '$(VersionSuffix)' != '' ">$(ClientVersion)-$(VersionSuffix)</Version>
Expand Down Expand Up @@ -40,7 +40,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.20.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.22.1" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="3.1.16" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.16" />
</ItemGroup>
Expand Down
118 changes: 111 additions & 7 deletions tests/unit/CosmosCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public class CosmosCacheTests
public void RequiredParameters()
{
// Null-check
Assert.Throws<ArgumentNullException>(() => new CosmosCache(null));
Assert.Throws<ArgumentNullException>(() => new CosmosCache((IOptions<CosmosCacheOptions>)null));
Assert.Throws<ArgumentNullException>(() => new CosmosCache((IOptionsMonitor<CosmosCacheOptions>)null));

IOptions<CosmosCacheOptions> options = Options.Create(new CosmosCacheOptions(){});
// Database
Expand All @@ -49,14 +50,72 @@ public void RequiredParameters()
}));
}

[Fact]
public async Task OnChangeOptionsReInitializes()
{
DiagnosticsSink diagnosticsSink = new DiagnosticsSink();
string etag = "etag";
CosmosCacheSession existingSession = new CosmosCacheSession();
existingSession.SessionKey = "key";
existingSession.Content = new byte[0];
Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
Mock<ContainerResponse> mockedResponse = new Mock<ContainerResponse>();
Mock<CosmosDiagnostics> mockedContainerDiagnostics = new Mock<CosmosDiagnostics>();
Mock<ItemResponse<CosmosCacheSession>> mockedItemResponse = new Mock<ItemResponse<CosmosCacheSession>>();
Mock<CosmosDiagnostics> mockedItemDiagnostics = new Mock<CosmosDiagnostics>();
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedResponse.Setup(c => c.Diagnostics).Returns(mockedContainerDiagnostics.Object);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
mockedItemResponse.Setup(c => c.Resource).Returns(existingSession);
mockedItemResponse.Setup(c => c.ETag).Returns(etag);
mockedItemResponse.Setup(c => c.Diagnostics).Returns(mockedItemDiagnostics.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);

CosmosCacheOptions initialOptions = new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "something",
CosmosClient = mockedClient.Object,
DiagnosticsHandler = diagnosticsSink.CaptureDiagnostics
};

TestOptionsMonitor<CosmosCacheOptions> optionsMonitor = new TestOptionsMonitor<CosmosCacheOptions>(initialOptions);

CosmosCache cache = new CosmosCache(optionsMonitor);

await cache.GetAsync("key");

CosmosCacheOptions newOptions = new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "somethingElse",
CosmosClient = mockedClient.Object,
DiagnosticsHandler = diagnosticsSink.CaptureDiagnostics
};

optionsMonitor.SetNewValue(newOptions);

await cache.GetAsync("key");

cache.Dispose();

mockedClient.Verify(c => c.GetContainer(It.IsAny<string>(), It.Is<string>(n => n == "something")), Times.Exactly(2));
mockedClient.Verify(c => c.GetContainer(It.IsAny<string>(), It.Is<string>(n => n == "somethingElse")), Times.Exactly(2));
mockedContainer.Verify(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>()), Times.Exactly(2));
mockedContainer.Verify(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Exactly(2));

optionsMonitor.MockedDisposable.Verify(d => d.Dispose(), Times.Once);
}

[Fact]
public async Task ConnectAsyncThrowsIfContainerDoesNotExist()
{
DiagnosticsSink diagnosticsSink = new DiagnosticsSink();

Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
CosmosException exception = new CosmosException("test", HttpStatusCode.NotFound, 0, "", 0);
MockedException exception = new MockedException("test", HttpStatusCode.NotFound, 0, "", 0);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(exception);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
Expand Down Expand Up @@ -284,7 +343,7 @@ public async Task GetDoesNotRetryUpdateIfRetrySlidingExpirationUpdatesIsFalse()
mockedResponse.Setup(c => c.Diagnostics).Returns(mockedContainerDiagnostics.Object);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
CosmosException preconditionFailedException = new CosmosException("test", HttpStatusCode.PreconditionFailed, 0, "", 0);
MockedException preconditionFailedException = new MockedException("test", HttpStatusCode.PreconditionFailed, 0, "", 0);
mockedContainer.Setup(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item == existingSession), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(preconditionFailedException);
mockedDatabaseResponse.Setup(c => c.Diagnostics).Returns(mockedDatabaseDiagnostics.Object);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
Expand Down Expand Up @@ -339,7 +398,7 @@ public async Task GetDoesRetryUpdateIfRetrySlidingExpirationUpdatesIsTrue()
mockedResponse.Setup(c => c.Diagnostics).Returns(mockedContainerDiagnostics.Object);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
CosmosException preconditionException = new CosmosException("test", HttpStatusCode.PreconditionFailed, 0, "", 0);
MockedException preconditionException = new MockedException("test", HttpStatusCode.PreconditionFailed, 0, "", 0);
mockedContainer.SetupSequence(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item == existingSession), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(preconditionException)
.ReturnsAsync(mockedItemResponse.Object);
Expand Down Expand Up @@ -385,9 +444,9 @@ public async Task GetReturnsNullIfKeyDoesNotExist()
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedResponse.Setup(c => c.Diagnostics).Returns(mockedContainerDiagnostics.Object);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
CosmosException notFoundException = new CosmosException("test", HttpStatusCode.NotFound, 0, "", 0);
MockedException notFoundException = new MockedException("test", HttpStatusCode.NotFound, 0, "", 0);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(notFoundException);
mockedContainer.Setup(c => c.ReplaceItemAsync<CosmosCacheSession>(It.IsAny<CosmosCacheSession>(), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(new CosmosException("test", HttpStatusCode.NotFound, 0, "", 0));
mockedContainer.Setup(c => c.ReplaceItemAsync<CosmosCacheSession>(It.IsAny<CosmosCacheSession>(), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(new MockedException("test", HttpStatusCode.NotFound, 0, "", 0));
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(c => c.GetDatabase(It.IsAny<string>())).Returns(mockedDatabase.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
Expand Down Expand Up @@ -454,7 +513,7 @@ public async Task RemoveAsyncNotExistItem()
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedResponse.Setup(c => c.Diagnostics).Returns(mockedContainerDiagnostics.Object);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
CosmosException notExistException = new CosmosException("test remove not exist", HttpStatusCode.NotFound, 0, "", 0);
MockedException notExistException = new MockedException("test remove not exist", HttpStatusCode.NotFound, 0, "", 0);
mockedContainer.Setup(c => c.DeleteItemAsync<CosmosCacheSession>("not-exist-key", It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(notExistException)
.Verifiable();
Expand Down Expand Up @@ -587,5 +646,50 @@ public void CaptureDiagnostics(CosmosDiagnostics diagnostics)
this.capturedDiagnostics.Add(diagnostics);
}
}

private class TestOptionsMonitor<CosmosCacheOptions> : IOptionsMonitor<CosmosCacheOptions>
{
private readonly List<Action<CosmosCacheOptions, string>> _callbacks = new List<Action<CosmosCacheOptions, string>>();
public Mock<IDisposable> MockedDisposable { get; } = new Mock<IDisposable>();

public TestOptionsMonitor(CosmosCacheOptions currentValue)
{
CurrentValue = currentValue;
}

public CosmosCacheOptions Get(string name)
{
return CurrentValue;
}

public IDisposable OnChange(Action<CosmosCacheOptions, string> listener)
{
this._callbacks.Add(listener);
return this.MockedDisposable.Object;
}

public void SetNewValue(CosmosCacheOptions newValue)
{
this.CurrentValue = newValue;
foreach (Action<CosmosCacheOptions, string> callback in this._callbacks)
{
callback(newValue, string.Empty);
}
}

public CosmosCacheOptions CurrentValue { get; private set; }
}

private class MockedException : CosmosException
{
private readonly Mock<CosmosDiagnostics> mockedDiagnostics = new Mock<CosmosDiagnostics>();

public MockedException(string message, HttpStatusCode statusCode, int subStatusCode, string activityId, double requestCharge)
: base(message, statusCode, subStatusCode, activityId, requestCharge)
{
}

public override CosmosDiagnostics Diagnostics => this.mockedDiagnostics.Object;
}
}
}

0 comments on commit 891c991

Please sign in to comment.