Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public SqlTableChangeMonitor(

// Check if there's config settings to override the default batch size/polling interval values
int? configuredBatchSize = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize);
int? configuredPollingInterval = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize);
int? configuredPollingInterval = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_PollingInterval);
this._batchSize = configuredBatchSize ?? this._batchSize;
this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs;
var monitorStartProps = new Dictionary<TelemetryPropertyName, string>(telemetryProps)
Expand Down
31 changes: 31 additions & 0 deletions test/Common/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -177,5 +178,35 @@ public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task,
throw new TimeoutException(message);
}
}

/// <summary>
/// Creates a DataReceievedEventHandler that will wait for the specified regex and then check that
/// the matched group matches the expected value.
/// </summary>
/// <param name="taskCompletionSource">The task completion source to signal when the value is received</param>
/// <param name="regex">The regex. This must have a single group match for the specific value being looked for</param>
/// <param name="valueName">The name of the value to output if the match fails</param>
/// <param name="expectedValue">The value expected to be equal to the matched group from the regex</param>
/// <returns>The event handler</returns>
public static DataReceivedEventHandler CreateOutputReceievedHandler(TaskCompletionSource<bool> taskCompletionSource, string regex, string valueName, string expectedValue)
{
return (object sender, DataReceivedEventArgs e) =>
{
Match match = Regex.Match(e.Data, regex);
if (match.Success)
{
// We found the line so now check that the group matches our expected value
string actualValue = match.Groups[1].Value;
if (actualValue == expectedValue)
{
taskCompletionSource.SetResult(true);
}
else
{
taskCompletionSource.SetException(new Exception($"Expected {valueName} value of {expectedValue} but got value {actualValue}"));
}
}
};
}
}
}
56 changes: 44 additions & 12 deletions test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,30 @@ await this.WaitForProductChanges(
[Fact]
public async Task BatchSizeOverrideTriggerTest()
{
const int batchSize = 20;
// Use enough items to require 4 batches to be processed but then
// set the batch size to the same value so they can all be processed in one
// batch. The test will only wait for ~1 batch worth of time so will timeout
// if the batch size isn't actually changed
const int batchSize = SqlTableChangeMonitor<object>.DefaultBatchSize * 4;
const int firstId = 1;
const int lastId = 40;
const int lastId = batchSize;
this.EnableChangeTrackingForTable("Products");
this.StartFunctionHost(nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, true, environmentVariables: new Dictionary<string, string>() {
{ "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() },
{ "Sql_Trigger_BatchSize", batchSize.ToString() }
});
var taskCompletionSource = new TaskCompletionSource<bool>();
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
taskCompletionSource,
@"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*",
"BatchSize",
batchSize.ToString());
this.StartFunctionHost(
nameof(ProductsTriggerWithValidation),
SupportedLanguages.CSharp,
useTestFolder: true,
customOutputHandler: handler,
environmentVariables: new Dictionary<string, string>() {
{ "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() },
{ "Sql_Trigger_BatchSize", batchSize.ToString() }
}
);

await this.WaitForProductChanges(
firstId,
Expand All @@ -92,6 +108,7 @@ await this.WaitForProductChanges(
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message");
}

/// <summary>
Expand All @@ -100,13 +117,28 @@ await this.WaitForProductChanges(
[Fact]
public async Task PollingIntervalOverrideTriggerTest()
{
const int pollingIntervalMs = 100;
const int firstId = 1;
const int lastId = 50;
// Use enough items to require 5 batches to be processed - the test will
// only wait for the expected time and timeout if the default polling
// interval isn't actually modified.
const int lastId = SqlTableChangeMonitor<object>.DefaultBatchSize * 5;
const int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs / 2;
this.EnableChangeTrackingForTable("Products");
this.StartFunctionHost(nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, true, environmentVariables: new Dictionary<string, string>() {
{ "Sql_Trigger_PollingIntervalMs", pollingIntervalMs.ToString() }
});
var taskCompletionSource = new TaskCompletionSource<bool>();
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
taskCompletionSource,
@"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)",
"PollingInterval",
pollingIntervalMs.ToString());
this.StartFunctionHost(
nameof(ProductsTriggerWithValidation),
SupportedLanguages.CSharp,
useTestFolder: true,
customOutputHandler: handler,
environmentVariables: new Dictionary<string, string>() {
{ "Sql_Trigger_PollingIntervalMs", pollingIntervalMs.ToString() }
}
);

await this.WaitForProductChanges(
firstId,
Expand All @@ -116,9 +148,9 @@ await this.WaitForProductChanges(
id => $"Product {id}",
id => id * 100,
this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs));
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message");
}


/// <summary>
/// Verifies that if several changes have happened to the table row since last invocation, then a single net
/// change for that row is passed to the user function.
Expand Down