From 560e9469952e696a2f65b7f6ead126302e555b92 Mon Sep 17 00:00:00 2001 From: Ameya Rele Date: Thu, 8 Sep 2022 16:12:07 +0530 Subject: [PATCH 1/4] Add retry when attempting to release leases --- src/TriggerBinding/SqlTableChangeMonitor.cs | 97 +++++++++++---------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index d0f8241f0..c15bb98ef 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -35,7 +35,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable public const int MaxLeaseRenewalCount = 10; public const int LeaseIntervalInSeconds = 60; public const int LeaseRenewalIntervalInSeconds = 15; - + public const int MaxRetryReleaseLeases = 5; private readonly string _connectionString; private readonly int _userTableId; private readonly SqlObject _userTable; @@ -494,67 +494,70 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke try { - var transactionSw = Stopwatch.StartNew(); - long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L; - - using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) + for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases; retryCount++) { - try - { - // Release the leases held on "_rows". - using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction)) - { - this._logger.LogDebugWithThreadId($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}"); - var commandSw = Stopwatch.StartNew(); - await releaseLeasesCommand.ExecuteNonQueryAsync(token); - releaseLeasesDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebugWithThreadId($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms"); - } + var transactionSw = Stopwatch.StartNew(); + long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L; - // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, - // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. - using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) + using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) + { + try { - this._logger.LogDebugWithThreadId($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}"); - var commandSw = Stopwatch.StartNew(); - await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token); - updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebugWithThreadId($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms"); - } + // Release the leases held on "_rows". + using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction)) + { + this._logger.LogDebugWithThreadId($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}"); + var commandSw = Stopwatch.StartNew(); + await releaseLeasesCommand.ExecuteNonQueryAsync(token); + releaseLeasesDurationMs = commandSw.ElapsedMilliseconds; + this._logger.LogDebugWithThreadId($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms"); + } - transaction.Commit(); + // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, + // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. + using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) + { + this._logger.LogDebugWithThreadId($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}"); + var commandSw = Stopwatch.StartNew(); + await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token); + updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; + this._logger.LogDebugWithThreadId($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms"); + } - var measures = new Dictionary - { - [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs, - [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs, - }; + transaction.Commit(); - TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesEnd, this._telemetryProps, measures); - } - catch (Exception ex) - { - this._logger.LogError($"Failed to execute SQL commands to release leases for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps); + var measures = new Dictionary + { + [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs, + [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs, + }; - try - { - transaction.Rollback(); + TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesEnd, this._telemetryProps, measures); + // Don't want to loop if no exception occurs. + break; } - catch (Exception ex2) + catch (Exception ex) when (retryCount < MaxRetryReleaseLeases) { - this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps); + this._logger.LogError($"Failed to execute SQL commands to release leases in attempt: {retryCount} for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps); + + try + { + transaction.Rollback(); + } + catch (Exception ex2) + { + this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps); + } + Thread.Sleep(100); } } } } catch (Exception e) { - // What should we do if releasing the leases fails? We could try to release them again or just wait, - // since eventually the lease time will expire. Then another thread will re-process the same changes - // though, so less than ideal. But for now that's the functionality. - this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}"); + this._logger.LogError($"Failed to release leases for table after {MaxRetryReleaseLeases} '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, e, this._telemetryProps); } finally From 10b9059650cb759b8bf685c2c876212654139f59 Mon Sep 17 00:00:00 2001 From: Ameya Rele Date: Fri, 9 Sep 2022 11:11:28 +0530 Subject: [PATCH 2/4] Address comments --- src/TriggerBinding/SqlTableChangeMonitor.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index c15bb98ef..825f5019f 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -35,7 +35,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable public const int MaxLeaseRenewalCount = 10; public const int LeaseIntervalInSeconds = 60; public const int LeaseRenewalIntervalInSeconds = 15; - public const int MaxRetryReleaseLeases = 5; + public const int MaxRetryReleaseLeases = 3; private readonly string _connectionString; private readonly int _userTableId; private readonly SqlObject _userTable; @@ -550,14 +550,13 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps); } - Thread.Sleep(100); } } } } catch (Exception e) { - this._logger.LogError($"Failed to release leases for table after {MaxRetryReleaseLeases} '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}"); + this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {e.GetType()}. Exception message: {e.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, e, this._telemetryProps); } finally From 4b4f1a70b769592ecf450547e64d88cab60ab983 Mon Sep 17 00:00:00 2001 From: Ameya Rele Date: Mon, 12 Sep 2022 21:01:20 +0530 Subject: [PATCH 3/4] Address more comments --- src/Telemetry/Telemetry.cs | 2 ++ src/TriggerBinding/SqlTableChangeMonitor.cs | 16 +++++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index 75853419b..2b3f9b8de 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -384,6 +384,7 @@ public enum TelemetryMeasureName GetPrimaryKeysDurationMs, InsertGlobalStateTableRowDurationMs, ReleaseLeasesDurationMs, + RetryAttemptNumber, SetLastSyncVersionDurationMs, TransactionDurationMs, UpdateLastSyncVersionDurationMs, @@ -408,6 +409,7 @@ public enum TelemetryErrorName ProcessChanges, PropsNotExistOnTable, ReleaseLeases, + ReleaseLeasesNoRetriesLeft, ReleaseLeasesRollback, RenewLeases, RenewLeasesLoop, diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 825f5019f..5d970b12a 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -491,10 +491,11 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke await this._rowsLock.WaitAsync(token); this._logger.LogDebugWithThreadId("END WaitRowsLock - ReleaseLeases"); long newLastSyncVersion = this.RecomputeLastSyncVersion(); + bool retrySucceeded = false; try { - for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases; retryCount++) + for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && retrySucceeded == false; retryCount++) { var transactionSw = Stopwatch.StartNew(); long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L; @@ -533,13 +534,18 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke }; TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesEnd, this._telemetryProps, measures); - // Don't want to loop if no exception occurs. - break; + retrySucceeded = true; } catch (Exception ex) when (retryCount < MaxRetryReleaseLeases) { this._logger.LogError($"Failed to execute SQL commands to release leases in attempt: {retryCount} for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps); + + var measures = new Dictionary + { + [TelemetryMeasureName.RetryAttemptNumber] = retryCount, + }; + + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps, measures); try { @@ -557,7 +563,7 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke catch (Exception e) { this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {e.GetType()}. Exception message: {e.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, e, this._telemetryProps); + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesNoRetriesLeft, e, this._telemetryProps); } finally { From ad18efba5e96a0153eccaa2c546f4c74f13eb64f Mon Sep 17 00:00:00 2001 From: Ameya Rele Date: Tue, 13 Sep 2022 11:27:37 +0530 Subject: [PATCH 4/4] Change loop structure according to comment --- src/TriggerBinding/SqlTableChangeMonitor.cs | 110 ++++++++++---------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 5d970b12a..f46da1865 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -493,50 +493,51 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke long newLastSyncVersion = this.RecomputeLastSyncVersion(); bool retrySucceeded = false; - try + for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && !retrySucceeded; retryCount++) { - for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && retrySucceeded == false; retryCount++) - { - var transactionSw = Stopwatch.StartNew(); - long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L; + var transactionSw = Stopwatch.StartNew(); + long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L; - using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) + using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) + { + try { - try + // Release the leases held on "_rows". + using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction)) { - // Release the leases held on "_rows". - using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction)) - { - this._logger.LogDebugWithThreadId($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}"); - var commandSw = Stopwatch.StartNew(); - await releaseLeasesCommand.ExecuteNonQueryAsync(token); - releaseLeasesDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebugWithThreadId($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms"); - } + this._logger.LogDebugWithThreadId($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}"); + var commandSw = Stopwatch.StartNew(); + await releaseLeasesCommand.ExecuteNonQueryAsync(token); + releaseLeasesDurationMs = commandSw.ElapsedMilliseconds; + this._logger.LogDebugWithThreadId($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms"); + } - // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, - // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. - using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) - { - this._logger.LogDebugWithThreadId($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}"); - var commandSw = Stopwatch.StartNew(); - await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token); - updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebugWithThreadId($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms"); - } + // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, + // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. + using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) + { + this._logger.LogDebugWithThreadId($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}"); + var commandSw = Stopwatch.StartNew(); + await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token); + updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; + this._logger.LogDebugWithThreadId($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms"); + } - transaction.Commit(); + transaction.Commit(); - var measures = new Dictionary - { - [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs, - [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs, - }; + var measures = new Dictionary + { + [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs, + [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs, + [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds, + }; - TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesEnd, this._telemetryProps, measures); - retrySucceeded = true; - } - catch (Exception ex) when (retryCount < MaxRetryReleaseLeases) + TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesEnd, this._telemetryProps, measures); + retrySucceeded = true; + } + catch (Exception ex) + { + if (retryCount < MaxRetryReleaseLeases) { this._logger.LogError($"Failed to execute SQL commands to release leases in attempt: {retryCount} for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); @@ -546,31 +547,28 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke }; TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps, measures); + } + else + { + this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {ex.GetType()}. Exception message: {ex.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesNoRetriesLeft, ex, this._telemetryProps); + } - try - { - transaction.Rollback(); - } - catch (Exception ex2) - { - this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps); - } + try + { + transaction.Rollback(); + } + catch (Exception ex2) + { + this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps); } } } } - catch (Exception e) - { - this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {e.GetType()}. Exception message: {e.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesNoRetriesLeft, e, this._telemetryProps); - } - finally - { - // Want to do this before releasing the lock in case the renew leases thread wakes up. It will see that - // the state is checking for changes and not renew the (just released) leases. - await this.ClearRowsAsync(false); - } + // Want to do this before releasing the lock in case the renew leases thread wakes up. It will see that + // the state is checking for changes and not renew the (just released) leases. + await this.ClearRowsAsync(false); } ///