diff --git a/README.md b/README.md index 4c57915bf..bcf44b59b 100644 --- a/README.md +++ b/README.md @@ -607,7 +607,7 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional worker tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and worker table. - > **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `ChangeVersion`, `AttemptCount` and `LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. + > **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. #### Trigger Samples The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 3ec7120e0..1460753cd 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -657,14 +657,15 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti SELECT TOP {BatchSize} {selectList}, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, - w.ChangeVersion, w.AttemptCount, w.LeaseExpirationTime + w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName}, w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName}, w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition} WHERE - (w.LeaseExpirationTime IS NULL AND (w.ChangeVersion IS NULL OR w.ChangeVersion < c.SYS_CHANGE_VERSION) OR - w.LeaseExpirationTime < SYSDATETIME()) AND - (w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount}) + (w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND + (w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR + w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND + (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; "; @@ -694,9 +695,9 @@ IF NOT EXISTS (SELECT * FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this ELSE UPDATE {this._workerTableName} WITH (TABLOCKX) SET - ChangeVersion = {changeVersion}, - AttemptCount = AttemptCount + 1, - LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = {SqlTriggerConstants.WorkerTableAttemptCountColumnName} + 1, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -715,7 +716,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection) string renewLeasesQuery = $@" UPDATE {this._workerTableName} WITH (TABLOCKX) - SET LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + SET {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {matchCondition}; "; @@ -738,13 +739,16 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"]; releaseLeasesQuery.Append($@" - SELECT @current_change_version = ChangeVersion + SELECT @current_change_version = {SqlTriggerConstants.WorkerTableChangeVersionColumnName} FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}; IF @current_change_version <= {changeVersion} UPDATE {this._workerTableName} WITH (TABLOCKX) - SET ChangeVersion = {changeVersion}, AttemptCount = 0, LeaseExpirationTime = NULL + SET + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -778,8 +782,10 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} WHERE c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND - ((w.ChangeVersion IS NULL OR w.ChangeVersion != c.SYS_CHANGE_VERSION OR w.LeaseExpirationTime IS NOT NULL) AND - (w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount}))) AS Changes + ((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR + w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR + w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND + (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} BEGIN @@ -787,7 +793,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ SET LastSyncVersion = {newLastSyncVersion} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE ChangeVersion <= {newLastSyncVersion}; + DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {SqlTriggerConstants.WorkerTableChangeVersionColumnName} <= {newLastSyncVersion}; END "; diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 6aa13358d..3b8b828e1 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -10,5 +10,9 @@ internal static class SqlTriggerConstants public const string GlobalStateTableName = "[" + SchemaName + "].[GlobalState]"; public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]"; + + public const string WorkerTableChangeVersionColumnName = "_az_func_ChangeVersion"; + public const string WorkerTableAttemptCountColumnName = "_az_func_AttemptCount"; + public const string WorkerTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime"; } } \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index adab107d8..7c5cd4169 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -261,7 +261,13 @@ FROM sys.indexes AS i throw new InvalidOperationException($"Could not find primary key created in table: '{this._userTable.FullName}'."); } - string[] reservedColumnNames = new[] { "ChangeVersion", "AttemptCount", "LeaseExpirationTime" }; + string[] reservedColumnNames = new[] + { + SqlTriggerConstants.WorkerTableChangeVersionColumnName, + SqlTriggerConstants.WorkerTableAttemptCountColumnName, + SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName + }; + var conflictingColumnNames = primaryKeyColumns.Select(col => col.name).Intersect(reservedColumnNames).ToList(); if (conflictingColumnNames.Count > 0) @@ -419,9 +425,9 @@ private static async Task CreateWorkerTableAsync( IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL CREATE TABLE {workerTableName} ( {primaryKeysWithTypes}, - ChangeVersion bigint NOT NULL, - AttemptCount int NOT NULL, - LeaseExpirationTime datetime2, + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} bigint NOT NULL, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} int NOT NULL, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} datetime2, PRIMARY KEY ({primaryKeys}) ); "; diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 42fa6a1f5..758ea303f 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -131,7 +131,7 @@ public void ReservedPrimaryKeyColumnNamesTriggerTest() this.StartFunctionsHostAndWaitForError( nameof(ReservedPrimaryKeyColumnNamesTrigger), true, - "Found reserved column name(s): 'ChangeVersion', 'AttemptCount', 'LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." + + "Found reserved column name(s): '_az_func_ChangeVersion', '_az_func_AttemptCount', '_az_func_LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." + " Please rename them to be able to use trigger binding."); } @@ -223,8 +223,13 @@ private static void ValidateProductChanges(List> changes, int int count = last_id - first_id + 1; Assert.Equal(count, changes.Count); + // Since the table rows are changed with a single SQL statement, the changes are not guaranteed to arrive in + // ProductID-order. Occasionally, we find the items in the second batch are passed to the user function in + // reverse order, which is an expected behavior. + IEnumerable> orderedChanges = changes.OrderBy(change => change.Item.ProductID); + int id = first_id; - foreach (SqlChange change in changes) + foreach (SqlChange change in orderedChanges) { Assert.Equal(operation, change.Operation); Product product = change.Item; diff --git a/test/Integration/test-csharp/Database/Tables/ProductsWithReservedPrimaryKeyColumnNames.sql b/test/Integration/test-csharp/Database/Tables/ProductsWithReservedPrimaryKeyColumnNames.sql index 032b4a745..ac05fb59a 100644 --- a/test/Integration/test-csharp/Database/Tables/ProductsWithReservedPrimaryKeyColumnNames.sql +++ b/test/Integration/test-csharp/Database/Tables/ProductsWithReservedPrimaryKeyColumnNames.sql @@ -1,14 +1,14 @@ CREATE TABLE [ProductsWithReservedPrimaryKeyColumnNames] ( [ProductId] [int] NOT NULL IDENTITY(1, 1), - [ChangeVersion] [int] NOT NULL, - [AttemptCount] [int] NOT NULL, - [LeaseExpirationTime] [int] NOT NULL, + [_az_func_ChangeVersion] [int] NOT NULL, + [_az_func_AttemptCount] [int] NOT NULL, + [_az_func_LeaseExpirationTime] [int] NOT NULL, [Name] [nvarchar](100) NULL, [Cost] [int] NULL, PRIMARY KEY ( ProductId, - ChangeVersion, - AttemptCount, - LeaseExpirationTime + _az_func_ChangeVersion, + _az_func_AttemptCount, + _az_func_LeaseExpirationTime ) ); \ No newline at end of file