Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 39 additions & 43 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases;
private CancellationTokenSource _cancellationTokenSourceExecutor;

// The semaphore ensures that mutable class members such as this._rows are accessed by only one thread at a time.
// The semaphore gets used by lease-renewal loop to ensure that '_state' stays set to 'ProcessingChanges' while
// the leases are being renewed. The change-consumption loop requires to wait for the semaphore before modifying
// the value of '_state' back to 'CheckingForChanges'. Since the field '_rows' is only updated if the value of
// '_state' is set to 'CheckingForChanges', this guarantees that '_rows' will stay same while it is being
// iterated over inside the lease-renewal loop.
private readonly SemaphoreSlim _rowsLock;

private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps;

private IReadOnlyList<IReadOnlyDictionary<string, string>> _rows;
private int _leaseRenewalCount;
private State _state = State.CheckingForChanges;
private State _state;

/// <summary>
/// Initializes a new instance of the <see cref="SqlTableChangeMonitor{T}" />> class.
Expand Down Expand Up @@ -116,7 +120,7 @@ public SqlTableChangeMonitor(

this._telemetryProps = telemetryProps;

this._rowsLock = new SemaphoreSlim(1);
this._rowsLock = new SemaphoreSlim(1, 1);
this._rows = new List<IReadOnlyDictionary<string, string>>();
this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;
Expand Down Expand Up @@ -156,6 +160,7 @@ private async Task RunChangeConsumptionLoopAsync()
this._logger.LogDebugWithThreadId("BEGIN OpenChangeConsumptionConnection");
await connection.OpenAsync(token);
this._logger.LogDebugWithThreadId("END OpenChangeConsumptionConnection");

// Check for cancellation request only after a cycle of checking and processing of changes completes.
while (!token.IsCancellationRequested)
{
Expand Down Expand Up @@ -314,7 +319,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati
{
this._logger.LogError($"Failed to compose trigger parameter value for table: '{this._userTable.FullName} due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, e, this._telemetryProps);
await this.ClearRowsAsync(true);
await this.ClearRowsAsync();
}

if (changes != null)
Expand Down Expand Up @@ -346,7 +351,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati
this._logger.LogError($"Failed to trigger user function for table: '{this._userTable.FullName} due to exception: {result.Exception.GetType()}. Exception message: {result.Exception.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, result.Exception, this._telemetryProps, measures);

await this.ClearRowsAsync(true);
await this.ClearRowsAsync();
}
}
}
Expand All @@ -370,11 +375,9 @@ private async void RunLeaseRenewalLoopAsync()
this._logger.LogDebugWithThreadId("BEGIN OpenLeaseRenewalLoopConnection");
await connection.OpenAsync(token);
this._logger.LogDebugWithThreadId("END OpenLeaseRenewalLoopConnection");

while (!token.IsCancellationRequested)
{
this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - LeaseRenewal");
await this._rowsLock.WaitAsync(token);
this._logger.LogDebugWithThreadId("END WaitRowsLock - LeaseRenewal");
Copy link
Contributor Author

@JatinSanghvi JatinSanghvi Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting on semaphore inside RenewLeasesAsync instead, so that we have same method both waiting on and releasing the semaphore.

await this.RenewLeasesAsync(connection, token);
await Task.Delay(TimeSpan.FromSeconds(LeaseRenewalIntervalInSeconds), token);
}
Expand All @@ -398,9 +401,13 @@ private async void RunLeaseRenewalLoopAsync()

private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken token)
{
try
this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - RenewLeases");
await this._rowsLock.WaitAsync(token);
this._logger.LogDebugWithThreadId("END WaitRowsLock - RenewLeases");

if (this._state == State.ProcessingChanges)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have single a if statement instead of repetitive checks in try and finally blocks.

{
if (this._state == State.ProcessingChanges)
try
{
// I don't think I need a transaction for renewing leases. If this worker reads in a row from the
// leases table and determines that it corresponds to its batch of changes, but then that row gets
Expand All @@ -423,19 +430,16 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesEnd, this._telemetryProps, measures);
}
}
}
catch (Exception e)
{
// This catch block is necessary so that the finally block is executed even in the case of an exception
// (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third
// paragraph). If we fail to renew the leases, multiple workers could be processing the same change
// data, but we have functionality in place to deal with this (see design doc).
this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps);
}
finally
{
if (this._state == State.ProcessingChanges)
catch (Exception e)
{
// This catch block is necessary so that the finally block is executed even in the case of an exception
// (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third
// paragraph). If we fail to renew the leases, multiple workers could be processing the same change
// data, but we have functionality in place to deal with this (see design doc).
this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps);
}
finally
{
// Do we want to update this count even in the case of a failure to renew the leases? Probably,
// because the count is simply meant to indicate how much time the other thread has spent processing
Expand All @@ -455,30 +459,27 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
this._cancellationTokenSourceExecutor = new CancellationTokenSource();
}
}

// Want to always release the lock at the end, even if renewing the leases failed.
this._logger.LogDebugWithThreadId("ReleaseRowLock - RenewLeases");
this._rowsLock.Release();
}

// Want to always release the lock at the end, even if renewing the leases failed.
this._logger.LogDebugWithThreadId("ReleaseRowsLock - RenewLeases");
this._rowsLock.Release();
}

/// <summary>
/// Resets the in-memory state of the change monitor and sets it to start polling for changes again.
/// </summary>
/// <param name="acquireLock">True if ClearRowsAsync should acquire the "_rowsLock" (only true in the case of a failure)</param>
private async Task ClearRowsAsync(bool acquireLock)
private async Task ClearRowsAsync()
{
if (acquireLock)
{
this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ClearRows");
await this._rowsLock.WaitAsync();
this._logger.LogDebugWithThreadId("END WaitRowsLock - ClearRows");
}
this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ClearRows");
await this._rowsLock.WaitAsync();
this._logger.LogDebugWithThreadId("END WaitRowsLock - ClearRows");

this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;
this._rows = new List<IReadOnlyDictionary<string, string>>();
this._logger.LogDebugWithThreadId("ReleaseRowLock - ClearRows");

this._logger.LogDebugWithThreadId("ReleaseRowsLock - ClearRows");
this._rowsLock.Release();
}

Expand All @@ -489,10 +490,6 @@ private async Task ClearRowsAsync(bool acquireLock)
private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToken token)
{
TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeasesStart, this._telemetryProps);
this._logger.LogDebugWithThreadId("BEGIN WaitRowsLock - ReleaseLeases");
// Don't want to change the "_rows" while another thread is attempting to renew leases on them.
await this._rowsLock.WaitAsync(token);
this._logger.LogDebugWithThreadId("END WaitRowsLock - ReleaseLeases");
long newLastSyncVersion = this.RecomputeLastSyncVersion();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to wait for semaphore here. While the _state is set to ProcessingChanges, the semaphore should be acquired if and only if either of _rows, _state or _leaseRenewalLoop needs to be modified in change-consumption loop and none of the shared class fields are getting modified. Once the _state equals ProcessingChanges, it is only ClearRowsAsync method that is in charge of modifying the rows, hence it is the only method that should be waiting on semaphore.

bool retrySucceeded = false;

Expand Down Expand Up @@ -569,9 +566,8 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
}
}
}
// Want to do this before releasing the lock in case the renew leases thread wakes up. It will see that
// the state is checking for changes and not renew the (just released) leases.
await this.ClearRowsAsync(false);

await this.ClearRowsAsync();
}

/// <summary>
Expand Down