From 7150d2b6e1c67e5c20ff2ba34f932d8e4227e4be Mon Sep 17 00:00:00 2001
From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com>
Date: Sat, 10 Sep 2022 21:50:24 +0530
Subject: [PATCH 1/6] Add test for multiple function hosts
---
test/Integration/IntegrationTestBase.cs | 86 +++++++++++++------
.../SqlTriggerBindingIntegrationTests.cs | 79 +++++++++++++----
2 files changed, 119 insertions(+), 46 deletions(-)
diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs
index 444feedf1..6d5badaf8 100644
--- a/test/Integration/IntegrationTestBase.cs
+++ b/test/Integration/IntegrationTestBase.cs
@@ -1,29 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
-using Microsoft.Data.SqlClient;
-using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common;
using System;
+using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.IO;
+using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
+using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common;
+using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common;
+using Microsoft.Data.SqlClient;
using Xunit;
using Xunit.Abstractions;
-using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common;
namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration
{
public class IntegrationTestBase : IDisposable
{
///
- /// Host process for Azure Function CLI
+ /// Host process for Azure Function CLI. Useful when only one host process is involved.
///
- protected Process FunctionHost { get; private set; }
+ protected Process FunctionHost => this.FunctionHostList.FirstOrDefault();
+
+ ///
+ /// Host processes for Azure Function CLI.
+ ///
+ protected List FunctionHostList { get; } = new List();
///
/// Host process for Azurite local storage emulator. This is required for non-HTTP trigger functions:
@@ -164,44 +171,58 @@ protected void StartAzurite()
/// - The functionName is different than its route.
/// - You can start multiple functions by passing in a space-separated list of function names.
///
- protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null)
+ protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler[] customOutputHandlers = null)
{
string workingDirectory = useTestFolder ? GetPathToBin() : Path.Combine(GetPathToBin(), "SqlExtensionSamples", Enum.GetName(typeof(SupportedLanguages), language));
if (!Directory.Exists(workingDirectory))
{
throw new FileNotFoundException("Working directory not found at " + workingDirectory);
}
+
+ // Use a different port for each new host process, starting with the default port number: 7071.
+ int port = this.Port + this.FunctionHostList.Count;
+
var startInfo = new ProcessStartInfo
{
// The full path to the Functions CLI is required in the ProcessStartInfo because UseShellExecute is set to false.
// We cannot both use shell execute and redirect output at the same time: https://docs.microsoft.com//dotnet/api/system.diagnostics.processstartinfo.redirectstandardoutput#remarks
FileName = GetFunctionsCoreToolsPath(),
- Arguments = $"start --verbose --port {this.Port} --functions {functionName}",
+ Arguments = $"start --verbose --port {port} --functions {functionName}",
WorkingDirectory = workingDirectory,
WindowStyle = ProcessWindowStyle.Hidden,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false
};
+
this.TestOutput.WriteLine($"Starting {startInfo.FileName} {startInfo.Arguments} in {startInfo.WorkingDirectory}");
- this.FunctionHost = new Process
+
+ var functionHost = new Process
{
StartInfo = startInfo
};
+ this.FunctionHostList.Add(functionHost);
+
// Register all handlers before starting the functions host process.
var taskCompletionSource = new TaskCompletionSource();
- this.FunctionHost.OutputDataReceived += this.TestOutputHandler;
- this.FunctionHost.OutputDataReceived += SignalStartupHandler;
- this.FunctionHost.OutputDataReceived += customOutputHandler;
+ functionHost.OutputDataReceived += SignalStartupHandler;
- this.FunctionHost.ErrorDataReceived += this.TestOutputHandler;
+ if (customOutputHandlers != null)
+ {
+ foreach (DataReceivedEventHandler handler in customOutputHandlers)
+ {
+ functionHost.OutputDataReceived += handler;
+ }
+ }
- this.FunctionHost.Start();
- this.FunctionHost.BeginOutputReadLine();
- this.FunctionHost.BeginErrorReadLine();
+ functionHost.Start();
+ functionHost.OutputDataReceived += this.GetTestOutputHandler(functionHost.Id);
+ functionHost.ErrorDataReceived += this.GetTestOutputHandler(functionHost.Id);
+ functionHost.BeginOutputReadLine();
+ functionHost.BeginErrorReadLine();
- this.TestOutput.WriteLine($"Waiting for Azure Function host to start...");
+ this.TestOutput.WriteLine("Waiting for Azure Function host to start...");
const int FunctionHostStartupTimeoutInSeconds = 60;
bool isCompleted = taskCompletionSource.Task.Wait(TimeSpan.FromSeconds(FunctionHostStartupTimeoutInSeconds));
@@ -212,8 +233,8 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag
const int BufferTimeInSeconds = 5;
Task.Delay(TimeSpan.FromSeconds(BufferTimeInSeconds)).Wait();
- this.TestOutput.WriteLine($"Azure Function host started!");
- this.FunctionHost.OutputDataReceived -= SignalStartupHandler;
+ this.TestOutput.WriteLine("Azure Function host started!");
+ functionHost.OutputDataReceived -= SignalStartupHandler;
void SignalStartupHandler(object sender, DataReceivedEventArgs e)
{
@@ -259,11 +280,17 @@ private static string GetFunctionsCoreToolsPath()
return funcPath;
}
- private void TestOutputHandler(object sender, DataReceivedEventArgs e)
+
+ private DataReceivedEventHandler GetTestOutputHandler(int processId)
{
- if (e != null && !string.IsNullOrEmpty(e.Data))
+ return TestOutputHandler;
+
+ void TestOutputHandler(object sender, DataReceivedEventArgs e)
{
- this.TestOutput.WriteLine(e.Data);
+ if (e != null && !string.IsNullOrEmpty(e.Data))
+ {
+ this.TestOutput.WriteLine($"[{processId}] {e.Data}");
+ }
}
}
@@ -343,14 +370,17 @@ public void Dispose()
this.TestOutput.WriteLine($"Failed to close connection. Error: {e1.Message}");
}
- try
- {
- this.FunctionHost?.Kill();
- this.FunctionHost?.Dispose();
- }
- catch (Exception e2)
+ foreach (Process functionHost in this.FunctionHostList)
{
- this.TestOutput.WriteLine($"Failed to stop function host, Error: {e2.Message}");
+ try
+ {
+ functionHost?.Kill();
+ functionHost?.Dispose();
+ }
+ catch (Exception e2)
+ {
+ this.TestOutput.WriteLine($"Failed to stop function host, Error: {e2.Message}");
+ }
}
try
diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs
index c92ab7a83..dd7c6e924 100644
--- a/test/Integration/SqlTriggerBindingIntegrationTests.cs
+++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs
@@ -30,10 +30,10 @@ public SqlTriggerBindingIntegrationTests(ITestOutputHelper output) : base(output
public async Task SingleOperationTriggerTest()
{
this.EnableChangeTrackingForTable("Products");
- this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp);
var changes = new List>();
- this.MonitorProductChanges(changes);
+ DataReceivedEventHandler[] changeHandlers = new[] { this.GetProductChangeHandler(changes, "SQL Changes: ") };
+ this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
// Considering the polling interval of 5 seconds and batch-size of 10, it should take around 15 seconds to
// process 30 insert operations. Similar reasoning is used to set delays for update and delete operations.
@@ -65,10 +65,10 @@ public async Task SingleOperationTriggerTest()
public async Task MultiOperationTriggerTest()
{
this.EnableChangeTrackingForTable("Products");
- this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp);
var changes = new List>();
- this.MonitorProductChanges(changes);
+ DataReceivedEventHandler[] changeHandlers = new[] { this.GetProductChangeHandler(changes, "SQL Changes: ") };
+ this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
// Insert + multiple updates to a row are treated as single insert with latest row values.
this.InsertProducts(1, 5);
@@ -97,13 +97,51 @@ public async Task MultiOperationTriggerTest()
changes.Clear();
}
+ ///
+ /// Ensures correct functionality with user functions running across multiple functions host processes.
+ ///
+ [Fact]
+ public async Task MultiHostTriggerTest()
+ {
+ this.EnableChangeTrackingForTable("Products");
+
+ var changes = new List>();
+ DataReceivedEventHandler[] changeHandlers = new[] { this.GetProductChangeHandler(changes, "SQL Changes: ") };
+
+ // Prepare three function host processes
+ this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
+ this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
+ this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
+
+ // Considering the polling interval of 5 seconds and batch-size of 10, it should take around 15 seconds to
+ // process 90 insert operations across all functions host processes. Similar reasoning is used to set delays
+ // for update and delete operations.
+ this.InsertProducts(1, 90);
+ await Task.Delay(TimeSpan.FromSeconds(20));
+ ValidateProductChanges(changes, 1, 90, SqlChangeOperation.Insert, id => $"Product {id}", id => id * 100);
+ changes.Clear();
+
+ // All table columns (not just the columns that were updated) would be returned for update operation.
+ this.UpdateProducts(1, 60);
+ await Task.Delay(TimeSpan.FromSeconds(15));
+ ValidateProductChanges(changes, 1, 60, SqlChangeOperation.Update, id => $"Updated Product {id}", id => id * 100);
+ changes.Clear();
+
+ // The properties corresponding to non-primary key columns would be set to the C# type's default values
+ // (null and 0) for delete operation.
+ this.DeleteProducts(31, 90);
+ await Task.Delay(TimeSpan.FromSeconds(15));
+ ValidateProductChanges(changes, 31, 90, SqlChangeOperation.Delete, _ => null, _ => 0);
+ changes.Clear();
+ }
+
///
/// Tests the error message when the user table is not present in the database.
///
[Fact]
public void TableNotPresentTriggerTest()
{
- this.StartFunctionsHostAndWaitForError(
+ this.StartFunctionHostAndWaitForError(
nameof(TableNotPresentTrigger),
true,
"Could not find table: 'dbo.TableNotPresent'.");
@@ -115,7 +153,7 @@ public void TableNotPresentTriggerTest()
[Fact]
public void PrimaryKeyNotCreatedTriggerTest()
{
- this.StartFunctionsHostAndWaitForError(
+ this.StartFunctionHostAndWaitForError(
nameof(PrimaryKeyNotPresentTrigger),
true,
"Could not find primary key created in table: 'dbo.ProductsWithoutPrimaryKey'.");
@@ -128,7 +166,7 @@ public void PrimaryKeyNotCreatedTriggerTest()
[Fact]
public void ReservedPrimaryKeyColumnNamesTriggerTest()
{
- this.StartFunctionsHostAndWaitForError(
+ this.StartFunctionHostAndWaitForError(
nameof(ReservedPrimaryKeyColumnNamesTrigger),
true,
"Found reserved column name(s): '_az_func_ChangeVersion', '_az_func_AttemptCount', '_az_func_LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." +
@@ -141,7 +179,7 @@ public void ReservedPrimaryKeyColumnNamesTriggerTest()
[Fact]
public void UnsupportedColumnTypesTriggerTest()
{
- this.StartFunctionsHostAndWaitForError(
+ this.StartFunctionHostAndWaitForError(
nameof(UnsupportedColumnTypesTrigger),
true,
"Found column(s) with unsupported type(s): 'Location' (type: geography), 'Geometry' (type: geometry), 'Organization' (type: hierarchyid)" +
@@ -154,7 +192,7 @@ public void UnsupportedColumnTypesTriggerTest()
[Fact]
public void ChangeTrackingNotEnabledTriggerTest()
{
- this.StartFunctionsHostAndWaitForError(
+ this.StartFunctionHostAndWaitForError(
nameof(ProductsTrigger),
false,
"Could not find change tracking enabled for table: 'dbo.Products'.");
@@ -177,17 +215,22 @@ ALTER TABLE [dbo].[{tableName}]
");
}
- private void MonitorProductChanges(List> changes)
+ private DataReceivedEventHandler GetProductChangeHandler(List> changes, string messagePrefix)
{
- int index = 0;
- string prefix = "SQL Changes: ";
+ return ProductChangeHandler;
- this.FunctionHost.OutputDataReceived += (sender, e) =>
+ void ProductChangeHandler(object sender, DataReceivedEventArgs e)
{
- if (e.Data != null && (index = e.Data.IndexOf(prefix, StringComparison.Ordinal)) >= 0)
+ int index = 0;
+
+ if (e.Data != null && (index = e.Data.IndexOf(messagePrefix, StringComparison.Ordinal)) >= 0)
{
- string json = e.Data[(index + prefix.Length)..];
- changes.AddRange(JsonConvert.DeserializeObject>>(json));
+ string json = e.Data[(index + messagePrefix.Length)..];
+
+ lock (changes)
+ {
+ changes.AddRange(JsonConvert.DeserializeObject>>(json));
+ }
}
};
}
@@ -248,7 +291,7 @@ private static void ValidateProductChanges(List> changes, int
/// Name of the user function that should cause error in trigger listener
/// Whether the functions host should be launched from test folder
/// Expected error message string
- private void StartFunctionsHostAndWaitForError(string functionName, bool useTestFolder, string expectedErrorMessage)
+ private void StartFunctionHostAndWaitForError(string functionName, bool useTestFolder, string expectedErrorMessage)
{
string errorMessage = null;
var tcs = new TaskCompletionSource();
@@ -268,7 +311,7 @@ void OutputHandler(object sender, DataReceivedEventArgs e)
};
// All trigger integration tests are only using C# functions for testing at the moment.
- this.StartFunctionHost(functionName, Common.SupportedLanguages.CSharp, useTestFolder, OutputHandler);
+ this.StartFunctionHost(functionName, Common.SupportedLanguages.CSharp, useTestFolder, new DataReceivedEventHandler[] { OutputHandler });
this.FunctionHost.OutputDataReceived -= OutputHandler;
this.FunctionHost.Kill();
From 8578143f32c2ebcd1d3449a2a6b74fb66ff55b63 Mon Sep 17 00:00:00 2001
From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com>
Date: Tue, 13 Sep 2022 15:15:56 +0530
Subject: [PATCH 2/6] Address review comments
---
test/Integration/IntegrationTestBase.cs | 8 ++++----
test/Integration/SqlTriggerBindingIntegrationTests.cs | 2 +-
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs
index 6d5badaf8..d235dc812 100644
--- a/test/Integration/IntegrationTestBase.cs
+++ b/test/Integration/IntegrationTestBase.cs
@@ -23,7 +23,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration
public class IntegrationTestBase : IDisposable
{
///
- /// Host process for Azure Function CLI. Useful when only one host process is involved.
+ /// The first Function Host process that was started. Null if no process has been started yet.
///
protected Process FunctionHost => this.FunctionHostList.FirstOrDefault();
@@ -287,7 +287,7 @@ private DataReceivedEventHandler GetTestOutputHandler(int processId)
void TestOutputHandler(object sender, DataReceivedEventArgs e)
{
- if (e != null && !string.IsNullOrEmpty(e.Data))
+ if (!string.IsNullOrEmpty(e.Data))
{
this.TestOutput.WriteLine($"[{processId}] {e.Data}");
}
@@ -374,8 +374,8 @@ public void Dispose()
{
try
{
- functionHost?.Kill();
- functionHost?.Dispose();
+ functionHost.Kill();
+ functionHost.Dispose();
}
catch (Exception e2)
{
diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs
index dd7c6e924..66d86f9dc 100644
--- a/test/Integration/SqlTriggerBindingIntegrationTests.cs
+++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs
@@ -108,7 +108,7 @@ public async Task MultiHostTriggerTest()
var changes = new List>();
DataReceivedEventHandler[] changeHandlers = new[] { this.GetProductChangeHandler(changes, "SQL Changes: ") };
- // Prepare three function host processes
+ // Prepare three function host processes.
this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
this.StartFunctionHost(nameof(ProductsTrigger), Common.SupportedLanguages.CSharp, useTestFolder: false, changeHandlers);
From 6a4550a0589dcd777c352a9e88030bb3196e3c05 Mon Sep 17 00:00:00 2001
From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com>
Date: Tue, 4 Oct 2022 15:49:22 +0530
Subject: [PATCH 3/6] Remove unnecessary changes
---
.../SqlTriggerBindingIntegrationTests.cs | 23 +------------------
1 file changed, 1 insertion(+), 22 deletions(-)
diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs
index 6a671dbf9..5ff7903af 100644
--- a/test/Integration/SqlTriggerBindingIntegrationTests.cs
+++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs
@@ -455,21 +455,6 @@ ALTER TABLE [dbo].[{tableName}]
");
}
- private void MonitorProductChanges(List> changes)
- {
- int index = 0;
- string prefix = "SQL Changes: ";
-
- this.FunctionHost.OutputDataReceived += (sender, e) =>
- {
- if (e.Data != null && (index = e.Data.IndexOf(prefix, StringComparison.Ordinal)) >= 0)
- {
- string json = e.Data[(index + prefix.Length)..];
- changes.AddRange(JsonConvert.DeserializeObject>>(json));
- }
- };
- }
-
protected void InsertProducts(int firstId, int lastId)
{
int count = lastId - firstId + 1;
@@ -505,8 +490,6 @@ protected async Task WaitForProductChanges(
int timeoutMs,
string messagePrefix = "SQL Changes: ")
{
- this.LogOutput($"{timeoutMs}");
-
var expectedIds = Enumerable.Range(firstId, lastId - firstId + 1).ToHashSet();
int index = 0;
@@ -528,7 +511,6 @@ void MonitorOutputData(object sender, DataReceivedEventArgs e)
Assert.Equal(getName(product.ProductID), product.Name); // The product has the expected name
Assert.Equal(getCost(product.ProductID), product.Cost); // The product has the expected cost
}
-
if (expectedIds.Count == 0)
{
taskCompletion.SetResult(true);
@@ -545,9 +527,7 @@ void MonitorOutputData(object sender, DataReceivedEventArgs e)
await actions();
// Now wait until either we timeout or we've gotten all the expected changes, whichever comes first
- var stopwatch = Stopwatch.StartNew();
await taskCompletion.Task.TimeoutAfter(TimeSpan.FromMilliseconds(timeoutMs), $"Timed out waiting for {operation} changes.");
- this.LogOutput($"{stopwatch.ElapsedMilliseconds}");
// Unhook handler since we're done monitoring these changes so we aren't checking other changes done later
foreach (Process functionHost in this.FunctionHostList)
@@ -605,14 +585,13 @@ void OutputHandler(object sender, DataReceivedEventArgs e)
/// The batch size if different than the default batch size
/// The polling interval in ms if different than the default polling interval
///
- protected int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor