From c89e7462f29d512a29b848737dbed24265c27f76 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Tue, 23 Aug 2022 21:59:45 +0530 Subject: [PATCH 1/3] Use constants for worker table column names --- src/TriggerBinding/SqlTableChangeMonitor.cs | 30 ++++++++++++--------- src/TriggerBinding/SqlTriggerConstants.cs | 4 +++ src/TriggerBinding/SqlTriggerListener.cs | 12 ++++++--- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 3ec7120e0..74ee230dc 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -36,6 +36,10 @@ internal sealed class SqlTableChangeMonitor : IDisposable public const int LeaseIntervalInSeconds = 60; public const int LeaseRenewalIntervalInSeconds = 15; + private const string ChangeVersionColumnName = SqlTriggerConstants.WorkerTableChangeVersionColumnName; + private const string AttemptCountColumnName = SqlTriggerConstants.WorkerTableAttemptCountColumnName; + private const string LeaseExpirationTimeColumnName = SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName; + private readonly string _connectionString; private readonly int _userTableId; private readonly SqlObject _userTable; @@ -657,14 +661,14 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti SELECT TOP {BatchSize} {selectList}, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, - w.ChangeVersion, w.AttemptCount, w.LeaseExpirationTime + w.{ChangeVersionColumnName}, w.{AttemptCountColumnName}, w.{LeaseExpirationTimeColumnName} FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition} WHERE - (w.LeaseExpirationTime IS NULL AND (w.ChangeVersion IS NULL OR w.ChangeVersion < c.SYS_CHANGE_VERSION) OR - w.LeaseExpirationTime < SYSDATETIME()) AND - (w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount}) + (w.{LeaseExpirationTimeColumnName} IS NULL AND (w.{ChangeVersionColumnName} IS NULL OR w.{ChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR + w.{LeaseExpirationTimeColumnName} < SYSDATETIME()) AND + (w.{AttemptCountColumnName} IS NULL OR w.{AttemptCountColumnName} < {MaxAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; "; @@ -694,9 +698,9 @@ IF NOT EXISTS (SELECT * FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this ELSE UPDATE {this._workerTableName} WITH (TABLOCKX) SET - ChangeVersion = {changeVersion}, - AttemptCount = AttemptCount + 1, - LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + {ChangeVersionColumnName} = {changeVersion}, + {AttemptCountColumnName} = {AttemptCountColumnName} + 1, + {LeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -715,7 +719,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection) string renewLeasesQuery = $@" UPDATE {this._workerTableName} WITH (TABLOCKX) - SET LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + SET {LeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {matchCondition}; "; @@ -738,13 +742,13 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"]; releaseLeasesQuery.Append($@" - SELECT @current_change_version = ChangeVersion + SELECT @current_change_version = {ChangeVersionColumnName} FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}; IF @current_change_version <= {changeVersion} UPDATE {this._workerTableName} WITH (TABLOCKX) - SET ChangeVersion = {changeVersion}, AttemptCount = 0, LeaseExpirationTime = NULL + SET {ChangeVersionColumnName} = {changeVersion}, {AttemptCountColumnName} = 0, {LeaseExpirationTimeColumnName} = NULL WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -778,8 +782,8 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} WHERE c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND - ((w.ChangeVersion IS NULL OR w.ChangeVersion != c.SYS_CHANGE_VERSION OR w.LeaseExpirationTime IS NOT NULL) AND - (w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount}))) AS Changes + ((w.{ChangeVersionColumnName} IS NULL OR w.{ChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR w.{LeaseExpirationTimeColumnName} IS NOT NULL) AND + (w.{AttemptCountColumnName} IS NULL OR w.{AttemptCountColumnName} < {MaxAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} BEGIN @@ -787,7 +791,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ SET LastSyncVersion = {newLastSyncVersion} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE ChangeVersion <= {newLastSyncVersion}; + DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {ChangeVersionColumnName} <= {newLastSyncVersion}; END "; diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 6aa13358d..caeae3a78 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -10,5 +10,9 @@ internal static class SqlTriggerConstants public const string GlobalStateTableName = "[" + SchemaName + "].[GlobalState]"; public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]"; + + public const string WorkerTableChangeVersionColumnName = "ChangeVersion"; + public const string WorkerTableAttemptCountColumnName = "AttemptCount"; + public const string WorkerTableLeaseExpirationTimeColumnName = "LeaseExpirationTime"; } } \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index f99b0f126..a8eceab3e 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -29,6 +29,10 @@ internal sealed class SqlTriggerListener : IListener private const int ListenerStopping = 3; private const int ListenerStopped = 4; + private const string ChangeVersionColumnName = SqlTriggerConstants.WorkerTableChangeVersionColumnName; + private const string AttemptCountColumnName = SqlTriggerConstants.WorkerTableAttemptCountColumnName; + private const string LeaseExpirationTimeColumnName = SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName; + private readonly SqlObject _userTable; private readonly string _connectionString; private readonly string _userFunctionId; @@ -229,7 +233,7 @@ FROM sys.indexes AS i using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) { - string[] reservedColumnNames = new string[] { "ChangeVersion", "AttemptCount", "LeaseExpirationTime" }; + string[] reservedColumnNames = new string[] { ChangeVersionColumnName, AttemptCountColumnName, LeaseExpirationTimeColumnName }; string[] variableLengthTypes = new string[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; string[] variablePrecisionTypes = new string[] { "numeric", "decimal" }; @@ -416,9 +420,9 @@ private static async Task CreateWorkerTableAsync( IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL CREATE TABLE {workerTableName} ( {primaryKeysWithTypes}, - ChangeVersion bigint NOT NULL, - AttemptCount int NOT NULL, - LeaseExpirationTime datetime2, + {ChangeVersionColumnName} bigint NOT NULL, + {AttemptCountColumnName} int NOT NULL, + {LeaseExpirationTimeColumnName} datetime2, PRIMARY KEY ({primaryKeys}) ); "; From 93ea708ec80e313be5cf8c08b8c7293c8c923976 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Wed, 24 Aug 2022 13:30:13 +0530 Subject: [PATCH 2/3] Address review comments --- README.md | 2 +- src/TriggerBinding/SqlTableChangeMonitor.cs | 30 +++++++++------------ src/TriggerBinding/SqlTriggerConstants.cs | 6 ++--- src/TriggerBinding/SqlTriggerListener.cs | 12 +++------ 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 4c57915bf..bcf44b59b 100644 --- a/README.md +++ b/README.md @@ -607,7 +607,7 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional worker tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and worker table. - > **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `ChangeVersion`, `AttemptCount` and `LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. + > **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. #### Trigger Samples The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 74ee230dc..aaf0ff668 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -36,10 +36,6 @@ internal sealed class SqlTableChangeMonitor : IDisposable public const int LeaseIntervalInSeconds = 60; public const int LeaseRenewalIntervalInSeconds = 15; - private const string ChangeVersionColumnName = SqlTriggerConstants.WorkerTableChangeVersionColumnName; - private const string AttemptCountColumnName = SqlTriggerConstants.WorkerTableAttemptCountColumnName; - private const string LeaseExpirationTimeColumnName = SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName; - private readonly string _connectionString; private readonly int _userTableId; private readonly SqlObject _userTable; @@ -661,14 +657,14 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti SELECT TOP {BatchSize} {selectList}, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, - w.{ChangeVersionColumnName}, w.{AttemptCountColumnName}, w.{LeaseExpirationTimeColumnName} + w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName}, w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName}, w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition} WHERE - (w.{LeaseExpirationTimeColumnName} IS NULL AND (w.{ChangeVersionColumnName} IS NULL OR w.{ChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR - w.{LeaseExpirationTimeColumnName} < SYSDATETIME()) AND - (w.{AttemptCountColumnName} IS NULL OR w.{AttemptCountColumnName} < {MaxAttemptCount}) + (w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND (w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR + w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND + (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; "; @@ -698,9 +694,9 @@ IF NOT EXISTS (SELECT * FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this ELSE UPDATE {this._workerTableName} WITH (TABLOCKX) SET - {ChangeVersionColumnName} = {changeVersion}, - {AttemptCountColumnName} = {AttemptCountColumnName} + 1, - {LeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = {SqlTriggerConstants.WorkerTableAttemptCountColumnName} + 1, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -719,7 +715,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection) string renewLeasesQuery = $@" UPDATE {this._workerTableName} WITH (TABLOCKX) - SET {LeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) + SET {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()) WHERE {matchCondition}; "; @@ -742,13 +738,13 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"]; releaseLeasesQuery.Append($@" - SELECT @current_change_version = {ChangeVersionColumnName} + SELECT @current_change_version = {SqlTriggerConstants.WorkerTableChangeVersionColumnName} FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]}; IF @current_change_version <= {changeVersion} UPDATE {this._workerTableName} WITH (TABLOCKX) - SET {ChangeVersionColumnName} = {changeVersion}, {AttemptCountColumnName} = 0, {LeaseExpirationTimeColumnName} = NULL + SET {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0, {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -782,8 +778,8 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} WHERE c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND - ((w.{ChangeVersionColumnName} IS NULL OR w.{ChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR w.{LeaseExpirationTimeColumnName} IS NOT NULL) AND - (w.{AttemptCountColumnName} IS NULL OR w.{AttemptCountColumnName} < {MaxAttemptCount}))) AS Changes + ((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND + (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} BEGIN @@ -791,7 +787,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ SET LastSyncVersion = {newLastSyncVersion} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {ChangeVersionColumnName} <= {newLastSyncVersion}; + DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {SqlTriggerConstants.WorkerTableChangeVersionColumnName} <= {newLastSyncVersion}; END "; diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index caeae3a78..3b8b828e1 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -11,8 +11,8 @@ internal static class SqlTriggerConstants public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]"; - public const string WorkerTableChangeVersionColumnName = "ChangeVersion"; - public const string WorkerTableAttemptCountColumnName = "AttemptCount"; - public const string WorkerTableLeaseExpirationTimeColumnName = "LeaseExpirationTime"; + public const string WorkerTableChangeVersionColumnName = "_az_func_ChangeVersion"; + public const string WorkerTableAttemptCountColumnName = "_az_func_AttemptCount"; + public const string WorkerTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime"; } } \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index a8eceab3e..913c7b369 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -29,10 +29,6 @@ internal sealed class SqlTriggerListener : IListener private const int ListenerStopping = 3; private const int ListenerStopped = 4; - private const string ChangeVersionColumnName = SqlTriggerConstants.WorkerTableChangeVersionColumnName; - private const string AttemptCountColumnName = SqlTriggerConstants.WorkerTableAttemptCountColumnName; - private const string LeaseExpirationTimeColumnName = SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName; - private readonly SqlObject _userTable; private readonly string _connectionString; private readonly string _userFunctionId; @@ -233,7 +229,7 @@ FROM sys.indexes AS i using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) { - string[] reservedColumnNames = new string[] { ChangeVersionColumnName, AttemptCountColumnName, LeaseExpirationTimeColumnName }; + string[] reservedColumnNames = new string[] { SqlTriggerConstants.WorkerTableChangeVersionColumnName, SqlTriggerConstants.WorkerTableAttemptCountColumnName, SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName }; string[] variableLengthTypes = new string[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; string[] variablePrecisionTypes = new string[] { "numeric", "decimal" }; @@ -420,9 +416,9 @@ private static async Task CreateWorkerTableAsync( IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL CREATE TABLE {workerTableName} ( {primaryKeysWithTypes}, - {ChangeVersionColumnName} bigint NOT NULL, - {AttemptCountColumnName} int NOT NULL, - {LeaseExpirationTimeColumnName} datetime2, + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} bigint NOT NULL, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} int NOT NULL, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} datetime2, PRIMARY KEY ({primaryKeys}) ); "; From 4dfc858b29272a8433118af5d233eac7fb246fd2 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Wed, 24 Aug 2022 14:45:24 +0530 Subject: [PATCH 3/3] Fix failing integration test --- src/TriggerBinding/SqlTableChangeMonitor.cs | 12 +++++++++--- src/TriggerBinding/SqlTriggerListener.cs | 8 +++++++- .../Integration/SqlTriggerBindingIntegrationTests.cs | 7 ++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index aaf0ff668..1460753cd 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -662,7 +662,8 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_ver LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition} WHERE - (w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND (w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR + (w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND + (w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; @@ -744,7 +745,10 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa IF @current_change_version <= {changeVersion} UPDATE {this._workerTableName} WITH (TABLOCKX) - SET {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0, {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL + SET + {SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion}, + {SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0, + {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL WHERE {this._rowMatchConditions[rowIndex]}; "); } @@ -778,7 +782,9 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition} WHERE c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND - ((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND + ((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR + w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR + w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND (w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 913c7b369..f04a638b0 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -229,7 +229,13 @@ FROM sys.indexes AS i using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) { - string[] reservedColumnNames = new string[] { SqlTriggerConstants.WorkerTableChangeVersionColumnName, SqlTriggerConstants.WorkerTableAttemptCountColumnName, SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName }; + string[] reservedColumnNames = new string[] + { + SqlTriggerConstants.WorkerTableChangeVersionColumnName, + SqlTriggerConstants.WorkerTableAttemptCountColumnName, + SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName + }; + string[] variableLengthTypes = new string[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; string[] variablePrecisionTypes = new string[] { "numeric", "decimal" }; diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index e060642a3..b1b94b6c2 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -151,8 +151,13 @@ private static void ValidateProductChanges(List> changes, int int count = last_id - first_id + 1; Assert.Equal(count, changes.Count); + // Since the table rows are changed with a single SQL statement, the changes are not guaranteed to arrive in + // ProductID-order. Occasionally, we find the items in the second batch are passed to the user function in + // reverse order, which is an expected behavior. + IEnumerable> orderedChanges = changes.OrderBy(change => change.Item.ProductID); + int id = first_id; - foreach (SqlChange change in changes) + foreach (SqlChange change in orderedChanges) { Assert.Equal(operation, change.Operation); Product product = change.Item;