diff --git a/README.md b/README.md index bcf44b59b..b7a4c5654 100644 --- a/README.md +++ b/README.md @@ -605,9 +605,9 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq ENABLE CHANGE_TRACKING; ``` - For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional worker tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and worker table. + For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional leases tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and leases table. - > **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. + > **NOTE:** The leases table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. #### Trigger Samples The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs) diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index 56723c835..5874551c9 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -359,11 +359,11 @@ public enum TelemetryPropertyName ErrorName, ExceptionType, HasIdentityColumn, + LeasesTableName, QueryType, ServerVersion, Type, UserFunctionId, - WorkerTableName, } /// @@ -376,7 +376,7 @@ public enum TelemetryMeasureName CommandDurationMs, CreatedSchemaDurationMs, CreateGlobalStateTableDurationMs, - CreateWorkerTableDurationMs, + CreateLeasesTableDurationMs, DurationMs, GetCaseSensitivityDurationMs, GetChangesDurationMs, diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index a9d36d7c8..2f64b85be 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -40,7 +40,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable private readonly int _userTableId; private readonly SqlObject _userTable; private readonly string _userFunctionId; - private readonly string _workerTableName; + private readonly string _leasesTableName; private readonly IReadOnlyList _userTableColumns; private readonly IReadOnlyList _primaryKeyColumns; private readonly IReadOnlyList _rowMatchConditions; @@ -67,7 +67,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable /// SQL object ID of the user table /// instance created with user table name /// Unique identifier for the user function - /// Name of the worker table + /// Name of the leases table /// List of all column names in the user table /// List of primary key column names in the user table /// Defines contract for triggering user function @@ -78,7 +78,7 @@ public SqlTableChangeMonitor( int userTableId, SqlObject userTable, string userFunctionId, - string workerTableName, + string leasesTableName, IReadOnlyList userTableColumns, IReadOnlyList primaryKeyColumns, ITriggeredFunctionExecutor executor, @@ -88,7 +88,7 @@ public SqlTableChangeMonitor( _ = !string.IsNullOrEmpty(connectionString) ? true : throw new ArgumentNullException(nameof(connectionString)); _ = !string.IsNullOrEmpty(userTable.FullName) ? true : throw new ArgumentNullException(nameof(userTable)); _ = !string.IsNullOrEmpty(userFunctionId) ? true : throw new ArgumentNullException(nameof(userFunctionId)); - _ = !string.IsNullOrEmpty(workerTableName) ? true : throw new ArgumentNullException(nameof(workerTableName)); + _ = !string.IsNullOrEmpty(leasesTableName) ? true : throw new ArgumentNullException(nameof(leasesTableName)); _ = userTableColumns ?? throw new ArgumentNullException(nameof(userTableColumns)); _ = primaryKeyColumns ?? throw new ArgumentNullException(nameof(primaryKeyColumns)); _ = executor ?? throw new ArgumentNullException(nameof(executor)); @@ -98,7 +98,7 @@ public SqlTableChangeMonitor( this._userTableId = userTableId; this._userTable = userTable; this._userFunctionId = userFunctionId; - this._workerTableName = workerTableName; + this._leasesTableName = leasesTableName; this._userTableColumns = userTableColumns; this._primaryKeyColumns = primaryKeyColumns; @@ -137,7 +137,7 @@ public void Dispose() /// /// Executed once every period. If the state of the change monitor is - /// , then the method query the change/worker tables for changes on the + /// , then the method query the change/leases tables for changes on the /// user's table. If any are found, the state of the change monitor is transitioned to /// and the user's function is executed with the found changes. If the /// execution is successful, the leases on "_rows" are released and the state transitions to @@ -189,7 +189,7 @@ private async Task RunChangeConsumptionLoopAsync() } /// - /// Queries the change/worker tables to check for new changes on the user's table. If any are found, stores the + /// Queries the change/leases tables to check for new changes on the user's table. If any are found, stores the /// change along with the corresponding data from the user table in "_rows". /// private async Task GetTableChangesAsync(SqlConnection connection, CancellationToken token) @@ -386,7 +386,7 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken if (this._state == State.ProcessingChanges) { // I don't think I need a transaction for renewing leases. If this worker reads in a row from the - // worker table and determines that it corresponds to its batch of changes, but then that row gets + // leases table and determines that it corresponds to its batch of changes, but then that row gets // deleted by a cleanup task, it shouldn't renew the lease on it anyways. using (SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection)) { @@ -488,7 +488,7 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke } // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, - // and clean up the worker table to remove all rows with ChangeVersion <= newLastSyncVersion. + // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) { var commandSw = Stopwatch.StartNew(); @@ -584,7 +584,7 @@ private IReadOnlyList> GetChanges() /// /// Gets the change associated with this row (either an insert, update or delete). /// - /// The (combined) row from the change table and worker table + /// The (combined) row from the change table and leases table /// Thrown if the value of the "SYS_CHANGE_OPERATION" column is none of "I", "U", or "D" /// SqlChangeOperation.Insert for an insert, SqlChangeOperation.Update for an update, and SqlChangeOperation.Delete for a delete private static SqlChangeOperation GetChangeOperation(IReadOnlyDictionary row) @@ -636,7 +636,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti { string selectList = string.Join(", ", this._userTableColumns.Select(col => this._primaryKeyColumns.Contains(col) ? $"c.{col.AsBracketQuotedString()}" : $"u.{col.AsBracketQuotedString()}")); string userTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.AsBracketQuotedString()} = u.{col.AsBracketQuotedString()}")); - string workerTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.AsBracketQuotedString()} = w.{col.AsBracketQuotedString()}")); + string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.AsBracketQuotedString()} = l.{col.AsBracketQuotedString()}")); string getChangesQuery = $@" DECLARE @last_sync_version bigint; @@ -647,15 +647,15 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti SELECT TOP {BatchSize} {selectList}, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, - w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName}, w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName}, w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} + l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName}, l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName}, l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c - LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} + LEFT OUTER JOIN {this._leasesTableName} AS l WITH (TABLOCKX) ON {leasesTableJoinCondition} LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition} WHERE - (w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND - (w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR - w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND - (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}) + (l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NULL AND + (l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR + l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND + (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; "; @@ -679,15 +679,15 @@ private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransa string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"]; acquireLeasesQuery.Append($@" - IF NOT EXISTS (SELECT * FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}) - INSERT INTO {this._workerTableName} WITH (TABLOCKX) + IF NOT EXISTS (SELECT * FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}) + INSERT INTO {this._leasesTableName} WITH (TABLOCKX) VALUES ({valuesList}, {changeVersion}, 1, DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())); ELSE - UPDATE {this._workerTableName} WITH (TABLOCKX) + UPDATE {this._leasesTableName} WITH (TABLOCKX) SET - {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, - {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = {SqlTriggerConstants.WorkerTableAttemptCountColumnName} + 1, - {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + {SqlTriggerConstants.LeasesTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.LeasesTableAttemptCountColumnName} = {SqlTriggerConstants.LeasesTableAttemptCountColumnName} + 1, + {SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -705,8 +705,8 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection) string matchCondition = string.Join(" OR ", this._rowMatchConditions.Take(this._rows.Count)); string renewLeasesQuery = $@" - UPDATE {this._workerTableName} WITH (TABLOCKX) - SET {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + UPDATE {this._leasesTableName} WITH (TABLOCKX) + SET {SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {matchCondition}; "; @@ -729,16 +729,16 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"]; releaseLeasesQuery.Append($@" - SELECT @current_change_version = {SqlTriggerConstants.WorkerTableChangeVersionColumnName} - FROM {this._workerTableName} WITH (TABLOCKX) + SELECT @current_change_version = {SqlTriggerConstants.LeasesTableChangeVersionColumnName} + FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}; IF @current_change_version <= {changeVersion} - UPDATE {this._workerTableName} WITH (TABLOCKX) + UPDATE {this._leasesTableName} WITH (TABLOCKX) SET - {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, - {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0, - {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL + {SqlTriggerConstants.LeasesTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.LeasesTableAttemptCountColumnName} = 0, + {SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} = NULL WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -748,7 +748,7 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa /// /// Builds the command to update the global version number in _globalStateTable after successful invocation of - /// the user's function. If the global version number is updated, also cleans the worker table and removes all + /// the user's function. If the global version number is updated, also cleans the leases table and removes all /// rows for which ChangeVersion <= newLastSyncVersion. /// /// The connection to add to the returned SqlCommand @@ -757,7 +757,7 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa /// The SqlCommand populated with the query and appropriate parameters private SqlCommand BuildUpdateTablesPostInvocation(SqlConnection connection, SqlTransaction transaction, long newLastSyncVersion) { - string workerTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.AsBracketQuotedString()} = w.{col.AsBracketQuotedString()}")); + string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.AsBracketQuotedString()} = l.{col.AsBracketQuotedString()}")); string updateTablesPostInvocationQuery = $@" DECLARE @current_last_sync_version bigint; @@ -769,13 +769,13 @@ private SqlCommand BuildUpdateTablesPostInvocation(SqlConnection connection, Sql SELECT @unprocessed_changes = COUNT(*) FROM ( SELECT c.SYS_CHANGE_VERSION FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_sync_version) AS c - LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} + LEFT OUTER JOIN {this._leasesTableName} AS l WITH (TABLOCKX) ON {leasesTableJoinCondition} WHERE c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND - ((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR - w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR - w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND - (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes + ((l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR + l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR + l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NOT NULL) AND + (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} BEGIN @@ -783,7 +783,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ SET LastSyncVersion = {newLastSyncVersion} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {SqlTriggerConstants.WorkerTableChangeVersionColumnName} <= {newLastSyncVersion}; + DELETE FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {SqlTriggerConstants.LeasesTableChangeVersionColumnName} <= {newLastSyncVersion}; END "; diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 3b8b828e1..2f37d866f 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -9,10 +9,10 @@ internal static class SqlTriggerConstants public const string GlobalStateTableName = "[" + SchemaName + "].[GlobalState]"; - public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]"; + public const string LeasesTableNameFormat = "[" + SchemaName + "].[Leases_{0}]"; - public const string WorkerTableChangeVersionColumnName = "_az_func_ChangeVersion"; - public const string WorkerTableAttemptCountColumnName = "_az_func_AttemptCount"; - public const string WorkerTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime"; + public const string LeasesTableChangeVersionColumnName = "_az_func_ChangeVersion"; + public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount"; + public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime"; } } \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 7c5cd4169..789ed05e9 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -103,19 +103,19 @@ public async Task StartAsync(CancellationToken cancellationToken) IReadOnlyList<(string name, string type)> primaryKeyColumns = await this.GetPrimaryKeyColumnsAsync(connection, userTableId, cancellationToken); IReadOnlyList userTableColumns = await this.GetUserTableColumnsAsync(connection, userTableId, cancellationToken); - string workerTableName = string.Format(CultureInfo.InvariantCulture, SqlTriggerConstants.WorkerTableNameFormat, $"{this._userFunctionId}_{userTableId}"); - this._logger.LogDebug($"Worker table name: '{workerTableName}'."); - this._telemetryProps[TelemetryPropertyName.WorkerTableName] = workerTableName; + string leasesTableName = string.Format(CultureInfo.InvariantCulture, SqlTriggerConstants.LeasesTableNameFormat, $"{this._userFunctionId}_{userTableId}"); + this._logger.LogDebug($"leases table name: '{leasesTableName}'."); + this._telemetryProps[TelemetryPropertyName.LeasesTableName] = leasesTableName; var transactionSw = Stopwatch.StartNew(); - long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createWorkerTableDurationMs = 0L; + long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L; using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) { createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, cancellationToken); createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, cancellationToken); insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken); - createWorkerTableDurationMs = await CreateWorkerTableAsync(connection, transaction, workerTableName, primaryKeyColumns, cancellationToken); + createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, leasesTableName, primaryKeyColumns, cancellationToken); transaction.Commit(); } @@ -127,7 +127,7 @@ public async Task StartAsync(CancellationToken cancellationToken) userTableId, this._userTable, this._userFunctionId, - workerTableName, + leasesTableName, userTableColumns, primaryKeyColumns.Select(col => col.name).ToList(), this._executor, @@ -142,7 +142,7 @@ public async Task StartAsync(CancellationToken cancellationToken) [TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs, [TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs, [TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs, - [TelemetryMeasureName.CreateWorkerTableDurationMs] = createWorkerTableDurationMs, + [TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs, [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds, }; @@ -213,7 +213,7 @@ private async Task GetUserTableIdAsync(SqlConnection connection, Cancellati /// Gets the names and types of primary key columns of the user table. /// /// - /// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in worker table. + /// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in leases table. /// private async Task> GetPrimaryKeyColumnsAsync(SqlConnection connection, int userTableId, CancellationToken cancellationToken) { @@ -263,9 +263,9 @@ FROM sys.indexes AS i string[] reservedColumnNames = new[] { - SqlTriggerConstants.WorkerTableChangeVersionColumnName, - SqlTriggerConstants.WorkerTableAttemptCountColumnName, - SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName + SqlTriggerConstants.LeasesTableChangeVersionColumnName, + SqlTriggerConstants.LeasesTableAttemptCountColumnName, + SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName }; var conflictingColumnNames = primaryKeyColumns.Select(col => col.name).Intersect(reservedColumnNames).ToList(); @@ -326,7 +326,7 @@ FROM sys.columns AS c } /// - /// Creates the schema for global state table and worker tables, if it does not already exist. + /// Creates the schema for global state table and leases tables, if it does not already exist. /// private static async Task CreateSchemaAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) { @@ -409,33 +409,33 @@ INSERT INTO {SqlTriggerConstants.GlobalStateTableName} } /// - /// Creates the worker table for the 'user function and table', if one does not already exist. + /// Creates the leases table for the 'user function and table', if one does not already exist. /// - private static async Task CreateWorkerTableAsync( + private static async Task CreateLeasesTableAsync( SqlConnection connection, SqlTransaction transaction, - string workerTableName, + string leasesTableName, IReadOnlyList<(string name, string type)> primaryKeyColumns, CancellationToken cancellationToken) { string primaryKeysWithTypes = string.Join(", ", primaryKeyColumns.Select(col => $"{col.name.AsBracketQuotedString()} {col.type}")); string primaryKeys = string.Join(", ", primaryKeyColumns.Select(col => col.name.AsBracketQuotedString())); - string createWorkerTableQuery = $@" - IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL - CREATE TABLE {workerTableName} ( + string createLeasesTableQuery = $@" + IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL + CREATE TABLE {leasesTableName} ( {primaryKeysWithTypes}, - {SqlTriggerConstants.WorkerTableChangeVersionColumnName} bigint NOT NULL, - {SqlTriggerConstants.WorkerTableAttemptCountColumnName} int NOT NULL, - {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} datetime2, + {SqlTriggerConstants.LeasesTableChangeVersionColumnName} bigint NOT NULL, + {SqlTriggerConstants.LeasesTableAttemptCountColumnName} int NOT NULL, + {SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} datetime2, PRIMARY KEY ({primaryKeys}) ); "; - using (var createWorkerTableCommand = new SqlCommand(createWorkerTableQuery, connection, transaction)) + using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction)) { var stopwatch = Stopwatch.StartNew(); - await createWorkerTableCommand.ExecuteNonQueryAsync(cancellationToken); + await createLeasesTableCommand.ExecuteNonQueryAsync(cancellationToken); return stopwatch.ElapsedMilliseconds; } } diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 758ea303f..c92ab7a83 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -123,7 +123,7 @@ public void PrimaryKeyNotCreatedTriggerTest() /// /// Tests the error message when the user table contains one or more primary keys with names conflicting with - /// column names in the worker table. + /// column names in the leases table. /// [Fact] public void ReservedPrimaryKeyColumnNamesTriggerTest() diff --git a/test/Integration/test-csharp/ReservedPrimaryKeyColumnNamesTrigger.cs b/test/Integration/test-csharp/ReservedPrimaryKeyColumnNamesTrigger.cs index 1389c279f..98f67662c 100644 --- a/test/Integration/test-csharp/ReservedPrimaryKeyColumnNamesTrigger.cs +++ b/test/Integration/test-csharp/ReservedPrimaryKeyColumnNamesTrigger.cs @@ -11,7 +11,7 @@ public static class ReservedPrimaryKeyColumnNamesTrigger { /// /// Used in verification of the error message when the user table contains one or more primary keys with names - /// conflicting with column names in the worker table. + /// conflicting with column names in the leases table. /// [FunctionName(nameof(ReservedPrimaryKeyColumnNamesTrigger))] public static void Run(