Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/BindingsOverview.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
- [az\_func.GlobalState](#az_funcglobalstate)
- [az\_func.Leases\_\*](#az_funcleases_)
- [Configuration for Trigger Bindings](#configuration-for-trigger-bindings)
- [Sql\_Trigger\_BatchSize](#sql_trigger_batchsize)
- [Sql\_Trigger\_MaxBatchSize](#sql_trigger_maxbatchsize)
- [Sql\_Trigger\_PollingIntervalMs](#sql_trigger_pollingintervalms)
- [Sql\_Trigger\_MaxChangesPerWorker](#sql_trigger_maxchangesperworker)
- [Scaling for Trigger Bindings](#scaling-for-trigger-bindings)
Expand Down Expand Up @@ -153,9 +153,9 @@ A row is created for every row in the target table that is modified. These are t

This section goes over some of the configuration values you can use to customize SQL trigger bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more.

#### Sql_Trigger_BatchSize
#### Sql_Trigger_MaxBatchSize

This controls the number of changes processed at once before being sent to the triggered function.
This controls the maximum number of changes sent to the function during each iteration of the change processing loop.

#### Sql_Trigger_PollingIntervalMs

Expand Down
8 changes: 4 additions & 4 deletions performance/SqlTriggerPerformance_BatchOverride.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class SqlTriggerBindingPerformance_BatchOverride : SqlTriggerBindingPerfo
{

[Params(100, 1000)]
public int BatchSize;
public int MaxBatchSize;

[GlobalSetup]
public void GlobalSetup()
Expand All @@ -24,7 +24,7 @@ public void GlobalSetup()
nameof(ProductsTrigger),
SupportedLanguages.CSharp,
environmentVariables: new Dictionary<string, string>() {
{ "Sql_Trigger_BatchSize", this.BatchSize.ToString() }
{ "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() }
});
}

Expand All @@ -35,15 +35,15 @@ public void GlobalSetup()
[Arguments(5)]
public async Task Run(double numBatches)
{
int count = (int)(numBatches * this.BatchSize);
int count = (int)(numBatches * this.MaxBatchSize);
await this.WaitForProductChanges(
1,
count,
SqlChangeOperation.Insert,
() => { this.InsertProducts(1, count); return Task.CompletedTask; },
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize));
this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize));
}
}
}
8 changes: 4 additions & 4 deletions performance/SqlTriggerPerformance_Overrides.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class SqlTriggerPerformance_Overrides : SqlTriggerBindingPerformanceTestB
public int PollingIntervalMs;

[Params(500, 2000)]
public int BatchSize;
public int MaxBatchSize;

[GlobalSetup]
public void GlobalSetup()
Expand All @@ -26,7 +26,7 @@ public void GlobalSetup()
nameof(ProductsTrigger),
SupportedLanguages.CSharp,
environmentVariables: new Dictionary<string, string>() {
{ "Sql_Trigger_BatchSize", this.BatchSize.ToString() },
{ "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() },
{ "Sql_Trigger_PollingIntervalMs", this.PollingIntervalMs.ToString() }
});
}
Expand All @@ -37,15 +37,15 @@ public void GlobalSetup()
[Arguments(5)]
public async Task Run(double numBatches)
{
int count = (int)(numBatches * this.BatchSize);
int count = (int)(numBatches * this.MaxBatchSize);
await this.WaitForProductChanges(
1,
count,
SqlChangeOperation.Insert,
() => { this.InsertProducts(1, count); return Task.CompletedTask; },
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize));
this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void GlobalSetup()
[Benchmark]
public async Task Run()
{
int count = SqlTableChangeMonitor<object>.DefaultBatchSize * 2;
int count = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 2;
await this.WaitForProductChanges(
1,
count,
Expand Down
3 changes: 2 additions & 1 deletion src/Telemetry/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public enum TelemetryPropertyName
ErrorCode,
ErrorName,
HasIdentityColumn,
HasConfiguredBatchSize,
HasConfiguredMaxBatchSize,
HasConfiguredMaxChangesPerWorker,
HasConfiguredPollingInterval,
LeasesTableName,
Expand Down Expand Up @@ -383,6 +383,7 @@ public enum TelemetryMeasureName
GetPrimaryKeysDurationMs,
GetUnprocessedChangesDurationMs,
InsertGlobalStateTableRowDurationMs,
MaxBatchSize,
MaxChangesPerWorker,
NumRows,
PollingIntervalMs,
Expand Down
31 changes: 18 additions & 13 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
private const int LeaseRenewalIntervalInSeconds = 15;
private const int MaxRetryReleaseLeases = 3;

public const int DefaultBatchSize = 100;
public const int DefaultMaxBatchSize = 100;
public const int DefaultPollingIntervalMs = 1000;
#endregion Constants

Expand All @@ -60,9 +60,9 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
private readonly ITriggeredFunctionExecutor _executor;
private readonly ILogger _logger;
/// <summary>
/// Number of changes to process in each iteration of the loop
/// Maximum number of changes to process in each iteration of the loop
/// </summary>
private readonly int _batchSize = DefaultBatchSize;
private readonly int _maxBatchSize = DefaultMaxBatchSize;
/// <summary>
/// Delay in ms between processing each batch of changes
/// </summary>
Expand Down Expand Up @@ -131,13 +131,18 @@ public SqlTableChangeMonitor(
this._userTableId = userTableId;
this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();

// Check if there's config settings to override the default batch size/polling interval values
int? configuredBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_BatchSize);
// Check if there's config settings to override the default max batch size/polling interval values
int? configuredMaxBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_MaxBatchSize);
// Fall back to original value for backwards compat if the new value isn't specified
if (configuredMaxBatchSize == null)
{
configuredMaxBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_BatchSize);
}
int? configuredPollingInterval = configuration.GetValue<int?>(ConfigKey_SqlTrigger_PollingInterval);
this._batchSize = configuredBatchSize ?? this._batchSize;
if (this._batchSize <= 0)
this._maxBatchSize = configuredMaxBatchSize ?? this._maxBatchSize;
if (this._maxBatchSize <= 0)
{
throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_BatchSize}'. Ensure that the value is a positive integer.");
throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer.");
}
this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs;
if (this._pollingIntervalInMs <= 0)
Expand All @@ -147,17 +152,17 @@ public SqlTableChangeMonitor(
TelemetryInstance.TrackEvent(
TelemetryEventName.TriggerMonitorStart,
new Dictionary<TelemetryPropertyName, string>(telemetryProps) {
{ TelemetryPropertyName.HasConfiguredBatchSize, (configuredBatchSize != null).ToString() },
{ TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredMaxBatchSize != null).ToString() },
{ TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() },
},
new Dictionary<TelemetryMeasureName, double>() {
{ TelemetryMeasureName.BatchSize, this._batchSize },
{ TelemetryMeasureName.MaxBatchSize, this._maxBatchSize },
{ TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs }
}
);

// Prep search-conditions that will be used besides WHERE clause to match table rows.
this._rowMatchConditions = Enumerable.Range(0, this._batchSize)
this._rowMatchConditions = Enumerable.Range(0, this._maxBatchSize)
.Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.name.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
.ToList();

Expand Down Expand Up @@ -253,7 +258,7 @@ public async Task<long> GetUnprocessedChangeCountAsync()
/// </summary>
private async Task RunChangeConsumptionLoopAsync()
{
this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
this._logger.LogInformationWithThreadId($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}");

try
{
Expand Down Expand Up @@ -875,7 +880,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti
FROM {GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

SELECT TOP {this._batchSize}
SELECT TOP {this._maxBatchSize}
{selectList},
c.{SysChangeVersionColumnName},
c.SYS_CHANGE_OPERATION,
Expand Down
4 changes: 4 additions & 0 deletions src/TriggerBinding/SqlTriggerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ internal static class SqlTriggerConstants
LeasesTableLeaseExpirationTimeColumnName
};

/// <summary>
/// Deprecated config value for MaxBatchSize, kept for backwards compat reasons
/// </summary>
public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize";
public const string ConfigKey_SqlTrigger_MaxBatchSize = "Sql_Trigger_MaxBatchSize";
public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs";
public const string ConfigKey_SqlTrigger_MaxChangesPerWorker = "Sql_Trigger_MaxChangesPerWorker";

Expand Down
8 changes: 4 additions & 4 deletions test/Integration/SqlTriggerBindingIntegrationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,17 @@ void OutputHandler(object sender, DataReceivedEventArgs e)

/// <summary>
/// Gets a timeout value to use when processing the given number of changes, based on the
/// default batch size and polling interval.
/// default max batch size and polling interval.
/// </summary>
/// <param name="firstId">The first ID in the batch to process</param>
/// <param name="lastId">The last ID in the batch to process</param>
/// <param name="batchSize">The batch size if different than the default batch size</param>
/// <param name="maxBatchSize">The max batch size if different than the default max batch size</param>
/// <param name="pollingIntervalMs">The polling interval in ms if different than the default polling interval</param>
/// <returns></returns>
public int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor<object>.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs)
public int GetBatchProcessingTimeout(int firstId, int lastId, int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize, int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs)
{
int changesToProcess = lastId - firstId + 1;
int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize // The number of batches to process
int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / maxBatchSize // The number of batches to process
/ this.FunctionHostList.Count) // The number of function host processes
* pollingIntervalMs // The length to process each batch
* 2); // Double to add buffer time for processing results & writing log messages
Expand Down
73 changes: 58 additions & 15 deletions test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,34 @@ await this.WaitForProductChanges(
}

/// <summary>
/// Verifies that manually setting the batch size correctly changes the number of changes processed at once
/// Verifies that manually setting the batch size using the original config var correctly changes the
/// number of changes processed at once.
/// </summary>
[Fact]
public async Task BatchSizeOverrideTriggerTest()
{
// Use enough items to require 4 batches to be processed but then
// set the batch size to the same value so they can all be processed in one
// set the max batch size to the same value so they can all be processed in one
// batch. The test will only wait for ~1 batch worth of time so will timeout
// if the batch size isn't actually changed
const int batchSize = SqlTableChangeMonitor<object>.DefaultBatchSize * 4;
// if the max batch size isn't actually changed
const int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 4;
const int firstId = 1;
const int lastId = batchSize;
const int lastId = maxBatchSize;
this.SetChangeTrackingForTable("Products");
var taskCompletionSource = new TaskCompletionSource<bool>();
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
taskCompletionSource,
@"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*",
"BatchSize",
batchSize.ToString());
@"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*",
"MaxBatchSize",
maxBatchSize.ToString());
this.StartFunctionHost(
nameof(ProductsTriggerWithValidation),
SupportedLanguages.CSharp,
useTestFolder: true,
customOutputHandler: handler,
environmentVariables: new Dictionary<string, string>() {
{ "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() },
{ "Sql_Trigger_BatchSize", batchSize.ToString() }
{ "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() },
{ "Sql_Trigger_BatchSize", maxBatchSize.ToString() } // Use old BatchSize config
}
);

Expand All @@ -110,8 +111,50 @@ await this.WaitForProductChanges(
() => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; },
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message");
this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message");
}

/// <summary>
/// Verifies that manually setting the max batch size correctly changes the number of changes processed at once
/// </summary>
[Fact]
public async Task MaxBatchSizeOverrideTriggerTest()
{
// Use enough items to require 4 batches to be processed but then
// set the max batch size to the same value so they can all be processed in one
// batch. The test will only wait for ~1 batch worth of time so will timeout
// if the max batch size isn't actually changed
const int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 4;
const int firstId = 1;
const int lastId = maxBatchSize;
this.SetChangeTrackingForTable("Products");
var taskCompletionSource = new TaskCompletionSource<bool>();
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
taskCompletionSource,
@"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*",
"MaxBatchSize",
maxBatchSize.ToString());
this.StartFunctionHost(
nameof(ProductsTriggerWithValidation),
SupportedLanguages.CSharp,
useTestFolder: true,
customOutputHandler: handler,
environmentVariables: new Dictionary<string, string>() {
{ "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() },
{ "Sql_Trigger_MaxBatchSize", maxBatchSize.ToString() }
}
);

await this.WaitForProductChanges(
firstId,
lastId,
SqlChangeOperation.Insert,
() => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; },
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message");
}

/// <summary>
Expand All @@ -124,13 +167,13 @@ public async Task PollingIntervalOverrideTriggerTest()
// Use enough items to require 5 batches to be processed - the test will
// only wait for the expected time and timeout if the default polling
// interval isn't actually modified.
const int lastId = SqlTableChangeMonitor<object>.DefaultBatchSize * 5;
const int lastId = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 5;
const int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs / 2;
this.SetChangeTrackingForTable("Products");
var taskCompletionSource = new TaskCompletionSource<bool>();
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
taskCompletionSource,
@"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)",
@"Starting change consumption loop. MaxBatchSize: \d* PollingIntervalMs: (\d*)",
"PollingInterval",
pollingIntervalMs.ToString());
this.StartFunctionHost(
Expand All @@ -151,7 +194,7 @@ await this.WaitForProductChanges(
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message");
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for PollingInterval configuration message");
}

/// <summary>
Expand Down
Loading