From c1ff2d3349ff36cde16609c61029ad2f869e1439 Mon Sep 17 00:00:00 2001 From: Adrian Hall Date: Fri, 30 Aug 2024 13:52:32 -0700 Subject: [PATCH] (#90) Fixed threading issue in PushAsync --- .../Operations/PullOperationManager.cs | 3 +- .../OperationsQueue/OperationsQueueManager.cs | 30 ++++++-- .../Helpers/ServiceApplicationFactory.cs | 9 +++ .../Helpers/ServiceTest.cs | 22 ++++-- .../Offline/Integration_Push_Tests.cs | 74 +++++++++++++++++++ 5 files changed, 123 insertions(+), 15 deletions(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index 01a2f6c8..ec44fa0e 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -161,11 +161,12 @@ internal async Task> GetPageAsync(HttpClient client, Uri requestUri object? result = await JsonSerializer.DeserializeAsync(response.ContentStream, pageType, context.JsonSerializerOptions, cancellationToken).ConfigureAwait(false) ?? throw new DatasyncPullException("JSON result is null") { ServiceResponse = response }; - return new Page() + Page page = new Page() { Items = (IEnumerable)itemsPropInfo.GetValue(result)!, NextLink = (string?)nextLinkPropInfo.GetValue(result) }; + return page; } catch (JsonException ex) { diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs index 1ffc3e65..1a378b75 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs @@ -19,6 +19,11 @@ namespace CommunityToolkit.Datasync.Client.Offline.OperationsQueue; /// internal class OperationsQueueManager : IOperationsQueueManager { + /// + /// A lock object for locking against concurrent changes to the queue. + /// + private readonly object pushlock = new(); + /// /// The map of valid entities that can be synchronized to the service. /// @@ -296,10 +301,14 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt if (!response.IsSuccessful) { - operation.LastAttempt = DateTimeOffset.UtcNow; - operation.HttpStatusCode = response.StatusCode; - operation.State = OperationState.Failed; - _ = this._context.Update(operation); + lock (this.pushlock) + { + operation.LastAttempt = DateTimeOffset.UtcNow; + operation.HttpStatusCode = response.StatusCode; + operation.State = OperationState.Failed; + _ = this._context.Update(operation); + } + return response; } @@ -311,7 +320,11 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt ReplaceDatabaseValue(oldValue, newValue); } - _ = this._context.DatasyncOperationsQueue.Remove(operation); + lock (this.pushlock) + { + _ = this._context.DatasyncOperationsQueue.Remove(operation); + } + return null; } @@ -327,8 +340,11 @@ internal void ReplaceDatabaseValue(object? oldValue, object? newValue) throw new DatasyncException("Internal Datasync Error: invalid values for replacement."); } - EntityEntry tracker = this._context.Entry(oldValue); - tracker.CurrentValues.SetValues(newValue); + lock (this.pushlock) + { + EntityEntry tracker = this._context.Entry(oldValue); + tracker.CurrentValues.SetValues(newValue); + } } /// diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceApplicationFactory.cs b/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceApplicationFactory.cs index 1353f802..1bc7bdeb 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceApplicationFactory.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceApplicationFactory.cs @@ -70,6 +70,15 @@ internal static bool IsValid(IMovie movie) && movie.Duration >= 60 && movie.Duration <= 360; } + internal void ResetInMemoryMovies() + { + using IServiceScope scope = Services.CreateScope(); + InMemoryRepository repository = scope.ServiceProvider.GetRequiredService>() as InMemoryRepository; + List sourceData = TestCommon.TestData.Movies.OfType(); + repository.Clear(); + sourceData.ForEach(movie => repository.StoreEntity(movie)); + } + internal void RunWithRepository(Action> action) where TEntity : InMemoryTableData { using IServiceScope scope = Services.CreateScope(); diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs b/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs index 869bbbe0..7ea4ec20 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs @@ -20,16 +20,18 @@ public abstract class ServiceTest(ServiceApplicationFactory factory) protected DateTimeOffset StartTime { get; } = DateTimeOffset.UtcNow; - internal IntegrationDbContext GetOfflineContext(bool useRealFile = false) + internal IntegrationDbContext GetOfflineContext(bool useRealFile = false, bool enableLogging = false) { string filename = null; string connectionString = "Data Source=:memory:"; if (useRealFile) { filename = Path.GetTempFileName(); - SqliteConnectionStringBuilder builder = new(); - builder.DataSource = filename; - builder.Mode = SqliteOpenMode.ReadWriteCreate; + SqliteConnectionStringBuilder builder = new() + { + DataSource = filename, + Mode = SqliteOpenMode.ReadWriteCreate + }; connectionString = builder.ConnectionString; } @@ -38,9 +40,12 @@ internal IntegrationDbContext GetOfflineContext(bool useRealFile = false) DbContextOptionsBuilder optionsBuilder = new(); optionsBuilder.UseSqlite(connection); - optionsBuilder.LogTo(Console.WriteLine); - optionsBuilder.EnableSensitiveDataLogging(); - optionsBuilder.EnableDetailedErrors(); + if (enableLogging) + { + optionsBuilder.LogTo(Console.WriteLine); + optionsBuilder.EnableSensitiveDataLogging(); + optionsBuilder.EnableDetailedErrors(); + } IntegrationDbContext context = new(optionsBuilder.Options) { @@ -71,6 +76,9 @@ internal InMemoryMovie GetRandomMovie() internal TEntity GetServerEntityById(string id) where TEntity : InMemoryTableData => factory.GetServerEntityById(id); + internal void ResetInMemoryMovies() + => factory.ResetInMemoryMovies(); + protected void SeedKitchenSinkWithCountryData() { factory.RunWithRepository(repository => diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Offline/Integration_Push_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Offline/Integration_Push_Tests.cs index edf1d3cb..1eae2959 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Offline/Integration_Push_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Offline/Integration_Push_Tests.cs @@ -43,6 +43,8 @@ public void Dispose() [Fact] public async Task PushAsync_Complex_Situation() { + ResetInMemoryMovies(); + PullResult initialPullResults = await this.context.Movies.PullAsync(); initialPullResults.IsSuccessful.Should().BeTrue(); initialPullResults.Additions.Should().Be(248); @@ -109,4 +111,76 @@ public async Task PushAsync_Complex_Situation() // The service always replaces additions and replacements - updating the last updatedAt. pullResults.Replacements.Should().Be(moviesToReplace.Count + 1); } + + [Fact] + public async Task PushAsync_Multithreaded() + { + ResetInMemoryMovies(); + + PullResult initialPullResults = await this.context.Movies.PullAsync(); + initialPullResults.IsSuccessful.Should().BeTrue(); + initialPullResults.Additions.Should().Be(248); + initialPullResults.Deletions.Should().Be(0); + initialPullResults.Replacements.Should().Be(0); + + // Let's add some new movies + ClientMovie blackPanther = new(TestCommon.TestData.Movies.BlackPanther) { Id = Guid.NewGuid().ToString("N") }; + this.context.Movies.Add(blackPanther); + await this.context.SaveChangesAsync(); + + // And remove any movie that matches some criteria + List moviesToDelete = await this.context.Movies.Where(x => x.Duration > 180).ToListAsync(); + this.context.Movies.RemoveRange(moviesToDelete); + await this.context.SaveChangesAsync(); + + // Then replace all the Unrated movies with a rating of NC17 + List moviesToReplace = await this.context.Movies.Where(x => x.Rating == MovieRating.Unrated).ToListAsync(); + moviesToReplace.ForEach(r => + { + r.Rating = MovieRating.NC17; + r.Title = r.Title.PadLeft('-'); + this.context.Movies.Update(r); + }); + await this.context.SaveChangesAsync(); + + // Check the queue. + List operations = await this.context.DatasyncOperationsQueue.ToListAsync(); + operations.Count.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count); + operations.Count(x => x.Kind is OperationKind.Add).Should().Be(1); + operations.Count(x => x.Kind is OperationKind.Delete).Should().Be(moviesToDelete.Count); + operations.Count(x => x.Kind is OperationKind.Replace).Should().Be(moviesToReplace.Count); + + // Now push the results and check what we did + PushResult pushResults = await this.context.Movies.PushAsync(new PushOptions { ParallelOperations = 8 }); + + // This little snippet of code is to aid debugging if this test fails + if (!pushResults.IsSuccessful) + { + foreach (KeyValuePair failedRequest in pushResults.FailedRequests) + { + string id = failedRequest.Key; + ServiceResponse response = failedRequest.Value; + string jsonContent = string.Empty; + if (response.HasContent) + { + using StreamReader reader = new(response.ContentStream); + jsonContent = reader.ReadToEnd(); + } + + Console.WriteLine($"FAILED REQUEST FOR ID: {id}: {response.StatusCode}\n{jsonContent}"); + } + } + + pushResults.IsSuccessful.Should().BeTrue(); + pushResults.CompletedOperations.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count); + this.context.DatasyncOperationsQueue.Should().BeEmpty(); + + // Now use PullAsync() again - these should all be pulled down again + PullResult pullResults = await this.context.PullAsync(); + pullResults.IsSuccessful.Should().BeTrue(); + pullResults.Additions.Should().Be(0); + pullResults.Deletions.Should().Be(0); + // The service always replaces additions and replacements - updating the last updatedAt. + pullResults.Replacements.Should().Be(moviesToReplace.Count + 1); + } }