From 0588ece072dcd1a4c8c93fa943f579329c16c1f4 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Thu, 15 Sep 2022 08:01:14 +0530 Subject: [PATCH] Refactor locking logic in class `SqlTableChangeMonitor` --- src/TriggerBinding/SqlTableChangeMonitor.cs | 82 ++++++++++----------- 1 file changed, 39 insertions(+), 43 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 908f10248..08f6f60ee 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -51,14 +51,18 @@ internal sealed class SqlTableChangeMonitor : IDisposable private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases; private CancellationTokenSource _cancellationTokenSourceExecutor; - // The semaphore ensures that mutable class members such as this._rows are accessed by only one thread at a time. + // The semaphore is used by the lease-renewal loop to ensure that '_state' stays set to 'ProcessingChanges' while + // the leases are being renewed. The change-consumption loop must wait for the semaphore before modifying + // the value of '_state' back to 'CheckingForChanges'. Since the field '_rows' is only updated if the value of + // '_state' is set to 'CheckingForChanges', this guarantees that '_rows' will stay the same while it is being + // iterated over inside the lease-renewal loop. private readonly SemaphoreSlim _rowsLock; private readonly IDictionary _telemetryProps; private IReadOnlyList> _rows; private int _leaseRenewalCount; - private State _state = State.CheckingForChanges; + private State _state; /// /// Initializes a new instance of the > class. 
@@ -116,7 +120,7 @@ public SqlTableChangeMonitor( this._telemetryProps = telemetryProps; - this._rowsLock = new SemaphoreSlim(1); + this._rowsLock = new SemaphoreSlim(1, 1); this._rows = new List>(); this._leaseRenewalCount = 0; this._state = State.CheckingForChanges; @@ -156,6 +160,7 @@ private async Task RunChangeConsumptionLoopAsync() this._logger.LogDebugWithThreadId("BEGIN OpenChangeConsumptionConnection"); await connection.OpenAsync(token); this._logger.LogDebugWithThreadId("END OpenChangeConsumptionConnection"); + // Check for cancellation request only after a cycle of checking and processing of changes completes. while (!token.IsCancellationRequested) { @@ -314,7 +319,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati { this._logger.LogError($"Failed to compose trigger parameter value for table: '{this._userTable.FullName} due to exception: {e.GetType()}. Exception message: {e.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, e, this._telemetryProps); - await this.ClearRowsAsync(true); + await this.ClearRowsAsync(); } if (changes != null) @@ -346,7 +351,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati this._logger.LogError($"Failed to trigger user function for table: '{this._userTable.FullName} due to exception: {result.Exception.GetType()}. 
Exception message: {result.Exception.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, result.Exception, this._telemetryProps, measures); - await this.ClearRowsAsync(true); + await this.ClearRowsAsync(); } } } @@ -370,11 +375,9 @@ private async void RunLeaseRenewalLoopAsync() this._logger.LogDebugWithThreadId("BEGIN OpenLeaseRenewalLoopConnection"); await connection.OpenAsync(token); this._logger.LogDebugWithThreadId("END OpenLeaseRenewalLoopConnection"); + while (!token.IsCancellationRequested) { - this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - LeaseRenewal"); - await this._rowsLock.WaitAsync(token); - this._logger.LogDebugWithThreadId("END WaitRowsLock - LeaseRenewal"); await this.RenewLeasesAsync(connection, token); await Task.Delay(TimeSpan.FromSeconds(LeaseRenewalIntervalInSeconds), token); } @@ -398,9 +401,13 @@ private async void RunLeaseRenewalLoopAsync() private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken token) { - try + this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - RenewLeases"); + await this._rowsLock.WaitAsync(token); + this._logger.LogDebugWithThreadId("END WaitRowsLock - RenewLeases"); + + if (this._state == State.ProcessingChanges) { - if (this._state == State.ProcessingChanges) + try { // I don't think I need a transaction for renewing leases. If this worker reads in a row from the // leases table and determines that it corresponds to its batch of changes, but then that row gets @@ -423,19 +430,16 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesEnd, this._telemetryProps, measures); } } - } - catch (Exception e) - { - // This catch block is necessary so that the finally block is executed even in the case of an exception - // (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third - // paragraph). 
If we fail to renew the leases, multiple workers could be processing the same change - // data, but we have functionality in place to deal with this (see design doc). - this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps); - } - finally - { - if (this._state == State.ProcessingChanges) + catch (Exception e) + { + // This catch block is necessary so that the finally block is executed even in the case of an exception + // (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third + // paragraph). If we fail to renew the leases, multiple workers could be processing the same change + // data, but we have functionality in place to deal with this (see design doc). + this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps); + } + finally { // Do we want to update this count even in the case of a failure to renew the leases? Probably, // because the count is simply meant to indicate how much time the other thread has spent processing @@ -455,30 +459,27 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken this._cancellationTokenSourceExecutor = new CancellationTokenSource(); } } - - // Want to always release the lock at the end, even if renewing the leases failed. - this._logger.LogDebugWithThreadId("ReleaseRowLock - RenewLeases"); - this._rowsLock.Release(); } + + // Want to always release the lock at the end, even if renewing the leases failed. + this._logger.LogDebugWithThreadId("ReleaseRowsLock - RenewLeases"); + this._rowsLock.Release(); } /// /// Resets the in-memory state of the change monitor and sets it to start polling for changes again. 
/// - /// True if ClearRowsAsync should acquire the "_rowsLock" (only true in the case of a failure) - private async Task ClearRowsAsync(bool acquireLock) + private async Task ClearRowsAsync() { - if (acquireLock) - { - this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ClearRows"); - await this._rowsLock.WaitAsync(); - this._logger.LogDebugWithThreadId("END WaitRowsLock - ClearRows"); - } + this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ClearRows"); + await this._rowsLock.WaitAsync(); + this._logger.LogDebugWithThreadId("END WaitRowsLock - ClearRows"); this._leaseRenewalCount = 0; this._state = State.CheckingForChanges; this._rows = new List>(); - this._logger.LogDebugWithThreadId("ReleaseRowLock - ClearRows"); + + this._logger.LogDebugWithThreadId("ReleaseRowsLock - ClearRows"); this._rowsLock.Release(); } @@ -489,10 +490,6 @@ private async Task ClearRowsAsync(bool acquireLock) private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToken token) { TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesStart, this._telemetryProps); - this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ReleaseLeases"); - // Don't want to change the "_rows" while another thread is attempting to renew leases on them. - await this._rowsLock.WaitAsync(token); - this._logger.LogDebugWithThreadId("END WaitRowsLock - ReleaseLeases"); long newLastSyncVersion = this.RecomputeLastSyncVersion(); bool retrySucceeded = false; @@ -569,9 +566,8 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke } } } - // Want to do this before releasing the lock in case the renew leases thread wakes up. It will see that - // the state is checking for changes and not renew the (just released) leases. - await this.ClearRowsAsync(false); + + await this.ClearRowsAsync(); } ///