diff --git a/docs/BindingsOverview.md b/docs/BindingsOverview.md index e4d967e8f..f9ceffd9d 100644 --- a/docs/BindingsOverview.md +++ b/docs/BindingsOverview.md @@ -18,7 +18,7 @@ - [az\_func.GlobalState](#az_funcglobalstate) - [az\_func.Leases\_\*](#az_funcleases_) - [Configuration for Trigger Bindings](#configuration-for-trigger-bindings) - - [Sql\_Trigger\_BatchSize](#sql_trigger_batchsize) + - [Sql\_Trigger\_MaxBatchSize](#sql_trigger_maxbatchsize) - [Sql\_Trigger\_PollingIntervalMs](#sql_trigger_pollingintervalms) - [Sql\_Trigger\_MaxChangesPerWorker](#sql_trigger_maxchangesperworker) - [Scaling for Trigger Bindings](#scaling-for-trigger-bindings) @@ -153,9 +153,9 @@ A row is created for every row in the target table that is modified. These are t This section goes over some of the configuration values you can use to customize SQL trigger bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more. -#### Sql_Trigger_BatchSize +#### Sql_Trigger_MaxBatchSize -This controls the number of changes processed at once before being sent to the triggered function. +This controls the maximum number of changes sent to the function during each iteration of the change processing loop. #### Sql_Trigger_PollingIntervalMs diff --git a/performance/SqlTriggerPerformance_BatchOverride.cs b/performance/SqlTriggerPerformance_BatchOverride.cs index 803774d29..3a8f3fa19 100644 --- a/performance/SqlTriggerPerformance_BatchOverride.cs +++ b/performance/SqlTriggerPerformance_BatchOverride.cs @@ -14,7 +14,7 @@ public class SqlTriggerBindingPerformance_BatchOverride : SqlTriggerBindingPerfo { [Params(100, 1000)] - public int BatchSize; + public int MaxBatchSize; [GlobalSetup] public void GlobalSetup() @@ -24,7 +24,7 @@ public void GlobalSetup() nameof(ProductsTrigger), SupportedLanguages.CSharp, environmentVariables: new Dictionary() { - { "Sql_Trigger_BatchSize", this.BatchSize.ToString() } + { "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() } }); } @@ -35,7 +35,7 @@ public void GlobalSetup() [Arguments(5)] public async Task Run(double numBatches) { - int count = (int)(numBatches * this.BatchSize); + int count = (int)(numBatches * this.MaxBatchSize); await this.WaitForProductChanges( 1, count, @@ -43,7 +43,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(1, count); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize)); + this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize)); } } } \ No newline at end of file diff --git a/performance/SqlTriggerPerformance_Overrides.cs b/performance/SqlTriggerPerformance_Overrides.cs index 4728778e8..b4cef58c3 100644 --- a/performance/SqlTriggerPerformance_Overrides.cs +++ b/performance/SqlTriggerPerformance_Overrides.cs @@ -16,7 +16,7 @@ public class SqlTriggerPerformance_Overrides : SqlTriggerBindingPerformanceTestB public int PollingIntervalMs; [Params(500, 2000)] - public int BatchSize; + public int MaxBatchSize; [GlobalSetup] public void GlobalSetup() @@ -26,7 +26,7 @@ public void GlobalSetup() nameof(ProductsTrigger), SupportedLanguages.CSharp, environmentVariables: new Dictionary() { - { "Sql_Trigger_BatchSize", this.BatchSize.ToString() }, + { "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() }, { "Sql_Trigger_PollingIntervalMs", this.PollingIntervalMs.ToString() } }); } @@ -37,7 +37,7 @@ public void GlobalSetup() [Arguments(5)] public async Task Run(double numBatches) { - int count = (int)(numBatches * this.BatchSize); + int count = (int)(numBatches * this.MaxBatchSize); await this.WaitForProductChanges( 1, count, @@ -45,7 +45,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(1, count); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize)); + this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize)); } } } \ No newline at end of file diff --git a/performance/SqlTriggerPerformance_PollingIntervalOverride.cs b/performance/SqlTriggerPerformance_PollingIntervalOverride.cs index 2d0bc0b68..4554d3aa0 100644 --- a/performance/SqlTriggerPerformance_PollingIntervalOverride.cs +++ b/performance/SqlTriggerPerformance_PollingIntervalOverride.cs @@ -30,7 +30,7 @@ public void GlobalSetup() [Benchmark] public async Task Run() { - int count = SqlTableChangeMonitor.DefaultBatchSize * 2; + int count = SqlTableChangeMonitor.DefaultMaxBatchSize * 2; await this.WaitForProductChanges( 1, count, diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index ac69f00d4..b3124fadd 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -350,7 +350,7 @@ public enum TelemetryPropertyName ErrorCode, ErrorName, HasIdentityColumn, - HasConfiguredBatchSize, + HasConfiguredMaxBatchSize, HasConfiguredMaxChangesPerWorker, HasConfiguredPollingInterval, LeasesTableName, @@ -383,6 +383,7 @@ public enum TelemetryMeasureName GetPrimaryKeysDurationMs, GetUnprocessedChangesDurationMs, InsertGlobalStateTableRowDurationMs, + MaxBatchSize, MaxChangesPerWorker, NumRows, PollingIntervalMs, diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index b801cbd26..fe6dccf8e 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -45,7 +45,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable private const int LeaseRenewalIntervalInSeconds = 15; private const int MaxRetryReleaseLeases = 3; - public const int DefaultBatchSize = 100; + public const int DefaultMaxBatchSize = 100; public const int DefaultPollingIntervalMs = 1000; #endregion Constants @@ -60,9 +60,9 @@ internal sealed class SqlTableChangeMonitor : IDisposable private readonly ITriggeredFunctionExecutor _executor; private readonly ILogger _logger; /// - /// Number of changes to process in each iteration of the loop + /// Maximum number of changes to process in each iteration of the loop /// - private readonly int _batchSize = DefaultBatchSize; + private readonly int _maxBatchSize = DefaultMaxBatchSize; /// /// Delay in ms between processing each batch of changes /// @@ -131,13 +131,18 @@ public SqlTableChangeMonitor( this._userTableId = userTableId; this._telemetryProps = telemetryProps ?? new Dictionary(); - // Check if there's config settings to override the default batch size/polling interval values - int? configuredBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_BatchSize); + // Check if there's config settings to override the default max batch size/polling interval values + int? configuredMaxBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_MaxBatchSize); + // Fall back to original value for backwards compat if the new value isn't specified + if (configuredMaxBatchSize == null) + { + configuredMaxBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_BatchSize); + } int? configuredPollingInterval = configuration.GetValue(ConfigKey_SqlTrigger_PollingInterval); - this._batchSize = configuredBatchSize ?? this._batchSize; - if (this._batchSize <= 0) + this._maxBatchSize = configuredMaxBatchSize ?? this._maxBatchSize; + if (this._maxBatchSize <= 0) { - throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_BatchSize}'. Ensure that the value is a positive integer."); + throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer."); } this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs; if (this._pollingIntervalInMs <= 0) @@ -147,17 +152,17 @@ public SqlTableChangeMonitor( TelemetryInstance.TrackEvent( TelemetryEventName.TriggerMonitorStart, new Dictionary(telemetryProps) { - { TelemetryPropertyName.HasConfiguredBatchSize, (configuredBatchSize != null).ToString() }, + { TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredMaxBatchSize != null).ToString() }, { TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() }, }, new Dictionary() { - { TelemetryMeasureName.BatchSize, this._batchSize }, + { TelemetryMeasureName.MaxBatchSize, this._maxBatchSize }, { TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs } } ); // Prep search-conditions that will be used besides WHERE clause to match table rows. - this._rowMatchConditions = Enumerable.Range(0, this._batchSize) + this._rowMatchConditions = Enumerable.Range(0, this._maxBatchSize) .Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.name.AsBracketQuotedString()} = @{rowIndex}_{colIndex}"))) .ToList(); @@ -253,7 +258,7 @@ public async Task GetUnprocessedChangeCountAsync() /// private async Task RunChangeConsumptionLoopAsync() { - this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); + this._logger.LogInformationWithThreadId($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); try { @@ -875,7 +880,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti FROM {GlobalStateTableName} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - SELECT TOP {this._batchSize} + SELECT TOP {this._maxBatchSize} {selectList}, c.{SysChangeVersionColumnName}, c.SYS_CHANGE_OPERATION, diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 3b9cb47eb..276381b4a 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -26,7 +26,11 @@ internal static class SqlTriggerConstants LeasesTableLeaseExpirationTimeColumnName }; + /// + /// Deprecated config value for MaxBatchSize, kept for backwards compat reasons + /// public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize"; + public const string ConfigKey_SqlTrigger_MaxBatchSize = "Sql_Trigger_MaxBatchSize"; public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs"; public const string ConfigKey_SqlTrigger_MaxChangesPerWorker = "Sql_Trigger_MaxChangesPerWorker"; diff --git a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs index 803df33ed..49e51b3c2 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs @@ -181,17 +181,17 @@ void OutputHandler(object sender, DataReceivedEventArgs e) /// /// Gets a timeout value to use when processing the given number of changes, based on the - /// default batch size and polling interval. + /// default max batch size and polling interval. /// /// The first ID in the batch to process /// The last ID in the batch to process - /// The batch size if different than the default batch size + /// The max batch size if different than the default max batch size /// The polling interval in ms if different than the default polling interval /// - public int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) + public int GetBatchProcessingTimeout(int firstId, int lastId, int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) { int changesToProcess = lastId - firstId + 1; - int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize // The number of batches to process + int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / maxBatchSize // The number of batches to process / this.FunctionHostList.Count) // The number of function host processes * pollingIntervalMs // The length to process each batch * 2); // Double to add buffer time for processing results & writing log messages diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index a04e2d388..06cf487c9 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -73,33 +73,34 @@ await this.WaitForProductChanges( } /// - /// Verifies that manually setting the batch size correctly changes the number of changes processed at once + /// Verifies that manually setting the batch size using the original config var correctly changes the + /// number of changes processed at once. /// [Fact] public async Task BatchSizeOverrideTriggerTest() { // Use enough items to require 4 batches to be processed but then - // set the batch size to the same value so they can all be processed in one + // set the max batch size to the same value so they can all be processed in one // batch. The test will only wait for ~1 batch worth of time so will timeout - // if the batch size isn't actually changed - const int batchSize = SqlTableChangeMonitor.DefaultBatchSize * 4; + // if the max batch size isn't actually changed + const int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize * 4; const int firstId = 1; - const int lastId = batchSize; + const int lastId = maxBatchSize; this.SetChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, - @"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*", - "BatchSize", - batchSize.ToString()); + @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", + "MaxBatchSize", + maxBatchSize.ToString()); this.StartFunctionHost( nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, useTestFolder: true, customOutputHandler: handler, environmentVariables: new Dictionary() { - { "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() }, - { "Sql_Trigger_BatchSize", batchSize.ToString() } + { "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() }, + { "Sql_Trigger_BatchSize", maxBatchSize.ToString() } // Use old BatchSize config } ); @@ -110,8 +111,50 @@ await this.WaitForProductChanges( () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message"); + this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message"); + } + + /// + /// Verifies that manually setting the max batch size correctly changes the number of changes processed at once + /// + [Fact] + public async Task MaxBatchSizeOverrideTriggerTest() + { + // Use enough items to require 4 batches to be processed but then + // set the max batch size to the same value so they can all be processed in one + // batch. The test will only wait for ~1 batch worth of time so will timeout + // if the max batch size isn't actually changed + const int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize * 4; + const int firstId = 1; + const int lastId = maxBatchSize; + this.SetChangeTrackingForTable("Products"); + var taskCompletionSource = new TaskCompletionSource(); + DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( + taskCompletionSource, + @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", + "MaxBatchSize", + maxBatchSize.ToString()); + this.StartFunctionHost( + nameof(ProductsTriggerWithValidation), + SupportedLanguages.CSharp, + useTestFolder: true, + customOutputHandler: handler, + environmentVariables: new Dictionary() { + { "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() }, + { "Sql_Trigger_MaxBatchSize", maxBatchSize.ToString() } + } + ); + + await this.WaitForProductChanges( + firstId, + lastId, + SqlChangeOperation.Insert, + () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, + id => $"Product {id}", + id => id * 100, + this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message"); } /// @@ -124,13 +167,13 @@ public async Task PollingIntervalOverrideTriggerTest() // Use enough items to require 5 batches to be processed - the test will // only wait for the expected time and timeout if the default polling // interval isn't actually modified. - const int lastId = SqlTableChangeMonitor.DefaultBatchSize * 5; + const int lastId = SqlTableChangeMonitor.DefaultMaxBatchSize * 5; const int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs / 2; this.SetChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, - @"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)", + @"Starting change consumption loop. MaxBatchSize: \d* PollingIntervalMs: (\d*)", "PollingInterval", pollingIntervalMs.ToString()); this.StartFunctionHost( @@ -151,7 +194,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message"); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for PollingInterval configuration message"); } /// diff --git a/test/Integration/test-csharp/ProductsTriggerWithValidation.cs b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs index bb6d22df4..cab9e42c0 100644 --- a/test/Integration/test-csharp/ProductsTriggerWithValidation.cs +++ b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs @@ -13,7 +13,7 @@ public static class ProductsTriggerWithValidation { /// /// Simple trigger function with additional logic to allow for verifying that the expected number - /// of changes was recieved in each batch. + /// of changes was received in each batch. /// [FunctionName(nameof(ProductsTriggerWithValidation))] public static void Run( @@ -21,10 +21,10 @@ public static void Run( IReadOnlyList> changes, ILogger logger) { - string expectedBatchSize = Environment.GetEnvironmentVariable("TEST_EXPECTED_BATCH_SIZE"); - if (!string.IsNullOrEmpty(expectedBatchSize) && int.Parse(expectedBatchSize) != changes.Count) + string expectedMaxBatchSize = Environment.GetEnvironmentVariable("TEST_EXPECTED_MAX_BATCH_SIZE"); + if (!string.IsNullOrEmpty(expectedMaxBatchSize) && int.Parse(expectedMaxBatchSize) != changes.Count) { - throw new Exception($"Invalid batch size, got {changes.Count} changes but expected {expectedBatchSize}"); + throw new Exception($"Invalid max batch size, got {changes.Count} changes but expected {expectedMaxBatchSize}"); } // The output is used to inspect the trigger binding parameter in test methods. logger.LogInformation("SQL Changes: " + JsonConvert.SerializeObject(changes));