Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,9 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq
ENABLE CHANGE_TRACKING;
```

For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional worker tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and worker table.
For more information, please refer to the documentation [here](https://docs.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server#enable-change-tracking-for-a-table). The trigger needs to have read access on the table being monitored for changes as well as to the change tracking system tables. It also needs write access to an `az_func` schema within the database, where it will create additional leases tables to store the trigger states and leases. Each function trigger will thus have an associated change tracking table and leases table.

> **NOTE:** The worker table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work.
> **NOTE:** The leases table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeastExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work.

#### Trigger Samples
The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs)
Expand Down
4 changes: 2 additions & 2 deletions src/Telemetry/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ public enum TelemetryPropertyName
ErrorName,
ExceptionType,
HasIdentityColumn,
LeasesTableName,
QueryType,
ServerVersion,
Type,
UserFunctionId,
WorkerTableName,
}

/// <summary>
Expand All @@ -376,7 +376,7 @@ public enum TelemetryMeasureName
CommandDurationMs,
CreatedSchemaDurationMs,
CreateGlobalStateTableDurationMs,
CreateWorkerTableDurationMs,
CreateLeasesTableDurationMs,
DurationMs,
GetCaseSensitivityDurationMs,
GetChangesDurationMs,
Expand Down
78 changes: 39 additions & 39 deletions src/TriggerBinding/SqlTableChangeMonitor.cs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/TriggerBinding/SqlTriggerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ internal static class SqlTriggerConstants

public const string GlobalStateTableName = "[" + SchemaName + "].[GlobalState]";

public const string WorkerTableNameFormat = "[" + SchemaName + "].[Worker_{0}]";
public const string LeasesTableNameFormat = "[" + SchemaName + "].[Leases_{0}]";

public const string WorkerTableChangeVersionColumnName = "_az_func_ChangeVersion";
public const string WorkerTableAttemptCountColumnName = "_az_func_AttemptCount";
public const string WorkerTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";
public const string LeasesTableChangeVersionColumnName = "_az_func_ChangeVersion";
public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount";
public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";
}
}
46 changes: 23 additions & 23 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,19 @@ public async Task StartAsync(CancellationToken cancellationToken)
IReadOnlyList<(string name, string type)> primaryKeyColumns = await this.GetPrimaryKeyColumnsAsync(connection, userTableId, cancellationToken);
IReadOnlyList<string> userTableColumns = await this.GetUserTableColumnsAsync(connection, userTableId, cancellationToken);

string workerTableName = string.Format(CultureInfo.InvariantCulture, SqlTriggerConstants.WorkerTableNameFormat, $"{this._userFunctionId}_{userTableId}");
this._logger.LogDebug($"Worker table name: '{workerTableName}'.");
this._telemetryProps[TelemetryPropertyName.WorkerTableName] = workerTableName;
string leasesTableName = string.Format(CultureInfo.InvariantCulture, SqlTriggerConstants.LeasesTableNameFormat, $"{this._userFunctionId}_{userTableId}");
this._logger.LogDebug($"leases table name: '{leasesTableName}'.");
this._telemetryProps[TelemetryPropertyName.LeasesTableName] = leasesTableName;

var transactionSw = Stopwatch.StartNew();
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createWorkerTableDurationMs = 0L;
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L;

using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
{
createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, cancellationToken);
createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, cancellationToken);
insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken);
createWorkerTableDurationMs = await CreateWorkerTableAsync(connection, transaction, workerTableName, primaryKeyColumns, cancellationToken);
createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, leasesTableName, primaryKeyColumns, cancellationToken);
transaction.Commit();
}

Expand All @@ -127,7 +127,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
userTableId,
this._userTable,
this._userFunctionId,
workerTableName,
leasesTableName,
userTableColumns,
primaryKeyColumns.Select(col => col.name).ToList(),
this._executor,
Expand All @@ -142,7 +142,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
[TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs,
[TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs,
[TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs,
[TelemetryMeasureName.CreateWorkerTableDurationMs] = createWorkerTableDurationMs,
[TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs,
[TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
};

Expand Down Expand Up @@ -213,7 +213,7 @@ private async Task<int> GetUserTableIdAsync(SqlConnection connection, Cancellati
/// Gets the names and types of primary key columns of the user table.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in worker table.
/// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in leases table.
/// </exception>
private async Task<IReadOnlyList<(string name, string type)>> GetPrimaryKeyColumnsAsync(SqlConnection connection, int userTableId, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -263,9 +263,9 @@ FROM sys.indexes AS i

string[] reservedColumnNames = new[]
{
SqlTriggerConstants.WorkerTableChangeVersionColumnName,
SqlTriggerConstants.WorkerTableAttemptCountColumnName,
SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName
SqlTriggerConstants.LeasesTableChangeVersionColumnName,
SqlTriggerConstants.LeasesTableAttemptCountColumnName,
SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName
};

var conflictingColumnNames = primaryKeyColumns.Select(col => col.name).Intersect(reservedColumnNames).ToList();
Expand Down Expand Up @@ -326,7 +326,7 @@ FROM sys.columns AS c
}

/// <summary>
/// Creates the schema for global state table and worker tables, if it does not already exist.
/// Creates the schema for global state table and leases tables, if it does not already exist.
/// </summary>
private static async Task<long> CreateSchemaAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -409,33 +409,33 @@ INSERT INTO {SqlTriggerConstants.GlobalStateTableName}
}

/// <summary>
/// Creates the worker table for the 'user function and table', if one does not already exist.
/// Creates the leases table for the 'user function and table', if one does not already exist.
/// </summary>
private static async Task<long> CreateWorkerTableAsync(
private static async Task<long> CreateLeasesTableAsync(
SqlConnection connection,
SqlTransaction transaction,
string workerTableName,
string leasesTableName,
IReadOnlyList<(string name, string type)> primaryKeyColumns,
CancellationToken cancellationToken)
{
string primaryKeysWithTypes = string.Join(", ", primaryKeyColumns.Select(col => $"{col.name.AsBracketQuotedString()} {col.type}"));
string primaryKeys = string.Join(", ", primaryKeyColumns.Select(col => col.name.AsBracketQuotedString()));

string createWorkerTableQuery = $@"
IF OBJECT_ID(N'{workerTableName}', 'U') IS NULL
CREATE TABLE {workerTableName} (
string createLeasesTableQuery = $@"
IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL
CREATE TABLE {leasesTableName} (
{primaryKeysWithTypes},
{SqlTriggerConstants.WorkerTableChangeVersionColumnName} bigint NOT NULL,
{SqlTriggerConstants.WorkerTableAttemptCountColumnName} int NOT NULL,
{SqlTriggerConstants.WorkerTableLeaseExpirationTimeColumnName} datetime2,
{SqlTriggerConstants.LeasesTableChangeVersionColumnName} bigint NOT NULL,
{SqlTriggerConstants.LeasesTableAttemptCountColumnName} int NOT NULL,
{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} datetime2,
PRIMARY KEY ({primaryKeys})
);
";

using (var createWorkerTableCommand = new SqlCommand(createWorkerTableQuery, connection, transaction))
using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction))
{
var stopwatch = Stopwatch.StartNew();
await createWorkerTableCommand.ExecuteNonQueryAsync(cancellationToken);
await createLeasesTableCommand.ExecuteNonQueryAsync(cancellationToken);
return stopwatch.ElapsedMilliseconds;
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void PrimaryKeyNotCreatedTriggerTest()

/// <summary>
/// Tests the error message when the user table contains one or more primary keys with names conflicting with
/// column names in the worker table.
/// column names in the leases table.
/// </summary>
[Fact]
public void ReservedPrimaryKeyColumnNamesTriggerTest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static class ReservedPrimaryKeyColumnNamesTrigger
{
/// <summary>
/// Used in verification of the error message when the user table contains one or more primary keys with names
/// conflicting with column names in the worker table.
/// conflicting with column names in the leases table.
/// </summary>
[FunctionName(nameof(ReservedPrimaryKeyColumnNamesTrigger))]
public static void Run(
Expand Down