Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq

For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional worker tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and worker table.

> **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `ChangeVersion`, `AttemptCount` and `LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work.
> **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work.

#### Trigger Samples
The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs)
Expand Down
32 changes: 19 additions & 13 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -657,14 +657,15 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti
SELECT TOP {BatchSize}
{selectList},
c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION,
w.ChangeVersion, w.AttemptCount, w.LeaseExpirationTime
w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName}, w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName}, w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName}
FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c
LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition}
LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition}
WHERE
(w.LeaseExpirationTime IS NULL AND (w.ChangeVersion IS NULL OR w.ChangeVersion < c.SYS_CHANGE_VERSION) OR
w.LeaseExpirationTime < SYSDATETIME()) AND
(w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount})
(w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NULL AND
(w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR
w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND
(w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount})
ORDER BY c.SYS_CHANGE_VERSION ASC;
";

Expand Down Expand Up @@ -694,9 +695,9 @@ IF NOT EXISTS (SELECT * FROM {this._workerTableName} WITH (TABLOCKX) WHERE {this
ELSE
UPDATE {this._workerTableName} WITH (TABLOCKX)
SET
ChangeVersion = {changeVersion},
AttemptCount = AttemptCount + 1,
LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
{SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion},
{SqlTriggerConstants.WorkerTableAttemptCountColumnName} = {SqlTriggerConstants.WorkerTableAttemptCountColumnName} + 1,
{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
WHERE {this._rowMatchConditions[rowIndex]};
");
}
Expand All @@ -715,7 +716,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection)

string renewLeasesQuery = $@"
UPDATE {this._workerTableName} WITH (TABLOCKX)
SET LeaseExpirationTime = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
SET {SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
WHERE {matchCondition};
";

Expand All @@ -738,13 +739,16 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa
string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"];

releaseLeasesQuery.Append($@"
SELECT @current_change_version = ChangeVersion
SELECT @current_change_version = {SqlTriggerConstants.WorkerTableChangeVersionColumnName}
FROM {this._workerTableName} WITH (TABLOCKX)
WHERE {this._rowMatchConditions[rowIndex]};

IF @current_change_version <= {changeVersion}
UPDATE {this._workerTableName} WITH (TABLOCKX)
SET ChangeVersion = {changeVersion}, AttemptCount = 0, LeaseExpirationTime = NULL
SET
{SqlTriggerConstants.WorkerTableChangeVersionColumnName} = {changeVersion},
{SqlTriggerConstants.WorkerTableAttemptCountColumnName} = 0,
{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} = NULL
WHERE {this._rowMatchConditions[rowIndex]};
");
}
Expand Down Expand Up @@ -778,16 +782,18 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_
LEFT OUTER JOIN {this._workerTableName} AS w WITH (TABLOCKX) ON {workerTableJoinCondition}
WHERE
c.SYS_CHANGE_VERSION <= {newLastSyncVersion} AND
((w.ChangeVersion IS NULL OR w.ChangeVersion != c.SYS_CHANGE_VERSION OR w.LeaseExpirationTime IS NOT NULL) AND
(w.AttemptCount IS NULL OR w.AttemptCount < {MaxAttemptCount}))) AS Changes
((w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} IS NULL OR
w.{SqlTriggerConstants.WorkerTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR
w.{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} IS NOT NULL) AND
(w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} IS NULL OR w.{SqlTriggerConstants.WorkerTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes

IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion}
BEGIN
UPDATE {SqlTriggerConstants.GlobalStateTableName}
SET LastSyncVersion = {newLastSyncVersion}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE ChangeVersion <= {newLastSyncVersion};
DELETE FROM {this._workerTableName} WITH (TABLOCKX) WHERE {SqlTriggerConstants.WorkerTableChangeVersionColumnName} <= {newLastSyncVersion};
END
";

Expand Down
4 changes: 4 additions & 0 deletions src/TriggerBinding/SqlTriggerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ internal static class SqlTriggerConstants
public const string GlobalStateTableName = "[" + SchemaName + "].[GlobalState]";

public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]";

public const string WorkerTableChangeVersionColumnName = "_az_func_ChangeVersion";
public const string WorkerTableAttemptCountColumnName = "_az_func_AttemptCount";
public const string WorkerTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";
}
}
14 changes: 10 additions & 4 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,13 @@ FROM sys.indexes AS i
throw new InvalidOperationException($"Could not find primary key created in table: '{this._userTable.FullName}'.");
}

string[] reservedColumnNames = new[] { "ChangeVersion", "AttemptCount", "LeaseExpirationTime" };
string[] reservedColumnNames = new[]
{
SqlTriggerConstants.WorkerTableChangeVersionColumnName,
SqlTriggerConstants.WorkerTableAttemptCountColumnName,
SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName
};

var conflictingColumnNames = primaryKeyColumns.Select(col => col.name).Intersect(reservedColumnNames).ToList();

if (conflictingColumnNames.Count > 0)
Expand Down Expand Up @@ -419,9 +425,9 @@ private static async Task<long> CreateWorkerTableAsync(
IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL
CREATE TABLE {workerTableName} (
{primaryKeysWithTypes},
ChangeVersion bigint NOT NULL,
AttemptCount int NOT NULL,
LeaseExpirationTime datetime2,
{SqlTriggerConstants.WorkerTableChangeVersionColumnName} bigint NOT NULL,
{SqlTriggerConstants.WorkerTableAttemptCountColumnName} int NOT NULL,
{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} datetime2,
PRIMARY KEY ({primaryKeys})
);
";
Expand Down
9 changes: 7 additions & 2 deletions test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void ReservedPrimaryKeyColumnNamesTriggerTest()
this.StartFunctionsHostAndWaitForError(
nameof(ReservedPrimaryKeyColumnNamesTrigger),
true,
"Found reserved column name(s): 'ChangeVersion', 'AttemptCount', 'LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." +
"Found reserved column name(s): '_az_func_ChangeVersion', '_az_func_AttemptCount', '_az_func_LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." +
" Please rename them to be able to use trigger binding.");
}

Expand Down Expand Up @@ -223,8 +223,13 @@ private static void ValidateProductChanges(List<SqlChange<Product>> changes, int
int count = last_id - first_id + 1;
Assert.Equal(count, changes.Count);

// Since the table rows are changed with a single SQL statement, the changes are not guaranteed to arrive in
// ProductID-order. Occasionally, we find the items in the second batch are passed to the user function in
// reverse order, which is an expected behavior.
IEnumerable<SqlChange<Product>> orderedChanges = changes.OrderBy(change => change.Item.ProductID);

int id = first_id;
foreach (SqlChange<Product> change in changes)
foreach (SqlChange<Product> change in orderedChanges)
{
Assert.Equal(operation, change.Operation);
Product product = change.Item;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
CREATE TABLE [ProductsWithReservedPrimaryKeyColumnNames] (
[ProductId] [int] NOT NULL IDENTITY(1, 1),
[ChangeVersion] [int] NOT NULL,
[AttemptCount] [int] NOT NULL,
[LeaseExpirationTime] [int] NOT NULL,
[_az_func_ChangeVersion] [int] NOT NULL,
[_az_func_AttemptCount] [int] NOT NULL,
[_az_func_LeaseExpirationTime] [int] NOT NULL,
[Name] [nvarchar](100) NULL,
[Cost] [int] NULL,
PRIMARY KEY (
ProductId,
ChangeVersion,
AttemptCount,
LeaseExpirationTime
_az_func_ChangeVersion,
_az_func_AttemptCount,
_az_func_LeaseExpirationTime
)
);