Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ dotnet_analyzer_diagnostic.severity = error
# Namespace does not match folder structure - Ideally this should be enabled but it seems to have issues with root level files so disabling for now
dotnet_diagnostic.IDE0130.severity = none

# CA1805: Do not initialize unnecessarily - It's better to be explicit when initializing vars to ensure correct value is used
dotnet_diagnostic.CA1805.severity = none

# Documentation related errors, remove once they are fixed
dotnet_diagnostic.CS1591.severity = none
dotnet_diagnostic.CS1573.severity = none
Expand Down
52 changes: 17 additions & 35 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
private readonly ITriggeredFunctionExecutor _executor;
private readonly ILogger _logger;

private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges;
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases;
private CancellationTokenSource _cancellationTokenSourceExecutor;
private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges = new CancellationTokenSource();
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases = new CancellationTokenSource();
private CancellationTokenSource _cancellationTokenSourceExecutor = new CancellationTokenSource();

// The semaphore gets used by lease-renewal loop to ensure that '_state' stays set to 'ProcessingChanges' while
// the leases are being renewed. The change-consumption loop requires to wait for the semaphore before modifying
// the value of '_state' back to 'CheckingForChanges'. Since the field '_rows' is only updated if the value of
// '_state' is set to 'CheckingForChanges', this guarantees that '_rows' will stay same while it is being
// iterated over inside the lease-renewal loop.
private readonly SemaphoreSlim _rowsLock;
private readonly SemaphoreSlim _rowsLock = new SemaphoreSlim(1, 1);

private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps;

private IReadOnlyList<IReadOnlyDictionary<string, object>> _rows;
private int _leaseRenewalCount;
private State _state;
private IReadOnlyList<IReadOnlyDictionary<string, object>> _rows = new List<IReadOnlyDictionary<string, object>>();
private int _leaseRenewalCount = 0;
private State _state = State.CheckingForChanges;

/// <summary>
/// Initializes a new instance of the <see cref="SqlTableChangeMonitor{T}" />> class.
Expand All @@ -89,41 +89,23 @@ public SqlTableChangeMonitor(
ILogger logger,
IDictionary<TelemetryPropertyName, string> telemetryProps)
{
_ = !string.IsNullOrEmpty(connectionString) ? true : throw new ArgumentNullException(nameof(connectionString));
_ = !string.IsNullOrEmpty(userTable.FullName) ? true : throw new ArgumentNullException(nameof(userTable));
_ = !string.IsNullOrEmpty(userFunctionId) ? true : throw new ArgumentNullException(nameof(userFunctionId));
_ = !string.IsNullOrEmpty(leasesTableName) ? true : throw new ArgumentNullException(nameof(leasesTableName));
_ = userTableColumns ?? throw new ArgumentNullException(nameof(userTableColumns));
_ = primaryKeyColumns ?? throw new ArgumentNullException(nameof(primaryKeyColumns));
_ = executor ?? throw new ArgumentNullException(nameof(executor));
_ = logger ?? throw new ArgumentNullException(nameof(logger));

this._connectionString = connectionString;
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
this._userTable = !string.IsNullOrEmpty(userTable?.FullName) ? userTable : throw new ArgumentNullException(nameof(userTable));
this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
this._leasesTableName = !string.IsNullOrEmpty(leasesTableName) ? leasesTableName : throw new ArgumentNullException(nameof(leasesTableName));
this._userTableColumns = userTableColumns ?? throw new ArgumentNullException(nameof(userTableColumns));
this._primaryKeyColumns = primaryKeyColumns ?? throw new ArgumentNullException(nameof(primaryKeyColumns));
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));

this._userTableId = userTableId;
this._userTable = userTable;
this._userFunctionId = userFunctionId;
this._leasesTableName = leasesTableName;
this._userTableColumns = userTableColumns;
this._primaryKeyColumns = primaryKeyColumns;

// Prep search-conditions that will be used besides WHERE clause to match table rows.
this._rowMatchConditions = Enumerable.Range(0, BatchSize)
.Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
.ToList();

this._executor = executor;
this._logger = logger;

this._cancellationTokenSourceCheckForChanges = new CancellationTokenSource();
this._cancellationTokenSourceRenewLeases = new CancellationTokenSource();
this._cancellationTokenSourceExecutor = new CancellationTokenSource();

this._telemetryProps = telemetryProps;

this._rowsLock = new SemaphoreSlim(1, 1);
this._rows = new List<IReadOnlyDictionary<string, object>>();
this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;
this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();

#pragma warning disable CS4014 // Queue the below tasks and exit. Do not wait for their completion.
_ = Task.Run(() =>
Expand Down
3 changes: 1 addition & 2 deletions src/TriggerBinding/SqlTriggerBindingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public SqlTriggerBindingProvider(IConfiguration configuration, IHostIdProvider h
this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
this._hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));

_ = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this._logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Sql"));
this._logger = loggerFactory?.CreateLogger(LogCategories.CreateTriggerCategory("Sql")) ?? throw new ArgumentNullException(nameof(loggerFactory));
}

/// <summary>
Expand Down
19 changes: 6 additions & 13 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal sealed class SqlTriggerListener<T> : IListener
private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps = new Dictionary<TelemetryPropertyName, string>();

private SqlTableChangeMonitor<T> _changeMonitor;
private int _listenerState;
private int _listenerState = ListenerNotStarted;

/// <summary>
/// Initializes a new instance of the <see cref="SqlTriggerListener{T}"/> class.
Expand All @@ -50,18 +50,11 @@ internal sealed class SqlTriggerListener<T> : IListener
/// <param name="logger">Facilitates logging of messages</param>
public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger)
{
_ = !string.IsNullOrEmpty(connectionString) ? true : throw new ArgumentNullException(nameof(connectionString));
_ = !string.IsNullOrEmpty(tableName) ? true : throw new ArgumentNullException(nameof(tableName));
_ = !string.IsNullOrEmpty(userFunctionId) ? true : throw new ArgumentNullException(nameof(userFunctionId));
_ = executor ?? throw new ArgumentNullException(nameof(executor));
_ = logger ?? throw new ArgumentNullException(nameof(logger));

this._connectionString = connectionString;
this._userTable = new SqlObject(tableName);
this._userFunctionId = userFunctionId;
this._executor = executor;
this._logger = logger;
this._listenerState = ListenerNotStarted;
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
this._userTable = !string.IsNullOrEmpty(tableName) ? new SqlObject(tableName) : throw new ArgumentNullException(nameof(tableName));
this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public void Cancel()
Expand Down