From 4e53501b0e16aa2307d44a098949685650039e27 Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Wed, 22 Apr 2026 06:54:28 +1000 Subject: [PATCH 1/2] Fix connection race in concurrent GetLastTimeStamp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Commit d9016cc cached ResolveQuery's Task via Interlocked.CompareExchange so concurrent callers would share a single query. But each caller that lost the CompareExchange race still had its own ResolveQuery task running in the background, executing Open/HasViewServerState/Close against the caller's own connection. Meanwhile the caller proceeded to Execute against that same connection, producing "The connection is closed" (production) or "does not support MultipleActiveResultSets" (local) errors. Serialize ResolveQuery with a SemaphoreSlim so at most one caller ever runs it. Strengthen ConcurrencyTests to use closed connections, a synchronous barrier on dedicated threads, and 20 iterations — reliably reproduces the race on the old code. --- docs/sqlserver-ef.md | 2 +- docs/sqlserver.md | 2 +- src/Delta/DeltaExtensions_Sql.cs | 30 +++++++++-------- src/DeltaTests/ConcurrencyTests.cs | 52 +++++++++++++++++++++++------- 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/docs/sqlserver-ef.md b/docs/sqlserver-ef.md index aaa7d58..3c77dd9 100644 --- a/docs/sqlserver-ef.md +++ b/docs/sqlserver-ef.md @@ -47,7 +47,7 @@ const string logRndLsnCommand = from sys.dm_db_log_stats(db_id()) """; ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/sqlserver.md b/docs/sqlserver.md index 2c6b1be..5e09469 100644 --- a/docs/sqlserver.md +++ b/docs/sqlserver.md @@ -47,7 +47,7 @@ const string logRndLsnCommand = from sys.dm_db_log_stats(db_id()) """; ``` -snippet source | anchor +snippet source | anchor diff --git a/src/Delta/DeltaExtensions_Sql.cs b/src/Delta/DeltaExtensions_Sql.cs index 0e086cc..66d7799 100644 --- a/src/Delta/DeltaExtensions_Sql.cs +++ b/src/Delta/DeltaExtensions_Sql.cs @@ -4,23 +4,24 @@ public static partial class DeltaExtensions { public static async Task GetLastTimeStamp(this DbConnection connection, DbTransaction? transaction = null, Cancel cancel = default) { - // Cache the Task (not the result) so concurrent callers share a single ResolveQuery call. - // Interlocked.CompareExchange ensures only the first caller's Task wins; - // all others await that same already-in-flight Task. - var resolved = queryTask; - if (resolved is null) + // Serialize ResolveQuery so at most one caller ever runs it. + // A CompareExchange-on-Task approach would still let losing callers + // execute ResolveQuery to completion against their own connection, + // racing the caller's subsequent Execute on the same connection. + var execute = query; + if (execute is null) { - resolved = ResolveQuery(connection, transaction, cancel); - var original = Interlocked.CompareExchange(ref queryTask, resolved, null); - if (original is not null) + await queryLock.WaitAsync(cancel); + try { - resolved = original; + execute = query ??= await ResolveQuery(connection, transaction, cancel); + } + finally + { + queryLock.Release(); } } - // Awaiting an already-completed Task is essentially free — the runtime short-circuits it without - // touching the thread pool or allocating a state machine. same cost as reading a field. - var execute = await resolved; return await Execute(connection, transaction, execute, cancel); } @@ -143,11 +144,12 @@ static async Task HasViewServerState(DbCommand command, Cancel cancel = de return result == 1; } - static Task>>? queryTask; + static Func>? query; + static readonly SemaphoreSlim queryLock = new(1, 1); internal static void Reset() { - queryTask = null; + query = null; timeStampCache = null; } } \ No newline at end of file diff --git a/src/DeltaTests/ConcurrencyTests.cs b/src/DeltaTests/ConcurrencyTests.cs index 2289afe..abc4008 100644 --- a/src/DeltaTests/ConcurrencyTests.cs +++ b/src/DeltaTests/ConcurrencyTests.cs @@ -6,29 +6,59 @@ public async Task GetLastTimeStamp_ConcurrentAfterReset() { await using var database = await LocalDb(); - // Reset to null so all concurrent callers enter ResolveQuery - DeltaExtensions.Reset(); + // Several iterations: the race between a losing ResolveQuery task + // and the caller's own Execute on the same connection is timing + // dependent, so loop to make reproduction reliable. + for (var iteration = 0; iteration < 20; iteration++) + { + DeltaExtensions.Reset(); + await RunConcurrent(database); + } + } - // Open separate connections so concurrent calls don't share a single non-thread-safe SqlConnection - const int concurrency = 20; + static async Task RunConcurrent(SqlDatabase database) + { + // Closed connections force Execute through its Open/Close branch — + // the production scenario (EF manages connection lifetime) and + // the precondition for the race on the static query field. + const int concurrency = 64; var connections = new SqlConnection[concurrency]; for (var i = 0; i < concurrency; i++) { - connections[i] = await database.OpenNewConnection(); + connections[i] = new(database.ConnectionString); } try { + // Synchronous barrier + dedicated threads so all callers really + // race into ResolveQuery together, rather than being staggered + // by threadpool dispatch. + using var barrier = new ManualResetEventSlim(false); var tasks = connections - .Select(_ => _.GetLastTimeStamp()) + .Select(connection => Task.Factory.StartNew( + async () => + { + barrier.Wait(); + return await connection.GetLastTimeStamp(); + }, + CancellationToken.None, + TaskCreationOptions.LongRunning, + TaskScheduler.Default) + .Unwrap()) .ToArray(); - var results = await Task.WhenAll(tasks); + await Task.Delay(50); + barrier.Set(); - // All calls should return the same timestamp regardless of the race on the static query field - var distinct = results.Distinct().ToList(); - That(distinct, Has.Count.EqualTo(1)); - IsNotEmpty(distinct[0]); + // Task.WhenAll surfaces any exception from the race; the + // original bug manifested as "The connection is closed" thrown + // by ExecuteReaderAsync after an orphaned ResolveQuery closed + // the same connection mid-query. + var results = await Task.WhenAll(tasks); + foreach (var result in results) + { + IsNotEmpty(result); + } } finally { From 28cc9f0ac7cab20460b337da2530aea09603cfd4 Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Wed, 22 Apr 2026 08:28:57 +1000 Subject: [PATCH 2/2] Update ConcurrencyTests.cs --- src/DeltaTests/ConcurrencyTests.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DeltaTests/ConcurrencyTests.cs b/src/DeltaTests/ConcurrencyTests.cs index abc4008..f03d91e 100644 --- a/src/DeltaTests/ConcurrencyTests.cs +++ b/src/DeltaTests/ConcurrencyTests.cs @@ -38,10 +38,11 @@ static async Task RunConcurrent(SqlDatabase database) .Select(connection => Task.Factory.StartNew( async () => { + // ReSharper disable once AccessToDisposedClosure barrier.Wait(); return await connection.GetLastTimeStamp(); }, - CancellationToken.None, + Cancel.None, TaskCreationOptions.LongRunning, TaskScheduler.Default) .Unwrap())