From 95a7ec072da87dd7d1cb7ee912ee493861b45e95 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Wed, 21 Sep 2022 11:12:47 -0700 Subject: [PATCH 1/2] Add configuration for batch size/polling interval --- README.md | 18 ++++++++ src/TriggerBinding/SqlTableChangeMonitor.cs | 30 +++++++++---- src/TriggerBinding/SqlTriggerBinding.cs | 8 +++- .../SqlTriggerBindingProvider.cs | 4 +- src/TriggerBinding/SqlTriggerListener.cs | 9 +++- src/Utils.cs | 5 +++ test/.editorconfig | 3 +- test/Integration/IntegrationTestBase.cs | 7 ++- .../SqlTriggerBindingIntegrationTests.cs | 43 +++++++++++++++++++ .../ProductsTriggerWithValidation.cs | 33 ++++++++++++++ 10 files changed, 143 insertions(+), 17 deletions(-) create mode 100644 test/Integration/test-csharp/ProductsTriggerWithValidation.cs diff --git a/README.md b/README.md index 9fd3cb0d2..fb6166bac 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,10 @@ Azure SQL bindings for Azure Functions are supported for: - [Python functions](#python-functions) - [Input Binding Tutorial](#input-binding-tutorial-2) - [Output Binding Tutorial](#output-binding-tutorial-2) + - [Configuration](#configuration) + - [Trigger Binding Configuration](#trigger-binding-configuration) + - [Sql_Trigger_BatchSize](#sql_trigger_batchsize) + - [Sql_Trigger_PollingIntervalMs](#sql_trigger_pollingintervalms) - [More Samples](#more-samples) - [Input Binding](#input-binding) - [Query String](#query-string) @@ -554,6 +558,20 @@ Note: This tutorial requires that a SQL database is setup as shown in [Create a - Hit 'F5' to run your code. Click the link to upsert the output array values in your SQL table. Your upserted values should launch in the browser. - Congratulations! You have successfully created your first SQL output binding! Checkout [Output Binding](#Output-Binding) for more information on how to use it and explore on your own! +## Configuration + +This section goes over some of the configuration values you can use to customize the SQL bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more. + +### Trigger Binding Configuration + +#### Sql_Trigger_BatchSize + +This controls the number of changes processed at once before being sent to the triggered function. + +#### Sql_Trigger_PollingIntervalMs + +This controls the delay in milliseconds between processing each batch of changes. + ## More Samples ### Input Binding diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index eee1a77bb..b6503dd2d 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -16,6 +16,7 @@ using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Newtonsoft.Json; +using Microsoft.Extensions.Configuration; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -25,10 +26,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql /// POCO class representing the row in the user table internal sealed class SqlTableChangeMonitor : IDisposable { - public const int BatchSize = 10; - public const int PollingIntervalInSeconds = 5; public const int MaxAttemptCount = 5; - + private const string CONFIG_KEY_SQL_TRIGGER_BATCHSIZE = "Sql_Trigger_BatchSize"; + private const string CONFIG_KEY_SQL_TRIGGER_POLLINGINTERVALMS = "Sql_Trigger_PollingIntervalMs"; // Leases are held for approximately (LeaseRenewalIntervalInSeconds * MaxLeaseRenewalCount) seconds. It is // required to have at least one of (LeaseIntervalInSeconds / LeaseRenewalIntervalInSeconds) attempts to // renew the lease succeed to prevent it from expiring. @@ -46,6 +46,14 @@ internal sealed class SqlTableChangeMonitor : IDisposable private readonly IReadOnlyList _rowMatchConditions; private readonly ITriggeredFunctionExecutor _executor; private readonly ILogger _logger; + /// + /// Number of changes to process in each iteration of the loop + /// + private readonly int _batchSize = 10; + /// + /// Delay in ms between processing each batch of changes + /// + private readonly int _pollingIntervalInMs = 5000; private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges = new CancellationTokenSource(); private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases = new CancellationTokenSource(); @@ -87,6 +95,7 @@ public SqlTableChangeMonitor( IReadOnlyList primaryKeyColumns, ITriggeredFunctionExecutor executor, ILogger logger, + IConfiguration configuration, IDictionary telemetryProps) { this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString)); @@ -99,9 +108,11 @@ public SqlTableChangeMonitor( this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); this._userTableId = userTableId; - + // Check if there's config settings to override the default batch size/polling interval values + this._batchSize = configuration.GetValue(CONFIG_KEY_SQL_TRIGGER_BATCHSIZE) ?? this._batchSize; + this._pollingIntervalInMs = configuration.GetValue(CONFIG_KEY_SQL_TRIGGER_POLLINGINTERVALMS) ?? this._pollingIntervalInMs; // Prep search-conditions that will be used besides WHERE clause to match table rows. - this._rowMatchConditions = Enumerable.Range(0, BatchSize) + this._rowMatchConditions = Enumerable.Range(0, this._batchSize) .Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.AsBracketQuotedString()} = @{rowIndex}_{colIndex}"))) .ToList(); @@ -122,7 +133,7 @@ public void Dispose() } /// - /// Executed once every period. If the state of the change monitor is + /// Executed once every period. If the state of the change monitor is /// , then the method query the change/leases tables for changes on the /// user's table. If any are found, the state of the change monitor is transitioned to /// and the user's function is executed with the found changes. If the @@ -131,7 +142,7 @@ public void Dispose() /// private async Task RunChangeConsumptionLoopAsync() { - this._logger.LogDebugWithThreadId("Starting change consumption loop."); + this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); try { @@ -153,7 +164,8 @@ private async Task RunChangeConsumptionLoopAsync() await this.ProcessTableChangesAsync(connection, token); } this._logger.LogDebugWithThreadId("END CheckingForChanges"); - await Task.Delay(TimeSpan.FromSeconds(PollingIntervalInSeconds), token); + this._logger.LogDebugWithThreadId($"Delaying for {this._pollingIntervalInMs}ms"); + await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token); } } } @@ -666,7 +678,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti FROM {SqlTriggerConstants.GlobalStateTableName} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - SELECT TOP {BatchSize} + SELECT TOP {this._batchSize} {selectList}, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName}, l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName}, l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} diff --git a/src/TriggerBinding/SqlTriggerBinding.cs b/src/TriggerBinding/SqlTriggerBinding.cs index b04fd95c4..841c8a79e 100644 --- a/src/TriggerBinding/SqlTriggerBinding.cs +++ b/src/TriggerBinding/SqlTriggerBinding.cs @@ -15,6 +15,7 @@ using Microsoft.Azure.WebJobs.Host.Protocols; using Microsoft.Azure.WebJobs.Host.Triggers; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -29,6 +30,7 @@ internal sealed class SqlTriggerBinding : ITriggerBinding private readonly ParameterInfo _parameter; private readonly IHostIdProvider _hostIdProvider; private readonly ILogger _logger; + private readonly IConfiguration _configuration; private static readonly IReadOnlyDictionary _emptyBindingContract = new Dictionary(); private static readonly IReadOnlyDictionary _emptyBindingData = new Dictionary(); @@ -41,13 +43,15 @@ internal sealed class SqlTriggerBinding : ITriggerBinding /// Trigger binding parameter information /// Provider of unique host identifier /// Facilitates logging of messages - public SqlTriggerBinding(string connectionString, string tableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger) + /// Provides configuration values + public SqlTriggerBinding(string connectionString, string tableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger, IConfiguration configuration) { this._connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); this._tableName = tableName ?? throw new ArgumentNullException(nameof(tableName)); this._parameter = parameter ?? throw new ArgumentNullException(nameof(parameter)); this._hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider)); this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); + this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); } /// @@ -68,7 +72,7 @@ public async Task CreateListenerAsync(ListenerFactoryContext context) _ = context ?? throw new ArgumentNullException(nameof(context), "Missing listener context"); string userFunctionId = await this.GetUserFunctionIdAsync(); - return new SqlTriggerListener(this._connectionString, this._tableName, userFunctionId, context.Executor, this._logger); + return new SqlTriggerListener(this._connectionString, this._tableName, userFunctionId, context.Executor, this._logger, this._configuration); } public ParameterDescriptor ToParameterDescriptor() diff --git a/src/TriggerBinding/SqlTriggerBindingProvider.cs b/src/TriggerBinding/SqlTriggerBindingProvider.cs index b35ed3f01..17afa4cc3 100644 --- a/src/TriggerBinding/SqlTriggerBindingProvider.cs +++ b/src/TriggerBinding/SqlTriggerBindingProvider.cs @@ -78,10 +78,10 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex Type userType = parameter.ParameterType.GetGenericArguments()[0].GetGenericArguments()[0]; Type bindingType = typeof(SqlTriggerBinding<>).MakeGenericType(userType); - var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger) }; + var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger), typeof(IConfiguration) }; ConstructorInfo bindingConstructor = bindingType.GetConstructor(constructorParameterTypes); - object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, parameter, this._hostIdProvider, this._logger }; + object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, parameter, this._hostIdProvider, this._logger, this._configuration }; var triggerBinding = (ITriggerBinding)bindingConstructor.Invoke(constructorParameterValues); return Task.FromResult(triggerBinding); diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 377aba054..d1d95bb7d 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -14,6 +14,7 @@ using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -34,6 +35,7 @@ internal sealed class SqlTriggerListener : IListener private readonly string _userFunctionId; private readonly ITriggeredFunctionExecutor _executor; private readonly ILogger _logger; + private readonly IConfiguration _configuration; private readonly IDictionary _telemetryProps = new Dictionary(); @@ -48,13 +50,15 @@ internal sealed class SqlTriggerListener : IListener /// Unique identifier for the user function /// Defines contract for triggering user function /// Facilitates logging of messages - public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger) + /// Provides configuration values + public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger, IConfiguration configuration) { this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString)); this._userTable = !string.IsNullOrEmpty(tableName) ? new SqlObject(tableName) : throw new ArgumentNullException(nameof(tableName)); this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId)); this._executor = executor ?? throw new ArgumentNullException(nameof(executor)); this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); + this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); } public void Cancel() @@ -122,6 +126,7 @@ public async Task StartAsync(CancellationToken cancellationToken) primaryKeyColumns.Select(col => col.name).ToList(), this._executor, this._logger, + this._configuration, this._telemetryProps); this._listenerState = ListenerStarted; @@ -467,7 +472,7 @@ PRIMARY KEY ({primaryKeys}) } /// - /// Clears the current telemetry property dictionary and initializes the default initial properties. + /// Clears the current telemetry property dictionary and initializes the default initial properties. /// private void InitializeTelemetryProps() { diff --git a/src/Utils.cs b/src/Utils.cs index d05d01a9a..0bff69855 100644 --- a/src/Utils.cs +++ b/src/Utils.cs @@ -99,5 +99,10 @@ public static void LogDebugWithThreadId(this ILogger logger, string message, par { logger.LogDebug($"TID:{Environment.CurrentManagedThreadId} {message}", args); } + + public static void LogInformationWithThreadId(this ILogger logger, string message, params object[] args) + { + logger.LogInformation($"TID:{Environment.CurrentManagedThreadId} {message}", args); + } } } diff --git a/test/.editorconfig b/test/.editorconfig index 4386dd31a..0082aa2a0 100644 --- a/test/.editorconfig +++ b/test/.editorconfig @@ -5,4 +5,5 @@ # Disabled dotnet_diagnostic.CA1309.severity = silent # Use ordinal StringComparison - this isn't important for tests and just adds clutter dotnet_diagnostic.CA1305.severity = silent # Specify IFormatProvider - this isn't important for tests and just adds clutter -dotnet_diagnostic.CA1707.severity = silent # Identifiers should not contain underscores - this helps make test names more readable \ No newline at end of file +dotnet_diagnostic.CA1707.severity = silent # Identifiers should not contain underscores - this helps make test names more readable +dotnet_diagnostic.CA2201.severity = silent # Do not raise reserved exception types - tests can throw whatever they want \ No newline at end of file diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs index 665fbf3fa..63cdac911 100644 --- a/test/Integration/IntegrationTestBase.cs +++ b/test/Integration/IntegrationTestBase.cs @@ -17,6 +17,7 @@ using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; using Microsoft.AspNetCore.WebUtilities; using System.Collections.Generic; +using System.Linq; namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration { @@ -175,7 +176,7 @@ protected void StartAzurite() /// - The functionName is different than its route.
/// - You can start multiple functions by passing in a space-separated list of function names.
/// - protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null) + protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null, IDictionary environmentVariables = null) { string workingDirectory = useTestFolder ? GetPathToBin() : Path.Combine(GetPathToBin(), "SqlExtensionSamples", Enum.GetName(typeof(SupportedLanguages), language)); if (!Directory.Exists(workingDirectory)) @@ -194,6 +195,10 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag RedirectStandardError = true, UseShellExecute = false }; + if (environmentVariables != null) + { + environmentVariables.ToList().ForEach(ev => startInfo.EnvironmentVariables[ev.Key] = ev.Value); + } this.LogOutput($"Starting {startInfo.FileName} {startInfo.Arguments} in {startInfo.WorkingDirectory}"); this.FunctionHost = new Process { diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index ccea60489..7eca03418 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -55,6 +55,49 @@ public async Task SingleOperationTriggerTest() changes.Clear(); } + /// + /// Verifies that manually setting the batch size correctly changes the number of changes processed at once + /// + [Fact] + public async Task BatchSizeOverrideTriggerTest() + { + this.EnableChangeTrackingForTable("Products"); + this.StartFunctionHost(nameof(ProductsTriggerWithValidation), Common.SupportedLanguages.CSharp, true, environmentVariables: new Dictionary() { + { "TEST_EXPECTED_BATCH_SIZE", "20" }, + { "Sql_Trigger_BatchSize", "20" } + }); + + var changes = new List>(); + this.MonitorProductChanges(changes, "SQL Changes: "); + + // Considering the polling interval of 5 seconds and batch-size of 20, it should take around 10 seconds to + // process 40 insert operations. + this.InsertProducts(1, 40); + await Task.Delay(TimeSpan.FromSeconds(12)); + ValidateProductChanges(changes, 1, 40, SqlChangeOperation.Insert, id => $"Product {id}", id => id * 100); + } + + /// + /// Verifies that manually setting the polling interval correctly changes the delay between processing each batch of changes + /// + [Fact] + public async Task PollingIntervalOverrideTriggerTest() + { + this.EnableChangeTrackingForTable("Products"); + this.StartFunctionHost(nameof(ProductsTriggerWithValidation), Common.SupportedLanguages.CSharp, true, environmentVariables: new Dictionary() { + { "Sql_Trigger_PollingIntervalMs", "100" } + }); + + var changes = new List>(); + this.MonitorProductChanges(changes, "SQL Changes: "); + + // Considering the polling interval of 100ms and batch-size of 10, it should take around .5 second to + // process 50 insert operations. + this.InsertProducts(1, 50); + await Task.Delay(TimeSpan.FromSeconds(1)); + ValidateProductChanges(changes, 1, 50, SqlChangeOperation.Insert, id => $"Product {id}", id => id * 100); + } + /// /// Verifies that if several changes have happened to the table row since last invocation, then a single net diff --git a/test/Integration/test-csharp/ProductsTriggerWithValidation.cs b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs new file mode 100644 index 000000000..7fca5cbf9 --- /dev/null +++ b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration +{ + public static class ProductsTriggerWithValidation + { + /// + /// Simple trigger function with additional logic to allow for verifying that the expected number + /// of changes was recieved in each batch. + /// + [FunctionName(nameof(ProductsTriggerWithValidation))] + public static void Run( + [SqlTrigger("[dbo].[Products]", ConnectionStringSetting = "SqlConnectionString")] + IReadOnlyList> changes, + ILogger logger) + { + string expectedBatchSize = Environment.GetEnvironmentVariable("TEST_EXPECTED_BATCH_SIZE"); + if (!string.IsNullOrEmpty(expectedBatchSize) && int.Parse(expectedBatchSize) != changes.Count) + { + throw new Exception($"Invalid batch size, got {changes.Count} changes but expected {expectedBatchSize}"); + } + // The output is used to inspect the trigger binding parameter in test methods. + logger.LogInformation("SQL Changes: " + JsonConvert.SerializeObject(changes)); + } + } +} From 7cf130e87a621ef20535997762f78b3db24d7848 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Thu, 22 Sep 2022 09:09:26 -0700 Subject: [PATCH 2/2] PR comments --- src/TriggerBinding/SqlTableChangeMonitor.cs | 33 +++++++++++++-------- src/TriggerBinding/SqlTriggerConstants.cs | 3 ++ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index b6503dd2d..3d161a708 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -26,16 +26,25 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql /// POCO class representing the row in the user table internal sealed class SqlTableChangeMonitor : IDisposable { - public const int MaxAttemptCount = 5; - private const string CONFIG_KEY_SQL_TRIGGER_BATCHSIZE = "Sql_Trigger_BatchSize"; - private const string CONFIG_KEY_SQL_TRIGGER_POLLINGINTERVALMS = "Sql_Trigger_PollingIntervalMs"; - // Leases are held for approximately (LeaseRenewalIntervalInSeconds * MaxLeaseRenewalCount) seconds. It is + #region Constants + /// + /// The maximum number of times we'll attempt to process a change before giving up + /// + private const int MaxChangeProcessAttemptCount = 5; + /// + /// The maximum number of times that we'll attempt to renew a lease be + /// + /// + /// Leases are held for approximately (LeaseRenewalIntervalInSeconds * MaxLeaseRenewalCount) seconds. It is // required to have at least one of (LeaseIntervalInSeconds / LeaseRenewalIntervalInSeconds) attempts to // renew the lease succeed to prevent it from expiring. - public const int MaxLeaseRenewalCount = 10; - public const int LeaseIntervalInSeconds = 60; - public const int LeaseRenewalIntervalInSeconds = 15; - public const int MaxRetryReleaseLeases = 3; + // + private const int MaxLeaseRenewalCount = 10; + private const int LeaseIntervalInSeconds = 60; + private const int LeaseRenewalIntervalInSeconds = 15; + private const int MaxRetryReleaseLeases = 3; + #endregion Constants + private readonly string _connectionString; private readonly int _userTableId; private readonly SqlObject _userTable; @@ -109,8 +118,8 @@ public SqlTableChangeMonitor( this._userTableId = userTableId; // Check if there's config settings to override the default batch size/polling interval values - this._batchSize = configuration.GetValue(CONFIG_KEY_SQL_TRIGGER_BATCHSIZE) ?? this._batchSize; - this._pollingIntervalInMs = configuration.GetValue(CONFIG_KEY_SQL_TRIGGER_POLLINGINTERVALMS) ?? this._pollingIntervalInMs; + this._batchSize = configuration.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize) ?? this._batchSize; + this._pollingIntervalInMs = configuration.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_PollingInterval) ?? this._pollingIntervalInMs; // Prep search-conditions that will be used besides WHERE clause to match table rows. this._rowMatchConditions = Enumerable.Range(0, this._batchSize) .Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.AsBracketQuotedString()} = @{rowIndex}_{colIndex}"))) @@ -689,7 +698,7 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo (l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NULL AND (l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND - (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount}) + (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount}) ORDER BY c.SYS_CHANGE_VERSION ASC; "; @@ -810,7 +819,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_ ((l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NOT NULL) AND - (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes + (l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount}))) AS Changes IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion} BEGIN diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 2f37d866f..8c627202d 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -14,5 +14,8 @@ internal static class SqlTriggerConstants public const string LeasesTableChangeVersionColumnName = "_az_func_ChangeVersion"; public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount"; public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime"; + + public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize"; + public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs"; } } \ No newline at end of file