Skip to content

Commit

Permalink
Recreate outbox index also when the expiryAfterSeconds column is miss…
Browse files Browse the repository at this point in the history
…ing (#521) (#523)

* Reproduction test

* Make sure index creation takes missing expireAfterSeconds into account
  • Loading branch information
danielmarbach committed Jun 23, 2023
1 parent 831e67e commit 6c12667
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace NServiceBus.Storage.MongoDB.Tests
{
using System;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using global::MongoDB.Bson;
using global::MongoDB.Driver;
using NUnit.Framework;

[TestFixture]
public class OutboxInitializationTests
{
[OneTimeSetUp]
public async Task OneTimeSetUp()
{
databaseName = "Test_" + DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);

var databaseSettings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};

var database = ClientProvider.Client.GetDatabase(databaseName, databaseSettings);

await database.CreateCollectionAsync(CollectionNamingConvention<OutboxRecord>());

outboxCollection = ClientProvider.Client.GetDatabase(databaseName).GetCollection<OutboxRecord>(CollectionNamingConvention<OutboxRecord>());
}

static string CollectionNamingConvention<T>() => CollectionNamingConvention(typeof(T));

static string CollectionNamingConvention(Type type) => type.Name.ToLower();

[SetUp]
public async Task Setup() => await outboxCollection.Indexes.DropAllAsync();

[Theory]
public async Task Should_create_index_when_it_doesnt_exist(TimeSpan timeToKeepOutboxDeduplicationData)
{
OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}

[Theory]
public async Task Should_recreate_when_expiry_drifts(TimeSpan timeToKeepOutboxDeduplicationData)
{
OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention, timeToKeepOutboxDeduplicationData.Add(TimeSpan.FromSeconds(30)));

OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}

[Theory]
public async Task Should_recreate_when_expiry_column_dropped(TimeSpan timeToKeepOutboxDeduplicationData)
{
var indexModel = new CreateIndexModel<OutboxRecord>(Builders<OutboxRecord>.IndexKeys.Ascending(record => record.Dispatched), new CreateIndexOptions
{
Name = OutboxStorage.OutboxCleanupIndexName,
Background = true
});
await outboxCollection.Indexes.CreateOneAsync(indexModel);

OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}

[DatapointSource]
public TimeSpan[] Expiry = new TimeSpan[] { TimeSpan.FromHours(1), TimeSpan.FromHours(3), TimeSpan.FromDays(1) };

static async Task AssertIndexCorrect(IMongoCollection<OutboxRecord> outboxCollection, TimeSpan expiry)
{
var outboxCleanupIndex = (await outboxCollection.Indexes.ListAsync()).ToList().SingleOrDefault(indexDocument => indexDocument.GetElement("name").Value == OutboxStorage.OutboxCleanupIndexName);

Assert.IsNotNull(outboxCleanupIndex);

BsonElement bsonElement = outboxCleanupIndex.GetElement("expireAfterSeconds");
Assert.NotNull(bsonElement);

Assert.AreEqual(expiry, TimeSpan.FromSeconds(bsonElement.Value.ToInt32()));
}

[OneTimeTearDown]
public async Task OneTimeTearDown() => await ClientProvider.Client.DropDatabaseAsync(databaseName);

IMongoCollection<OutboxRecord> outboxCollection;
string databaseName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Tasks;
using Extensibility;
using global::MongoDB.Driver;
using MongoDB;
using NServiceBus.Outbox;

public class OutboxTestsConfiguration
Expand Down
36 changes: 19 additions & 17 deletions src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,32 @@ internal static void InitializeOutboxTypes(IMongoClient client, string databaseN
};

var outboxCollection = client.GetDatabase(databaseName).GetCollection<OutboxRecord>(collectionNamingConvention(typeof(OutboxRecord)), collectionSettings);
var outboxCleanupIndex = outboxCollection.Indexes.List().ToList().SingleOrDefault(indexDocument => indexDocument.GetElement("name").Value == outboxCleanupIndexName);
var existingExpiration = outboxCleanupIndex?.GetElement("expireAfterSeconds").Value.ToInt32();

var createIndex = outboxCleanupIndex is null;

if (existingExpiration.HasValue && TimeSpan.FromSeconds(existingExpiration.Value) != timeToKeepOutboxDeduplicationData)
var outboxCleanupIndex = outboxCollection.Indexes.List().ToList().SingleOrDefault(indexDocument => indexDocument.GetElement("name").Value == OutboxCleanupIndexName);
var createIndex = false;
if (outboxCleanupIndex is null)
{
outboxCollection.Indexes.DropOne(outboxCleanupIndexName);
createIndex = true;
}

if (createIndex)
else if (!outboxCleanupIndex.TryGetElement("expireAfterSeconds", out var existingExpiration) || TimeSpan.FromSeconds(existingExpiration.Value.ToInt32()) != timeToKeepOutboxDeduplicationData)
{
var indexModel = new CreateIndexModel<OutboxRecord>(Builders<OutboxRecord>.IndexKeys.Ascending(record => record.Dispatched), new CreateIndexOptions
{
ExpireAfter = timeToKeepOutboxDeduplicationData,
Name = outboxCleanupIndexName,
Background = true
});
outboxCollection.Indexes.DropOne(OutboxCleanupIndexName);
createIndex = true;
}

outboxCollection.Indexes.CreateOne(indexModel);
if (!createIndex)
{
return;
}

var indexModel = new CreateIndexModel<OutboxRecord>(Builders<OutboxRecord>.IndexKeys.Ascending(record => record.Dispatched), new CreateIndexOptions
{
ExpireAfter = timeToKeepOutboxDeduplicationData,
Name = OutboxCleanupIndexName,
Background = true
});
outboxCollection.Indexes.CreateOne(indexModel);
}

const string outboxCleanupIndexName = "OutboxCleanup";
internal const string OutboxCleanupIndexName = "OutboxCleanup";
}
}

0 comments on commit 6c12667

Please sign in to comment.