From 1c4218b9ad86074aad9a7ca259e44b3a950a32a1 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Tue, 7 Feb 2023 14:05:59 -0800 Subject: [PATCH 1/5] BatchSize -> MaxBatchSize --- docs/BindingsOverview.md | 6 ++--- .../SqlTriggerPerformance_BatchOverride.cs | 8 +++---- .../SqlTriggerPerformance_Overrides.cs | 8 +++---- ...ggerPerformance_PollingIntervalOverride.cs | 2 +- src/Telemetry/Telemetry.cs | 2 +- src/TriggerBinding/SqlTableChangeMonitor.cs | 24 +++++++++---------- src/TriggerBinding/SqlTriggerConstants.cs | 2 +- .../SqlTriggerBindingIntegrationTestBase.cs | 6 ++--- .../SqlTriggerBindingIntegrationTests.cs | 14 +++++------ 9 files changed, 36 insertions(+), 36 deletions(-) diff --git a/docs/BindingsOverview.md b/docs/BindingsOverview.md index de9896cef..3e77c1f9e 100644 --- a/docs/BindingsOverview.md +++ b/docs/BindingsOverview.md @@ -17,7 +17,7 @@ - [az\_func.GlobalState](#az_funcglobalstate) - [az\_func.Leases\_\*](#az_funcleases_) - [Configuration for Trigger Bindings](#configuration-for-trigger-bindings) - - [Sql\_Trigger\_BatchSize](#sql_trigger_batchsize) + - [Sql\_Trigger\_MaxBatchSize](#sql_trigger_maxbatchsize) - [Sql\_Trigger\_PollingIntervalMs](#sql_trigger_pollingintervalms) - [Sql\_Trigger\_MaxChangesPerWorker](#sql_trigger_maxchangesperworker) - [Scaling for Trigger Bindings](#scaling-for-trigger-bindings) @@ -132,9 +132,9 @@ A row is created for every row in the target table that is modified. These are t This section goes over some of the configuration values you can use to customize SQL trigger bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more. -#### Sql_Trigger_BatchSize +#### Sql_Trigger_MaxBatchSize -This controls the number of changes processed at once before being sent to the triggered function. +This controls the maximum number of changes sent to the function during each iteration of the change processing loop. #### Sql_Trigger_PollingIntervalMs diff --git a/performance/SqlTriggerPerformance_BatchOverride.cs b/performance/SqlTriggerPerformance_BatchOverride.cs index 803774d29..3a8f3fa19 100644 --- a/performance/SqlTriggerPerformance_BatchOverride.cs +++ b/performance/SqlTriggerPerformance_BatchOverride.cs @@ -14,7 +14,7 @@ public class SqlTriggerBindingPerformance_BatchOverride : SqlTriggerBindingPerfo { [Params(100, 1000)] - public int BatchSize; + public int MaxBatchSize; [GlobalSetup] public void GlobalSetup() @@ -24,7 +24,7 @@ public void GlobalSetup() nameof(ProductsTrigger), SupportedLanguages.CSharp, environmentVariables: new Dictionary() { - { "Sql_Trigger_BatchSize", this.BatchSize.ToString() } + { "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() } }); } @@ -35,7 +35,7 @@ public void GlobalSetup() [Arguments(5)] public async Task Run(double numBatches) { - int count = (int)(numBatches * this.BatchSize); + int count = (int)(numBatches * this.MaxBatchSize); await this.WaitForProductChanges( 1, count, @@ -43,7 +43,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(1, count); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize)); + this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize)); } } } \ No newline at end of file diff --git a/performance/SqlTriggerPerformance_Overrides.cs b/performance/SqlTriggerPerformance_Overrides.cs index 4728778e8..b4cef58c3 100644 --- a/performance/SqlTriggerPerformance_Overrides.cs +++ b/performance/SqlTriggerPerformance_Overrides.cs @@ -16,7 +16,7 @@ public class SqlTriggerPerformance_Overrides : SqlTriggerBindingPerformanceTestB public int PollingIntervalMs; [Params(500, 2000)] - public int BatchSize; + public int MaxBatchSize; [GlobalSetup] public void GlobalSetup() @@ -26,7 +26,7 @@ public void GlobalSetup() nameof(ProductsTrigger), SupportedLanguages.CSharp, environmentVariables: new Dictionary() { - { "Sql_Trigger_BatchSize", this.BatchSize.ToString() }, + { "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() }, { "Sql_Trigger_PollingIntervalMs", this.PollingIntervalMs.ToString() } }); } @@ -37,7 +37,7 @@ public void GlobalSetup() [Arguments(5)] public async Task Run(double numBatches) { - int count = (int)(numBatches * this.BatchSize); + int count = (int)(numBatches * this.MaxBatchSize); await this.WaitForProductChanges( 1, count, @@ -45,7 +45,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(1, count); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize)); + this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize)); } } } \ No newline at end of file diff --git a/performance/SqlTriggerPerformance_PollingIntervalOverride.cs b/performance/SqlTriggerPerformance_PollingIntervalOverride.cs index 2d0bc0b68..4554d3aa0 100644 --- a/performance/SqlTriggerPerformance_PollingIntervalOverride.cs +++ b/performance/SqlTriggerPerformance_PollingIntervalOverride.cs @@ -30,7 +30,7 @@ public void GlobalSetup() [Benchmark] public async Task Run() { - int count = SqlTableChangeMonitor.DefaultBatchSize * 2; + int count = SqlTableChangeMonitor.DefaultMaxBatchSize * 2; await this.WaitForProductChanges( 1, count, diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index ac69f00d4..31ea0d248 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -350,7 +350,7 @@ public enum TelemetryPropertyName ErrorCode, ErrorName, HasIdentityColumn, - HasConfiguredBatchSize, + HasConfiguredMaxBatchSize, HasConfiguredMaxChangesPerWorker, HasConfiguredPollingInterval, LeasesTableName, diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index b801cbd26..2e5543d18 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -45,7 +45,7 @@ internal sealed class SqlTableChangeMonitor : IDisposable private const int LeaseRenewalIntervalInSeconds = 15; private const int MaxRetryReleaseLeases = 3; - public const int DefaultBatchSize = 100; + public const int DefaultMaxBatchSize = 100; public const int DefaultPollingIntervalMs = 1000; #endregion Constants @@ -60,9 +60,9 @@ internal sealed class SqlTableChangeMonitor : IDisposable private readonly ITriggeredFunctionExecutor _executor; private readonly ILogger _logger; /// - /// Number of changes to process in each iteration of the loop + /// Maximum number of changes to process in each iteration of the loop /// - private readonly int _batchSize = DefaultBatchSize; + private readonly int _maxBatchSize = DefaultMaxBatchSize; /// /// Delay in ms between processing each batch of changes /// @@ -132,12 +132,12 @@ public SqlTableChangeMonitor( this._telemetryProps = telemetryProps ?? new Dictionary(); // Check if there's config settings to override the default batch size/polling interval values - int? configuredBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_BatchSize); + int? configuredBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_MaxBatchSize); int? configuredPollingInterval = configuration.GetValue(ConfigKey_SqlTrigger_PollingInterval); - this._batchSize = configuredBatchSize ?? this._batchSize; - if (this._batchSize <= 0) + this._maxBatchSize = configuredBatchSize ?? this._maxBatchSize; + if (this._maxBatchSize <= 0) { - throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_BatchSize}'. Ensure that the value is a positive integer."); + throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer."); } this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs; if (this._pollingIntervalInMs <= 0) @@ -147,17 +147,17 @@ public SqlTableChangeMonitor( TelemetryInstance.TrackEvent( TelemetryEventName.TriggerMonitorStart, new Dictionary(telemetryProps) { - { TelemetryPropertyName.HasConfiguredBatchSize, (configuredBatchSize != null).ToString() }, + { TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredBatchSize != null).ToString() }, { TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() }, }, new Dictionary() { - { TelemetryMeasureName.BatchSize, this._batchSize }, + { TelemetryMeasureName.BatchSize, this._maxBatchSize }, { TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs } } ); // Prep search-conditions that will be used besides WHERE clause to match table rows. - this._rowMatchConditions = Enumerable.Range(0, this._batchSize) + this._rowMatchConditions = Enumerable.Range(0, this._maxBatchSize) .Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.name.AsBracketQuotedString()} = @{rowIndex}_{colIndex}"))) .ToList(); @@ -253,7 +253,7 @@ public async Task GetUnprocessedChangeCountAsync() /// private async Task RunChangeConsumptionLoopAsync() { - this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); + this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); try { @@ -875,7 +875,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti FROM {GlobalStateTableName} WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - SELECT TOP {this._batchSize} + SELECT TOP {this._maxBatchSize} {selectList}, c.{SysChangeVersionColumnName}, c.SYS_CHANGE_OPERATION, diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 3b9cb47eb..5f6f21c89 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -26,7 +26,7 @@ internal static class SqlTriggerConstants LeasesTableLeaseExpirationTimeColumnName }; - public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize"; + public const string ConfigKey_SqlTrigger_MaxBatchSize = "Sql_Trigger_MaxBatchSize"; public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs"; public const string ConfigKey_SqlTrigger_MaxChangesPerWorker = "Sql_Trigger_MaxChangesPerWorker"; diff --git a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs index c5ff388da..66c1a2c72 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs @@ -184,13 +184,13 @@ void OutputHandler(object sender, DataReceivedEventArgs e) /// /// The first ID in the batch to process /// The last ID in the batch to process - /// The batch size if different than the default batch size + /// The max batch size if different than the default batch size /// The polling interval in ms if different than the default polling interval /// - public int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) + public int GetBatchProcessingTimeout(int firstId, int lastId, int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) { int changesToProcess = lastId - firstId + 1; - int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize // The number of batches to process + int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / maxBatchSize // The number of batches to process / this.FunctionHostList.Count) // The number of function host processes * pollingIntervalMs // The length to process each batch * 2); // Double to add buffer time for processing results & writing log messages diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index a04e2d388..9d8ce15a4 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -82,24 +82,24 @@ public async Task BatchSizeOverrideTriggerTest() // set the batch size to the same value so they can all be processed in one // batch. The test will only wait for ~1 batch worth of time so will timeout // if the batch size isn't actually changed - const int batchSize = SqlTableChangeMonitor.DefaultBatchSize * 4; + const int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize * 4; const int firstId = 1; - const int lastId = batchSize; + const int lastId = maxBatchSize; this.SetChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, @"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*", "BatchSize", - batchSize.ToString()); + maxBatchSize.ToString()); this.StartFunctionHost( nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, useTestFolder: true, customOutputHandler: handler, environmentVariables: new Dictionary() { - { "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() }, - { "Sql_Trigger_BatchSize", batchSize.ToString() } + { "TEST_EXPECTED_BATCH_SIZE", maxBatchSize.ToString() }, + { "Sql_Trigger_MaxBatchSize", maxBatchSize.ToString() } } ); @@ -110,7 +110,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); + this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message"); } @@ -124,7 +124,7 @@ public async Task PollingIntervalOverrideTriggerTest() // Use enough items to require 5 batches to be processed - the test will // only wait for the expected time and timeout if the default polling // interval isn't actually modified. - const int lastId = SqlTableChangeMonitor.DefaultBatchSize * 5; + const int lastId = SqlTableChangeMonitor.DefaultMaxBatchSize * 5; const int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs / 2; this.SetChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); From c62ebd3f8f40b144c59be4027c65b1de8af7dd84 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Tue, 7 Feb 2023 14:21:14 -0800 Subject: [PATCH 2/5] Add backwards compat support --- src/TriggerBinding/SqlTableChangeMonitor.cs | 15 +++-- src/TriggerBinding/SqlTriggerConstants.cs | 4 ++ .../SqlTriggerBindingIntegrationTestBase.cs | 4 +- .../SqlTriggerBindingIntegrationTests.cs | 55 +++++++++++++++++-- .../ProductsTriggerWithValidation.cs | 8 +-- 5 files changed, 69 insertions(+), 17 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 2e5543d18..ce9252516 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -131,10 +131,15 @@ public SqlTableChangeMonitor( this._userTableId = userTableId; this._telemetryProps = telemetryProps ?? new Dictionary(); - // Check if there's config settings to override the default batch size/polling interval values - int? configuredBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_MaxBatchSize); + // Check if there's config settings to override the default max batch size/polling interval values + int? configuredMaxBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_MaxBatchSize); + // Fall back to original value for backwards compat if the new value isn't specified + if (configuredMaxBatchSize == null) + { + configuredMaxBatchSize = configuration.GetValue(ConfigKey_SqlTrigger_BatchSize); + } int? configuredPollingInterval = configuration.GetValue(ConfigKey_SqlTrigger_PollingInterval); - this._maxBatchSize = configuredBatchSize ?? this._maxBatchSize; + this._maxBatchSize = configuredMaxBatchSize ?? this._maxBatchSize; if (this._maxBatchSize <= 0) { throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer."); @@ -147,7 +152,7 @@ public SqlTableChangeMonitor( TelemetryInstance.TrackEvent( TelemetryEventName.TriggerMonitorStart, new Dictionary(telemetryProps) { - { TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredBatchSize != null).ToString() }, + { TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredMaxBatchSize != null).ToString() }, { TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() }, }, new Dictionary() { @@ -253,7 +258,7 @@ public async Task GetUnprocessedChangeCountAsync() /// private async Task RunChangeConsumptionLoopAsync() { - this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); + this._logger.LogInformationWithThreadId($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); try { diff --git a/src/TriggerBinding/SqlTriggerConstants.cs b/src/TriggerBinding/SqlTriggerConstants.cs index 5f6f21c89..276381b4a 100644 --- a/src/TriggerBinding/SqlTriggerConstants.cs +++ b/src/TriggerBinding/SqlTriggerConstants.cs @@ -26,6 +26,10 @@ internal static class SqlTriggerConstants LeasesTableLeaseExpirationTimeColumnName }; + /// + /// Deprecated config value for MaxBatchSize, kept for backwards compat reasons + /// + public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize"; public const string ConfigKey_SqlTrigger_MaxBatchSize = "Sql_Trigger_MaxBatchSize"; public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs"; public const string ConfigKey_SqlTrigger_MaxChangesPerWorker = "Sql_Trigger_MaxChangesPerWorker"; diff --git a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs index 66c1a2c72..e3cecfcfd 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTestBase.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTestBase.cs @@ -180,11 +180,11 @@ void OutputHandler(object sender, DataReceivedEventArgs e) /// /// Gets a timeout value to use when processing the given number of changes, based on the - /// default batch size and polling interval. + /// default max batch size and polling interval. /// /// The first ID in the batch to process /// The last ID in the batch to process - /// The max batch size if different than the default batch size + /// The max batch size if different than the default max batch size /// The polling interval in ms if different than the default polling interval /// public int GetBatchProcessingTimeout(int firstId, int lastId, int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 9d8ce15a4..338949178 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -73,15 +73,16 @@ await this.WaitForProductChanges( } /// - /// Verifies that manually setting the batch size correctly changes the number of changes processed at once + /// Verifies that manually setting the batch size using the original config var correctly changes the + /// number of changes processed at once. /// [Fact] public async Task BatchSizeOverrideTriggerTest() { // Use enough items to require 4 batches to be processed but then - // set the batch size to the same value so they can all be processed in one + // set the max batch size to the same value so they can all be processed in one // batch. The test will only wait for ~1 batch worth of time so will timeout - // if the batch size isn't actually changed + // if the max batch size isn't actually changed const int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize * 4; const int firstId = 1; const int lastId = maxBatchSize; @@ -89,7 +90,7 @@ public async Task BatchSizeOverrideTriggerTest() var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, - @"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*", + @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", "BatchSize", maxBatchSize.ToString()); this.StartFunctionHost( @@ -98,7 +99,49 @@ public async Task BatchSizeOverrideTriggerTest() useTestFolder: true, customOutputHandler: handler, environmentVariables: new Dictionary() { - { "TEST_EXPECTED_BATCH_SIZE", maxBatchSize.ToString() }, + { "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() }, + { "Sql_Trigger_BatchSize", maxBatchSize.ToString() } // Use old BatchSize config + } + ); + + await this.WaitForProductChanges( + firstId, + lastId, + SqlChangeOperation.Insert, + () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, + id => $"Product {id}", + id => id * 100, + this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for MaxBatchSize configuration message"); + } + + /// + /// Verifies that manually setting the max batch size correctly changes the number of changes processed at once + /// + [Fact] + public async Task MaxBatchSizeOverrideTriggerTest() + { + // Use enough items to require 4 batches to be processed but then + // set the max batch size to the same value so they can all be processed in one + // batch. The test will only wait for ~1 batch worth of time so will timeout + // if the max batch size isn't actually changed + const int maxBatchSize = SqlTableChangeMonitor.DefaultMaxBatchSize * 4; + const int firstId = 1; + const int lastId = maxBatchSize; + this.SetChangeTrackingForTable("Products"); + var taskCompletionSource = new TaskCompletionSource(); + DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( + taskCompletionSource, + @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", + "BatchSize", + maxBatchSize.ToString()); + this.StartFunctionHost( + nameof(ProductsTriggerWithValidation), + SupportedLanguages.CSharp, + useTestFolder: true, + customOutputHandler: handler, + environmentVariables: new Dictionary() { + { "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() }, { "Sql_Trigger_MaxBatchSize", maxBatchSize.ToString() } } ); @@ -111,7 +154,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message"); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for MaxBatchSize configuration message"); } /// diff --git a/test/Integration/test-csharp/ProductsTriggerWithValidation.cs b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs index bb6d22df4..cab9e42c0 100644 --- a/test/Integration/test-csharp/ProductsTriggerWithValidation.cs +++ b/test/Integration/test-csharp/ProductsTriggerWithValidation.cs @@ -13,7 +13,7 @@ public static class ProductsTriggerWithValidation { /// /// Simple trigger function with additional logic to allow for verifying that the expected number - /// of changes was recieved in each batch. + /// of changes was received in each batch. /// [FunctionName(nameof(ProductsTriggerWithValidation))] public static void Run( @@ -21,10 +21,10 @@ public static void Run( IReadOnlyList> changes, ILogger logger) { - string expectedBatchSize = Environment.GetEnvironmentVariable("TEST_EXPECTED_BATCH_SIZE"); - if (!string.IsNullOrEmpty(expectedBatchSize) && int.Parse(expectedBatchSize) != changes.Count) + string expectedMaxBatchSize = Environment.GetEnvironmentVariable("TEST_EXPECTED_MAX_BATCH_SIZE"); + if (!string.IsNullOrEmpty(expectedMaxBatchSize) && int.Parse(expectedMaxBatchSize) != changes.Count) { - throw new Exception($"Invalid batch size, got {changes.Count} changes but expected {expectedBatchSize}"); + throw new Exception($"Invalid max batch size, got {changes.Count} changes but expected {expectedMaxBatchSize}"); } // The output is used to inspect the trigger binding parameter in test methods. logger.LogInformation("SQL Changes: " + JsonConvert.SerializeObject(changes)); From c6989da5d94f302d9764a2a66e4f721457b1fce8 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Tue, 7 Feb 2023 21:09:13 -0800 Subject: [PATCH 3/5] updates --- src/Telemetry/Telemetry.cs | 1 + src/TriggerBinding/SqlTableChangeMonitor.cs | 2 +- test/Integration/SqlTriggerBindingIntegrationTests.cs | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index 31ea0d248..b3124fadd 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -383,6 +383,7 @@ public enum TelemetryMeasureName GetPrimaryKeysDurationMs, GetUnprocessedChangesDurationMs, InsertGlobalStateTableRowDurationMs, + MaxBatchSize, MaxChangesPerWorker, NumRows, PollingIntervalMs, diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index ce9252516..fe6dccf8e 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -156,7 +156,7 @@ public SqlTableChangeMonitor( { TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() }, }, new Dictionary() { - { TelemetryMeasureName.BatchSize, this._maxBatchSize }, + { TelemetryMeasureName.MaxBatchSize, this._maxBatchSize }, { TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs } } ); diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 338949178..ce935bce4 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -91,7 +91,7 @@ public async Task BatchSizeOverrideTriggerTest() DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", - "BatchSize", + "MaxBatchSize", maxBatchSize.ToString()); this.StartFunctionHost( nameof(ProductsTriggerWithValidation), @@ -133,7 +133,7 @@ public async Task MaxBatchSizeOverrideTriggerTest() DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, @"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*", - "BatchSize", + "MaxBatchSize", maxBatchSize.ToString()); this.StartFunctionHost( nameof(ProductsTriggerWithValidation), From cce5f708bece78b974f7d3f290e76fcd22e9a583 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 10 Feb 2023 13:06:28 -0800 Subject: [PATCH 4/5] fix test --- test/Integration/SqlTriggerBindingIntegrationTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index ce935bce4..4e268c51d 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -154,7 +154,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for MaxBatchSize configuration message"); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message"); } /// @@ -173,7 +173,7 @@ public async Task PollingIntervalOverrideTriggerTest() var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, - @"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)", + @"Starting change consumption loop. MaxBatchSize: \d* PollingIntervalMs: (\d*)", "PollingInterval", pollingIntervalMs.ToString()); this.StartFunctionHost( @@ -194,7 +194,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message"); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for PollingInterval configuration message"); } /// From ee2d1c069167ba9cb6cb66ba7e692736cbd4e27a Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 10 Feb 2023 13:07:55 -0800 Subject: [PATCH 5/5] Fix one more --- test/Integration/SqlTriggerBindingIntegrationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 4e268c51d..06cf487c9 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -112,7 +112,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for MaxBatchSize configuration message"); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message"); } ///