From 0c9ad52737bf13b36f9d4b0425cfe9f26c220b13 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Fri, 23 Sep 2022 12:09:39 -0700 Subject: [PATCH 01/17] log primarykeys (#367) --- src/SqlAsyncCollector.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SqlAsyncCollector.cs b/src/SqlAsyncCollector.cs index 508f82138..b806998c5 100644 --- a/src/SqlAsyncCollector.cs +++ b/src/SqlAsyncCollector.cs @@ -659,7 +659,7 @@ public static async Task RetrieveTableInformationAsync(SqlConn sqlConnProps.Add(TelemetryPropertyName.QueryType, usingInsertQuery ? "insert" : "merge"); sqlConnProps.Add(TelemetryPropertyName.HasIdentityColumn, hasIdentityColumnPrimaryKeys.ToString()); TelemetryInstance.TrackDuration(TelemetryEventName.GetTableInfoEnd, tableInfoSw.ElapsedMilliseconds, sqlConnProps, durations); - logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeyProperties.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}]"); + logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeys.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}]"); return new TableInformation(primaryKeyProperties, columnDefinitionsFromSQL, comparer, query, hasIdentityColumnPrimaryKeys); } } From 85e0d9e3b2712cefb794a7489fe3ae7a172be4a1 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Tue, 27 Sep 2022 09:16:21 -0700 Subject: [PATCH 02/17] Port fix to always disable telemetry to main (#373) --- test/Integration/IntegrationTestBase.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs index 665fbf3fa..4fe1db996 100644 --- a/test/Integration/IntegrationTestBase.cs +++ b/test/Integration/IntegrationTestBase.cs @@ -17,6 +17,7 @@ using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; using Microsoft.AspNetCore.WebUtilities; using System.Collections.Generic; +using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration { @@ -194,6 +195,10 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag RedirectStandardError = true, UseShellExecute = false }; + + // Always disable telemetry during test runs + startInfo.EnvironmentVariables[TelemetryOptoutEnvVar] = "1"; + this.LogOutput($"Starting {startInfo.FileName} {startInfo.Arguments} in {startInfo.WorkingDirectory}"); this.FunctionHost = new Process { From 6a0134918a35a7b5b7df4dee6ee87e0c1bb17d6b Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Tue, 27 Sep 2022 13:48:03 -0700 Subject: [PATCH 03/17] Only package extension --- Directory.Build.props | 1 + src/Microsoft.Azure.WebJobs.Extensions.Sql.csproj | 1 + test/Microsoft.Azure.WebJobs.Extensions.Sql.Tests.csproj | 5 ----- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 58cfe7cb1..50110af53 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -10,5 +10,6 @@ True True $(MSBuildThisFileDirectory)/SQL2003.snk + false \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Sql.csproj b/src/Microsoft.Azure.WebJobs.Extensions.Sql.csproj index 51d0b3807..e7dc1d6b8 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Sql.csproj +++ b/src/Microsoft.Azure.WebJobs.Extensions.Sql.csproj @@ -9,6 +9,7 @@ 99.99.99 true + true true true true diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Sql.Tests.csproj b/test/Microsoft.Azure.WebJobs.Extensions.Sql.Tests.csproj index cad70ffbc..78af40db5 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Sql.Tests.csproj +++ b/test/Microsoft.Azure.WebJobs.Extensions.Sql.Tests.csproj @@ -1,9 +1,4 @@  - - - false - - From 930e7fd66b27f6b91f92bd1799cc50e500931079 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Mon, 3 Oct 2022 09:57:11 -0700 Subject: [PATCH 04/17] Add site event properties --- src/Telemetry/TelemetryCommonProperties.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Telemetry/TelemetryCommonProperties.cs b/src/Telemetry/TelemetryCommonProperties.cs index 47a153d79..3e46a214e 100644 --- a/src/Telemetry/TelemetryCommonProperties.cs +++ b/src/Telemetry/TelemetryCommonProperties.cs @@ -14,6 +14,8 @@ public class TelemetryCommonProperties private readonly string _productVersion; private readonly string _azureFunctionsEnvironment; private readonly bool _hasWebsiteInstanceId; + private readonly string _websiteInstanceId; + private readonly string _websiteSiteName; private readonly string _functionsWorkerRuntime; public TelemetryCommonProperties(string productVersion, TelemetryClient telemetryClient, IConfiguration config) @@ -21,7 +23,9 @@ public TelemetryCommonProperties(string productVersion, TelemetryClient telemetr this._productVersion = productVersion; this._userLevelCacheWriter = new UserLevelCacheWriter(telemetryClient); this._azureFunctionsEnvironment = config.GetValue(AZURE_FUNCTIONS_ENVIRONMENT_KEY, ""); - this._hasWebsiteInstanceId = config.GetValue(WEBSITE_INSTANCE_ID_KEY, "") != ""; + this._websiteInstanceId = config.GetValue(WEBSITE_INSTANCE_ID_KEY, ""); + this._hasWebsiteInstanceId = !string.IsNullOrEmpty(this._websiteInstanceId); + this._websiteSiteName = config.GetValue(WEBSITE_SITE_NAME_KEY, ""); this._functionsWorkerRuntime = config.GetValue(FUNCTIONS_WORKER_RUNTIME_KEY, ""); } @@ -32,10 +36,13 @@ public TelemetryCommonProperties(string productVersion, TelemetryClient telemetr private const string MachineId = "MachineId"; private const string AzureFunctionsEnvironment = "AzureFunctionsEnvironment"; private const string HasWebsiteInstanceId = "HasWebsiteInstanceId"; + private const string WebsiteInstanceId = "WebsiteInstanceId"; + private const string WebsiteSiteName = "WebsiteSiteName"; private const string FunctionsWorkerRuntime = "FunctionsWorkerRuntime"; private const string AZURE_FUNCTIONS_ENVIRONMENT_KEY = "AZURE_FUNCTIONS_ENVIRONMENT"; private const string WEBSITE_INSTANCE_ID_KEY = "WEBSITE_INSTANCE_ID"; + private const string WEBSITE_SITE_NAME_KEY = "WEBSITE_SITE_NAME"; private const string FUNCTIONS_WORKER_RUNTIME_KEY = "FUNCTIONS_WORKER_RUNTIME"; public Dictionary GetTelemetryCommonProperties() @@ -47,6 +54,8 @@ public Dictionary GetTelemetryCommonProperties() {MachineId, this.GetMachineId()}, {AzureFunctionsEnvironment, this._azureFunctionsEnvironment}, {HasWebsiteInstanceId, this._hasWebsiteInstanceId.ToString()}, + {WebsiteInstanceId, this._websiteInstanceId }, + {WebsiteSiteName, this._websiteSiteName}, {FunctionsWorkerRuntime, this._functionsWorkerRuntime} }; } From 90a953e2256e5faa88c0b522d27f0ac0280d0913 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Mon, 3 Oct 2022 13:25:04 -0700 Subject: [PATCH 05/17] add sql binding java annotations (#379) --- java-library/pom.xml | 120 ++++++++++++++++++ .../functions/sql/annotation/SQLInput.java | 41 ++++++ .../functions/sql/annotation/SQLOutput.java | 30 +++++ 3 files changed, 191 insertions(+) create mode 100644 java-library/pom.xml create mode 100644 java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLInput.java create mode 100644 java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLOutput.java diff --git a/java-library/pom.xml b/java-library/pom.xml new file mode 100644 index 000000000..0392c154a --- /dev/null +++ b/java-library/pom.xml @@ -0,0 +1,120 @@ + + + 4.0.0 + + com.microsoft.azure.functions + azure-functions-java-library-sql + 0.0.0 + jar + + + com.microsoft.maven + java-8-parent + 8.0.1 + + + Microsoft Azure Functions Java SQL Types + This package contains all Java annotations to interact with Microsoft Azure Functions runtime for SQL Bindings. + https://aka.ms/sqlbindings + + Microsoft Azure + https://azure.microsoft.com + + + + UTF-8 + + + + + MIT License + https://opensource.org/licenses/MIT + repo + + + + + scm:git:https://github.com/Azure/azure-functions-sql-extension + scm:git:git@github.com:Azure/azure-functions-sql-extension + https://github.com/Azure/azure-functions-sql-extension + HEAD + + + + + LucyZhang + Lucy Zhang + luczhan@microsoft.com + + + + + + ossrh + Sonatype Snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + true + default + + + + + + maven.snapshots + Maven Central Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots/ + + false + + + true + + + + + + + com.microsoft.azure.functions + azure-functions-java-library + 1.4.2 + + + + + + + maven-compiler-plugin + ${maven-compiler.version} + + + org.apache.maven.plugins + maven-source-plugin + ${maven-source.version} + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven-javadoc.version} + + none + + + + attach-javadocs + + jar + + + + + + + \ No newline at end of file diff --git a/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLInput.java b/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLInput.java new file mode 100644 index 000000000..258b2b647 --- /dev/null +++ b/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLInput.java @@ -0,0 +1,41 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ + +package com.microsoft.azure.functions.sql.annotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.ElementType; + +import com.microsoft.azure.functions.annotation.CustomBinding; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.PARAMETER) +@CustomBinding(direction = "in", name = "sql", type = "sql") +public @interface SQLInput { + /** + * Query string or name of stored procedure to be run. + */ + String commandText() default ""; + + /** + * Text or Stored Procedure. + */ + String commandType() default ""; + + /** + * Parameters to the query or stored procedure. This string must follow the format + * "@param1=param1,@param2=param2" where @param1 is the name of the parameter and + * param1 is the parameter value. + */ + String parameters() default ""; + + /** + * Setting name for SQL connection string. + */ + String connectionStringSetting() default ""; +} diff --git a/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLOutput.java b/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLOutput.java new file mode 100644 index 000000000..2cec96561 --- /dev/null +++ b/java-library/src/main/java/com/microsoft/azure/functions/sql/annotation/SQLOutput.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ + + package com.microsoft.azure.functions.sql.annotation; + + import java.lang.annotation.Retention; + import java.lang.annotation.Target; + import java.lang.annotation.RetentionPolicy; + import java.lang.annotation.ElementType; + + import com.microsoft.azure.functions.annotation.CustomBinding; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.PARAMETER, ElementType.METHOD }) +@CustomBinding(direction = "out", name = "sql", type = "sql") +public @interface SQLOutput { + /** + * Name of the table to upsert data to. + */ + String commandText() default ""; + + /** + * Setting name for SQL connection string. + */ + String connectionStringSetting() default ""; +} From 2b043b8be394886b7a675b700476ba1b74e18ae8 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Wed, 5 Oct 2022 20:07:44 +0530 Subject: [PATCH 06/17] Add test for multiple function hosts (#344) --- performance/SqlTriggerBindingPerformance.cs | 2 +- test/Integration/IntegrationTestBase.cs | 78 ++++++----- .../SqlTriggerBindingIntegrationTests.cs | 122 ++++++++++++------ 3 files changed, 132 insertions(+), 70 deletions(-) diff --git a/performance/SqlTriggerBindingPerformance.cs b/performance/SqlTriggerBindingPerformance.cs index 60e3053f5..43a63f5ea 100644 --- a/performance/SqlTriggerBindingPerformance.cs +++ b/performance/SqlTriggerBindingPerformance.cs @@ -32,7 +32,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(1, count); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(1, count)); + this.GetBatchProcessingTimeout(1, count)); } [IterationCleanup] diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs index 95170d4cd..99c958cc9 100644 --- a/test/Integration/IntegrationTestBase.cs +++ b/test/Integration/IntegrationTestBase.cs @@ -1,23 +1,24 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using Microsoft.Data.SqlClient; -using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common; using System; +using System.Collections.Generic; using System.Data.Common; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net.Http; using System.Reflection; using System.Runtime.InteropServices; using System.Text; using System.Threading.Tasks; +using Microsoft.AspNetCore.WebUtilities; +using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; +using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common; +using Microsoft.Data.SqlClient; using Xunit; using Xunit.Abstractions; -using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; -using Microsoft.AspNetCore.WebUtilities; -using System.Collections.Generic; -using System.Linq; + using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration @@ -25,9 +26,14 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration public class IntegrationTestBase : IDisposable { /// - /// Host process for Azure Function CLI + /// The first Function Host process that was started. Null if no process has been started yet. + /// + protected Process FunctionHost => this.FunctionHostList.FirstOrDefault(); + + /// + /// Host processes for Azure Function CLI. /// - protected Process FunctionHost { get; private set; } + protected List FunctionHostList { get; } = new List(); /// /// Host process for Azurite local storage emulator. This is required for non-HTTP trigger functions: @@ -184,12 +190,16 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag { throw new FileNotFoundException("Working directory not found at " + workingDirectory); } + + // Use a different port for each new host process, starting with the default port number: 7071. + int port = this.Port + this.FunctionHostList.Count; + var startInfo = new ProcessStartInfo { // The full path to the Functions CLI is required in the ProcessStartInfo because UseShellExecute is set to false. // We cannot both use shell execute and redirect output at the same time: https://docs.microsoft.com//dotnet/api/system.diagnostics.processstartinfo.redirectstandardoutput#remarks FileName = GetFunctionsCoreToolsPath(), - Arguments = $"start --verbose --port {this.Port} --functions {functionName}", + Arguments = $"start --verbose --port {port} --functions {functionName}", WorkingDirectory = workingDirectory, WindowStyle = ProcessWindowStyle.Hidden, RedirectStandardOutput = true, @@ -205,24 +215,26 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag startInfo.EnvironmentVariables[TelemetryOptoutEnvVar] = "1"; this.LogOutput($"Starting {startInfo.FileName} {startInfo.Arguments} in {startInfo.WorkingDirectory}"); - this.FunctionHost = new Process + + var functionHost = new Process { StartInfo = startInfo }; + this.FunctionHostList.Add(functionHost); + // Register all handlers before starting the functions host process. var taskCompletionSource = new TaskCompletionSource(); - this.FunctionHost.OutputDataReceived += this.TestOutputHandler; - this.FunctionHost.OutputDataReceived += SignalStartupHandler; + functionHost.OutputDataReceived += SignalStartupHandler; this.FunctionHost.OutputDataReceived += customOutputHandler; - this.FunctionHost.ErrorDataReceived += this.TestOutputHandler; + functionHost.Start(); + functionHost.OutputDataReceived += this.GetTestOutputHandler(functionHost.Id); + functionHost.ErrorDataReceived += this.GetTestOutputHandler(functionHost.Id); + functionHost.BeginOutputReadLine(); + functionHost.BeginErrorReadLine(); - this.FunctionHost.Start(); - this.FunctionHost.BeginOutputReadLine(); - this.FunctionHost.BeginErrorReadLine(); - - this.LogOutput($"Waiting for Azure Function host to start..."); + this.LogOutput("Waiting for Azure Function host to start..."); const int FunctionHostStartupTimeoutInSeconds = 60; bool isCompleted = taskCompletionSource.Task.Wait(TimeSpan.FromSeconds(FunctionHostStartupTimeoutInSeconds)); @@ -233,7 +245,7 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag const int BufferTimeInSeconds = 5; Task.Delay(TimeSpan.FromSeconds(BufferTimeInSeconds)).Wait(); - this.LogOutput($"Azure Function host started!"); + this.LogOutput("Azure Function host started!"); this.FunctionHost.OutputDataReceived -= SignalStartupHandler; void SignalStartupHandler(object sender, DataReceivedEventArgs e) @@ -293,11 +305,16 @@ private void LogOutput(string output) } } - private void TestOutputHandler(object sender, DataReceivedEventArgs e) + private DataReceivedEventHandler GetTestOutputHandler(int processId) { - if (e != null && !string.IsNullOrEmpty(e.Data)) + return TestOutputHandler; + + void TestOutputHandler(object sender, DataReceivedEventArgs e) { - this.LogOutput(e.Data); + if (!string.IsNullOrEmpty(e.Data)) + { + this.LogOutput($"[{processId}] {e.Data}"); + } } } @@ -377,14 +394,17 @@ public void Dispose() this.LogOutput($"Failed to close connection. Error: {e1.Message}"); } - try - { - this.FunctionHost?.Kill(); - this.FunctionHost?.Dispose(); - } - catch (Exception e2) + foreach (Process functionHost in this.FunctionHostList) { - this.LogOutput($"Failed to stop function host, Error: {e2.Message}"); + try + { + functionHost.Kill(); + functionHost.Dispose(); + } + catch (Exception e2) + { + this.LogOutput($"Failed to stop function host, Error: {e2.Message}"); + } } try diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index ff5285e0c..baca890af 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -41,7 +41,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); firstId = 1; lastId = 20; @@ -53,7 +53,7 @@ await this.WaitForProductChanges( () => { this.UpdateProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Updated Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); firstId = 11; lastId = 30; @@ -66,7 +66,7 @@ await this.WaitForProductChanges( () => { this.DeleteProducts(firstId, lastId); return Task.CompletedTask; }, _ => null, _ => 0, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); } /// @@ -91,7 +91,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); + this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); } /// @@ -115,7 +115,7 @@ await this.WaitForProductChanges( () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); + this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); } @@ -145,7 +145,7 @@ await this.WaitForProductChanges( }, id => $"Updated Updated Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); firstId = 6; lastId = 10; @@ -162,7 +162,7 @@ await this.WaitForProductChanges( }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); firstId = 6; lastId = 10; @@ -179,7 +179,7 @@ await this.WaitForProductChanges( }, id => $"Updated Updated Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); firstId = 11; lastId = 20; @@ -197,10 +197,9 @@ await this.WaitForProductChanges( }, _ => null, _ => 0, - GetBatchProcessingTimeout(firstId, lastId)); + this.GetBatchProcessingTimeout(firstId, lastId)); } - /// /// Ensures correct functionality with multiple user functions tracking the same table. /// @@ -229,7 +228,7 @@ public async Task MultiFunctionTriggerTest() }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger1Changes ); @@ -244,7 +243,7 @@ public async Task MultiFunctionTriggerTest() }, id => $"Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger2Changes ); @@ -267,7 +266,7 @@ public async Task MultiFunctionTriggerTest() }, id => $"Updated Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger1Changes); // Set up monitoring for Trigger 2... @@ -281,7 +280,7 @@ public async Task MultiFunctionTriggerTest() }, id => $"Updated Product {id}", id => id * 100, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger2Changes); // Now that monitoring is set up make the changes and then wait for the monitoring tasks to see them and complete @@ -304,7 +303,7 @@ public async Task MultiFunctionTriggerTest() }, _ => null, _ => 0, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger1Changes); // Set up monitoring for Trigger 2... @@ -318,7 +317,7 @@ public async Task MultiFunctionTriggerTest() }, _ => null, _ => 0, - GetBatchProcessingTimeout(firstId, lastId), + this.GetBatchProcessingTimeout(firstId, lastId), Trigger2Changes); // Now that monitoring is set up make the changes and then wait for the monitoring tasks to see them and complete @@ -326,13 +325,63 @@ public async Task MultiFunctionTriggerTest() await Task.WhenAll(changes1Task, changes2Task); } + /// + /// Ensures correct functionality with user functions running across multiple functions host processes. + /// + [Fact] + public async Task MultiHostTriggerTest() + { + this.EnableChangeTrackingForTable("Products"); + + // Prepare three function host processes. + this.StartFunctionHost(nameof(ProductsTrigger), SupportedLanguages.CSharp); + this.StartFunctionHost(nameof(ProductsTrigger), SupportedLanguages.CSharp); + this.StartFunctionHost(nameof(ProductsTrigger), SupportedLanguages.CSharp); + + int firstId = 1; + int lastId = 90; + await this.WaitForProductChanges( + firstId, + lastId, + SqlChangeOperation.Insert, + () => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; }, + id => $"Product {id}", + id => id * 100, + this.GetBatchProcessingTimeout(firstId, lastId)); + + firstId = 1; + lastId = 60; + // All table columns (not just the columns that were updated) would be returned for update operation. + await this.WaitForProductChanges( + firstId, + lastId, + SqlChangeOperation.Update, + () => { this.UpdateProducts(firstId, lastId); return Task.CompletedTask; }, + id => $"Updated Product {id}", + id => id * 100, + this.GetBatchProcessingTimeout(firstId, lastId)); + + firstId = 31; + lastId = 90; + // The properties corresponding to non-primary key columns would be set to the C# type's default values + // (null and 0) for delete operation. + await this.WaitForProductChanges( + firstId, + lastId, + SqlChangeOperation.Delete, + () => { this.DeleteProducts(firstId, lastId); return Task.CompletedTask; }, + _ => null, + _ => 0, + this.GetBatchProcessingTimeout(firstId, lastId)); + } + /// /// Tests the error message when the user table is not present in the database. /// [Fact] public void TableNotPresentTriggerTest() { - this.StartFunctionsHostAndWaitForError( + this.StartFunctionHostAndWaitForError( nameof(TableNotPresentTrigger), true, "Could not find table: 'dbo.TableNotPresent'."); @@ -344,7 +393,7 @@ public void TableNotPresentTriggerTest() [Fact] public void PrimaryKeyNotCreatedTriggerTest() { - this.StartFunctionsHostAndWaitForError( + this.StartFunctionHostAndWaitForError( nameof(PrimaryKeyNotPresentTrigger), true, "Could not find primary key created in table: 'dbo.ProductsWithoutPrimaryKey'."); @@ -357,7 +406,7 @@ public void PrimaryKeyNotCreatedTriggerTest() [Fact] public void ReservedPrimaryKeyColumnNamesTriggerTest() { - this.StartFunctionsHostAndWaitForError( + this.StartFunctionHostAndWaitForError( nameof(ReservedPrimaryKeyColumnNamesTrigger), true, "Found reserved column name(s): '_az_func_ChangeVersion', '_az_func_AttemptCount', '_az_func_LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." + @@ -370,7 +419,7 @@ public void ReservedPrimaryKeyColumnNamesTriggerTest() [Fact] public void UnsupportedColumnTypesTriggerTest() { - this.StartFunctionsHostAndWaitForError( + this.StartFunctionHostAndWaitForError( nameof(UnsupportedColumnTypesTrigger), true, "Found column(s) with unsupported type(s): 'Location' (type: geography), 'Geometry' (type: geometry), 'Organization' (type: hierarchyid)" + @@ -383,7 +432,7 @@ public void UnsupportedColumnTypesTriggerTest() [Fact] public void ChangeTrackingNotEnabledTriggerTest() { - this.StartFunctionsHostAndWaitForError( + this.StartFunctionHostAndWaitForError( nameof(ProductsTrigger), false, "Could not find change tracking enabled for table: 'dbo.Products'."); @@ -406,20 +455,6 @@ ALTER TABLE [dbo].[{tableName}] "); } - private void MonitorProductChanges(List> changes, string messagePrefix) - { - int index = 0; - - this.FunctionHost.OutputDataReceived += (sender, e) => - { - if (e.Data != null && (index = e.Data.IndexOf(messagePrefix, StringComparison.Ordinal)) >= 0) - { - string json = e.Data[(index + messagePrefix.Length)..]; - changes.AddRange(JsonConvert.DeserializeObject>>(json)); - } - }; - } - protected void InsertProducts(int firstId, int lastId) { int count = lastId - firstId + 1; @@ -483,7 +518,10 @@ void MonitorOutputData(object sender, DataReceivedEventArgs e) } }; // Set up listener for the changes coming in - this.FunctionHost.OutputDataReceived += MonitorOutputData; + foreach (Process functionHost in this.FunctionHostList) + { + functionHost.OutputDataReceived += MonitorOutputData; + } // Now that we've set up our listener trigger the actions to monitor await actions(); @@ -492,7 +530,10 @@ void MonitorOutputData(object sender, DataReceivedEventArgs e) await taskCompletion.Task.TimeoutAfter(TimeSpan.FromMilliseconds(timeoutMs), $"Timed out waiting for {operation} changes."); // Unhook handler since we're done monitoring these changes so we aren't checking other changes done later - this.FunctionHost.OutputDataReceived -= MonitorOutputData; + foreach (Process functionHost in this.FunctionHostList) + { + functionHost.OutputDataReceived -= MonitorOutputData; + } } /// @@ -502,7 +543,7 @@ void MonitorOutputData(object sender, DataReceivedEventArgs e) /// Name of the user function that should cause error in trigger listener /// Whether the functions host should be launched from test folder /// Expected error message string - private void StartFunctionsHostAndWaitForError(string functionName, bool useTestFolder, string expectedErrorMessage) + private void StartFunctionHostAndWaitForError(string functionName, bool useTestFolder, string expectedErrorMessage) { string errorMessage = null; var tcs = new TaskCompletionSource(); @@ -544,10 +585,11 @@ void OutputHandler(object sender, DataReceivedEventArgs e) /// The batch size if different than the default batch size /// The polling interval in ms if different than the default polling interval /// - protected static int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) + protected int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs) { int changesToProcess = lastId - firstId + 1; - int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize) // The number of batches to process + int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize // The number of batches to process + / this.FunctionHostList.Count) // The number of function host processes * pollingIntervalMs // The length to process each batch * 2); // Double to add buffer time for processing results return Math.Max(calculatedTimeout, 2000); // Always have a timeout of at least 2sec to ensure we have time for processing the results From ed8b2681deae55a365bc14072c57b9545853dff8 Mon Sep 17 00:00:00 2001 From: AmeyaRele <35621237+AmeyaRele@users.noreply.github.com> Date: Mon, 10 Oct 2022 09:17:12 +0530 Subject: [PATCH 07/17] Add comment for explaining token cancellation (#387) --- src/TriggerBinding/SqlTableChangeMonitor.cs | 1 + src/TriggerBinding/SqlTriggerListener.cs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index b07373f56..e52adc093 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -156,6 +156,7 @@ public SqlTableChangeMonitor( public void Dispose() { + // When the CheckForChanges loop is finished, it will cancel the lease renewal loop. this._cancellationTokenSourceCheckForChanges.Cancel(); } diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index e426f6f7a..bb485f629 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -115,7 +115,6 @@ public async Task StartAsync(CancellationToken cancellationToken) this._logger.LogInformation($"Starting SQL trigger listener for table: '{this._userTable.FullName}', function ID: '{this._userFunctionId}'."); - // TODO: Check if passing the cancellation token would be beneficial. this._changeMonitor = new SqlTableChangeMonitor( this._connectionString, userTableId, From 339b874530be567fd6dc04c3f79a96c3dd502180 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Mon, 10 Oct 2022 18:03:18 -0700 Subject: [PATCH 08/17] Allow columns with default values to be excluded from the POCO (#388) * allow missing default columns in poco * add test * query format * add default column section to readme * fix wording --- README.md | 14 +++++-- samples/samples-csharp/Common/Product.cs | 7 ++++ .../Database/Tables/ProductsWithDefaultPK.sql | 5 +++ .../AddProductWithDefaultPK.cs | 31 ++++++++++++++ .../AddProductWithDefaultPK/function.json | 27 ++++++++++++ .../AddProductWithDefaultPK/index.js | 16 +++++++ src/SqlAsyncCollector.cs | 42 ++++++++++++------- .../SqlOutputBindingIntegrationTests.cs | 20 +++++++++ 8 files changed, 143 insertions(+), 19 deletions(-) create mode 100644 samples/samples-csharp/Database/Tables/ProductsWithDefaultPK.sql create mode 100644 samples/samples-csharp/OutputBindingSamples/AddProductWithDefaultPK.cs create mode 100644 samples/samples-js/AddProductWithDefaultPK/function.json create mode 100644 samples/samples-js/AddProductWithDefaultPK/index.js diff --git a/README.md b/README.md index be424d058..78b21974a 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ Azure SQL bindings for Azure Functions are supported for: - [ICollector<T>/IAsyncCollector<T>](#icollectortiasynccollectort) - [Array](#array) - [Single Row](#single-row) - - [Primary Keys and Identity Columns](#primary-keys-and-identity-columns) + - [Primary Key Special Cases](#primary-key-special-cases) - [Known Issues](#known-issues) - [Telemetry](#telemetry) - [Trademarks](#trademarks) @@ -756,20 +756,26 @@ public static IActionResult Run( } ``` -#### Primary Keys and Identity Columns +#### Primary Key Special Cases Normally Output Bindings require two things : 1. The table being upserted to contains a Primary Key constraint (composed of one or more columns) 2. Each of those columns must be present in the POCO object used in the attribute -If either of these are false then an error will be thrown. +Normally if either of these are false then an error will be thrown. Below are the situations in which this is not the case : -This changes if one of the primary key columns is an identity column though. In that case there are two options based on how the function defines the output object: +##### Identity Columns +In the case where one of the primary key columns is an identity column, there are two options based on how the function defines the output object: 1. If the identity column isn't included in the output object then a straight insert is always performed with the other column values. See [AddProductWithIdentityColumn](./samples/samples-csharp/OutputBindingSamples/AddProductWithIdentityColumn.cs) for an example. 2. If the identity column is included (even if it's an optional nullable value) then a merge is performed similar to what happens when no identity column is present. This merge will either insert a new row or update an existing row based on the existence of a row that matches the primary keys (including the identity column). See [AddProductWithIdentityColumnIncluded](./samples/samples-csharp/OutputBindingSamples/AddProductWithIdentityColumnIncluded.cs) for an example. +##### Columns with Default Values +In the case where one of the primary key columns has a default value, there are also two options based on how the function defines the output object: +1. If the column with a default value is not included in the output object, then a straight insert is always performed with the other values. See [AddProductWithDefaultPK](./samples/samples-csharp/OutputBindingSamples/AddProductWithDefaultPK.cs) for an example. +2. If the column with a default value is included then a merge is performed similar to what happens when no default column is present. If there is a nullable column with a default value, then the provided column value in the output object will be upserted even if it is null. + ## Known Issues - Output bindings against tables with columns of data types `NTEXT`, `TEXT`, or `IMAGE` are not supported and data upserts will fail. These types [will be removed](https://docs.microsoft.com/sql/t-sql/data-types/ntext-text-and-image-transact-sql) in a future version of SQL Server and are not compatible with the `OPENJSON` function used by this Azure Functions binding. diff --git a/samples/samples-csharp/Common/Product.cs b/samples/samples-csharp/Common/Product.cs index 1aff84662..2f2bdb1e9 100644 --- a/samples/samples-csharp/Common/Product.cs +++ b/samples/samples-csharp/Common/Product.cs @@ -25,4 +25,11 @@ public class ProductName { public string Name { get; set; } } + + public class ProductWithDefaultPK + { + public string Name { get; set; } + + public int Cost { get; set; } + } } \ No newline at end of file diff --git a/samples/samples-csharp/Database/Tables/ProductsWithDefaultPK.sql b/samples/samples-csharp/Database/Tables/ProductsWithDefaultPK.sql new file mode 100644 index 000000000..e353a3c37 --- /dev/null +++ b/samples/samples-csharp/Database/Tables/ProductsWithDefaultPK.sql @@ -0,0 +1,5 @@ +CREATE TABLE [ProductsWithDefaultPK] ( + [ProductGuid] [uniqueidentifier] PRIMARY KEY NOT NULL DEFAULT(newsequentialid()), + [Name] [nvarchar](100) NULL, + [Cost] [int] NULL +) \ No newline at end of file diff --git a/samples/samples-csharp/OutputBindingSamples/AddProductWithDefaultPK.cs b/samples/samples-csharp/OutputBindingSamples/AddProductWithDefaultPK.cs new file mode 100644 index 000000000..8e343fa93 --- /dev/null +++ b/samples/samples-csharp/OutputBindingSamples/AddProductWithDefaultPK.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql.Samples.OutputBindingSamples +{ + + public static class AddProductWithDefaultPK + { + /// + /// This shows an example of a SQL Output binding where the target table has a default primary key + /// of type uniqueidentifier and the column is not included in the output object. A new row will + /// be inserted and the uniqueidentifier will be generated by the engine. + /// + /// The original request that triggered the function + /// The created ProductWithDefaultPK object + /// The CreatedResult containing the new object that was inserted + [FunctionName(nameof(AddProductWithDefaultPK))] + public static IActionResult Run( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductwithdefaultpk")] + [FromBody] ProductWithDefaultPK product, + [Sql("dbo.ProductsWithDefaultPK", ConnectionStringSetting = "SqlConnectionString")] out ProductWithDefaultPK output) + { + output = product; + return new CreatedResult($"/api/addproductwithdefaultpk", output); + } + } +} \ No newline at end of file diff --git a/samples/samples-js/AddProductWithDefaultPK/function.json b/samples/samples-js/AddProductWithDefaultPK/function.json new file mode 100644 index 000000000..c0afd493f --- /dev/null +++ b/samples/samples-js/AddProductWithDefaultPK/function.json @@ -0,0 +1,27 @@ +{ + "bindings": [ + { + "authLevel": "function", + "name": "req", + "direction": "in", + "type": "httpTrigger", + "methods": [ + "post" + ], + "route": "addproductwithdefaultpk" + }, + { + "name": "$return", + "type": "http", + "direction": "out" + }, + { + "name": "products", + "type": "sql", + "direction": "out", + "commandText": "[dbo].[ProductsWithDefaultPK]", + "connectionStringSetting": "SqlConnectionString" + } + ], + "disabled": false + } \ No newline at end of file diff --git a/samples/samples-js/AddProductWithDefaultPK/index.js b/samples/samples-js/AddProductWithDefaultPK/index.js new file mode 100644 index 000000000..26448eea8 --- /dev/null +++ b/samples/samples-js/AddProductWithDefaultPK/index.js @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +// This shows an example of a SQL Output binding where the target table has a default primary key +// of type uniqueidentifier and the column is not included in the output object. A new row will +// be inserted and the uniqueidentifier will be generated by the engine. +module.exports = async function (context, req) { + // Note that this expects the body to be a JSON object + // matching each of the columns in the table to upsert to. + context.bindings.products = req.body; + + return { + status: 201, + body: req.body + }; +} \ No newline at end of file diff --git a/src/SqlAsyncCollector.cs b/src/SqlAsyncCollector.cs index b806998c5..ca6e420b1 100644 --- a/src/SqlAsyncCollector.cs +++ b/src/SqlAsyncCollector.cs @@ -30,10 +30,13 @@ internal class PrimaryKey public readonly bool IsIdentity; - public PrimaryKey(string name, bool isIdentity) + public readonly bool HasDefault; + + public PrimaryKey(string name, bool isIdentity, bool hasDefault) { this.Name = name; this.IsIdentity = isIdentity; + this.HasDefault = hasDefault; } public override string ToString() @@ -49,6 +52,7 @@ internal class SqlAsyncCollector : IAsyncCollector, IDisposable private const string ColumnName = "COLUMN_NAME"; private const string ColumnDefinition = "COLUMN_DEFINITION"; + private const string HasDefault = "has_default"; private const string IsIdentity = "is_identity"; private const string CteName = "cte"; @@ -170,7 +174,7 @@ private async Task UpsertRowsAsync(IEnumerable rows, SqlAttribute attribute, if (tableInfo == null) { TelemetryInstance.TrackEvent(TelemetryEventName.TableInfoCacheMiss, props); - // set the columnNames for supporting T as JObject since it doesn't have columns in the memeber info. + // set the columnNames for supporting T as JObject since it doesn't have columns in the member info. tableInfo = await TableInformation.RetrieveTableInformationAsync(connection, fullTableName, this._logger, GetColumnNamesFromItem(rows.First())); var policy = new CacheItemPolicy { @@ -426,13 +430,20 @@ public static string GetPrimaryKeysQuery(SqlObject table) { return $@" SELECT - {ColumnName}, c.is_identity + ccu.{ColumnName}, + c.is_identity, + case + when isc.COLUMN_DEFAULT = NULL then 'false' + else 'true' + end as {HasDefault} FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME AND ccu.TABLE_NAME = tc.TABLE_NAME INNER JOIN sys.columns c ON c.object_id = OBJECT_ID({table.QuotedFullName}) AND c.name = ccu.COLUMN_NAME + INNER JOIN + INFORMATION_SCHEMA.COLUMNS isc ON isc.TABLE_NAME = {table.QuotedName} AND isc.COLUMN_NAME = ccu.COLUMN_NAME WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY' and @@ -464,15 +475,15 @@ INFORMATION_SCHEMA.COLUMNS c c.TABLE_SCHEMA = {table.QuotedSchema}"; } - public static string GetInsertQuery(SqlObject table) + public static string GetInsertQuery(SqlObject table, IEnumerable bracketedColumnNamesFromItem) { - return $"INSERT INTO {table.BracketQuotedFullName} SELECT * FROM {CteName}"; + return $"INSERT INTO {table.BracketQuotedFullName} ({string.Join(",", bracketedColumnNamesFromItem)}) SELECT * FROM {CteName}"; } /// /// Generates reusable SQL query that will be part of every upsert command. /// - public static string GetMergeQuery(IList primaryKeys, SqlObject table, StringComparison comparison, IEnumerable columnNames) + public static string GetMergeQuery(IList primaryKeys, SqlObject table, IEnumerable bracketedColumnNamesFromItem) { IList bracketedPrimaryKeys = primaryKeys.Select(p => p.Name.AsBracketQuotedString()).ToList(); // Generate the ON part of the merge query (compares new data against existing data) @@ -483,9 +494,6 @@ public static string GetMergeQuery(IList primaryKeys, SqlObject tabl } // Generate the UPDATE part of the merge query (all columns that should be updated) - IEnumerable bracketedColumnNamesFromItem = columnNames - .Where(prop => !primaryKeys.Any(k => k.IsIdentity && string.Equals(k.Name, prop, comparison))) // Skip any identity columns, those should never be updated - .Select(prop => prop.AsBracketQuotedString()); var columnMatchingQueryBuilder = new StringBuilder(); foreach (string column in bracketedColumnNamesFromItem) { @@ -604,7 +612,7 @@ public static async Task RetrieveTableInformationAsync(SqlConn while (await rdr.ReadAsync()) { string columnName = caseSensitive ? rdr[ColumnName].ToString() : rdr[ColumnName].ToString().ToLowerInvariant(); - primaryKeys.Add(new PrimaryKey(columnName, bool.Parse(rdr[IsIdentity].ToString()))); + primaryKeys.Add(new PrimaryKey(columnName, bool.Parse(rdr[IsIdentity].ToString()), bool.Parse(rdr[HasDefault].ToString()))); } primaryKeysSw.Stop(); TelemetryInstance.TrackDuration(TelemetryEventName.GetPrimaryKeys, primaryKeysSw.ElapsedMilliseconds, sqlConnProps); @@ -634,9 +642,10 @@ public static async Task RetrieveTableInformationAsync(SqlConn IEnumerable missingPrimaryKeysFromItem = primaryKeys .Where(k => !primaryKeysFromObject.Contains(k.Name, comparer)); bool hasIdentityColumnPrimaryKeys = primaryKeys.Any(k => k.IsIdentity); - // If none of the primary keys are an identity column then we require that all primary keys be present in the POCO so we can + bool hasDefaultColumnPrimaryKeys = primaryKeys.Any(k => k.HasDefault); + // If none of the primary keys are an identity column or have a default value then we require that all primary keys be present in the POCO so we can // generate the MERGE statement correctly - if (!hasIdentityColumnPrimaryKeys && missingPrimaryKeysFromItem.Any()) + if (!hasIdentityColumnPrimaryKeys && !hasDefaultColumnPrimaryKeys && missingPrimaryKeysFromItem.Any()) { string message = $"All primary keys for SQL table {table} need to be found in '{typeof(T)}.' Missing primary keys: [{string.Join(",", missingPrimaryKeysFromItem)}]"; var ex = new InvalidOperationException(message); @@ -644,10 +653,13 @@ public static async Task RetrieveTableInformationAsync(SqlConn throw ex; } - // If any identity columns aren't included in the object then we have to generate a basic insert since the merge statement expects all primary key + // If any identity columns or columns with default values aren't included in the object then we have to generate a basic insert since the merge statement expects all primary key // columns to exist. (the merge statement can handle nullable columns though if those exist) - bool usingInsertQuery = hasIdentityColumnPrimaryKeys && missingPrimaryKeysFromItem.Any(); - string query = usingInsertQuery ? GetInsertQuery(table) : GetMergeQuery(primaryKeys, table, comparison, columnNames); + bool usingInsertQuery = (hasIdentityColumnPrimaryKeys || hasDefaultColumnPrimaryKeys) && missingPrimaryKeysFromItem.Any(); + IEnumerable bracketedColumnNamesFromItem = columnNames + .Where(prop => !primaryKeys.Any(k => k.IsIdentity && string.Equals(k.Name, prop, comparison))) // Skip any identity columns, those should never be updated + .Select(prop => prop.AsBracketQuotedString()); + string query = usingInsertQuery ? GetInsertQuery(table, bracketedColumnNamesFromItem) : GetMergeQuery(primaryKeys, table, bracketedColumnNamesFromItem); tableInfoSw.Stop(); var durations = new Dictionary() diff --git a/test/Integration/SqlOutputBindingIntegrationTests.cs b/test/Integration/SqlOutputBindingIntegrationTests.cs index 5b2d17d63..9891cb526 100644 --- a/test/Integration/SqlOutputBindingIntegrationTests.cs +++ b/test/Integration/SqlOutputBindingIntegrationTests.cs @@ -377,5 +377,25 @@ public void AddProductCaseSensitiveTest(SupportedLanguages lang) Assert.Equal("test", this.ExecuteScalar($"select Name from Products where ProductId={1}")); Assert.Equal(100, this.ExecuteScalar($"select cost from Products where ProductId={1}")); } + + /// + /// Tests that a row is inserted successfully when the object is missing + /// the primary key column with a default value. + /// + [Theory] + [SqlInlineData()] + public void AddProductWithDefaultPKTest(SupportedLanguages lang) + { + this.StartFunctionHost(nameof(AddProductWithDefaultPK), lang); + var product = new Dictionary() + { + { "name", "MyProduct" }, + { "cost", "1" } + }; + Assert.Equal(0, this.ExecuteScalar("SELECT COUNT(*) FROM dbo.ProductsWithDefaultPK")); + this.SendOutputPostRequest("addproductwithdefaultpk", JsonConvert.SerializeObject(product)).Wait(); + this.SendOutputPostRequest("addproductwithdefaultpk", JsonConvert.SerializeObject(product)).Wait(); + Assert.Equal(2, this.ExecuteScalar("SELECT COUNT(*) FROM dbo.ProductsWithDefaultPK")); + } } } From 203837f7dab11a3a63ec9a09d3916b228f22be38 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Tue, 11 Oct 2022 11:41:26 -0700 Subject: [PATCH 09/17] Fix folder used for Integration tests (#386) * fix working folder for integration tests * fix datetime format --- test/Integration/IntegrationTestBase.cs | 2 +- test/Integration/test-js/AddProductColumnTypes/index.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs index 4fe1db996..a59f384ac 100644 --- a/test/Integration/IntegrationTestBase.cs +++ b/test/Integration/IntegrationTestBase.cs @@ -178,7 +178,7 @@ protected void StartAzurite() /// protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null) { - string workingDirectory = useTestFolder ? GetPathToBin() : Path.Combine(GetPathToBin(), "SqlExtensionSamples", Enum.GetName(typeof(SupportedLanguages), language)); + string workingDirectory = language == SupportedLanguages.CSharp && useTestFolder ? GetPathToBin() : Path.Combine(GetPathToBin(), "SqlExtensionSamples", Enum.GetName(typeof(SupportedLanguages), language)); if (!Directory.Exists(workingDirectory)) { throw new FileNotFoundException("Working directory not found at " + workingDirectory); diff --git a/test/Integration/test-js/AddProductColumnTypes/index.js b/test/Integration/test-js/AddProductColumnTypes/index.js index 8d83ac300..465bb67ab 100644 --- a/test/Integration/test-js/AddProductColumnTypes/index.js +++ b/test/Integration/test-js/AddProductColumnTypes/index.js @@ -5,8 +5,8 @@ module.exports = async function (context, req) { const product = { "productId": req.query.productId, - "datetime": Date.now(), - "datetime2": Date.now() + "datetime": new Date().toISOString(), + "datetime2": new Date().toISOString() }; context.bindings.product = JSON.stringify(product); From 72751fe14666e836dd106ba628a7f1b8dbb10e9f Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Wed, 12 Oct 2022 10:21:23 -0700 Subject: [PATCH 10/17] Update README and code comments (#391) * Update README and code comments * more * PR comments * Add cleanup docs * Fix spelling * pr comments --- README.md | 31 +++++++++++++++++++++ src/TriggerBinding/SqlTableChangeMonitor.cs | 11 +++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fb6166bac..d3b3dab10 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,9 @@ Azure SQL bindings for Azure Functions are supported for: - [Primary Keys and Identity Columns](#primary-keys-and-identity-columns) - [Trigger Binding](#trigger-binding) - [Change Tracking](#change-tracking) + - [Internal State Tables](#internal-state-tables) + - [az_func.GlobalState](#az_funcglobalstate) + - [az_func.Leases_*](#az_funcleases_) - [Trigger Samples](#trigger-samples) - [Known Issues](#known-issues) - [Telemetry](#telemetry) @@ -869,6 +872,34 @@ The trigger binding utilizes SQL [change tracking](https://docs.microsoft.com/sq > **NOTE:** The leases table contains all columns corresponding to the primary key from the user table and three additional columns named `_az_func_ChangeVersion`, `_az_func_AttemptCount` and `_az_func_LeaseExpirationTime`. If any of the primary key columns happen to have the same name, that will result in an error message listing any conflicts. In this case, the listed primary key columns must be renamed for the trigger to work. +#### Internal State Tables + +The trigger functionality creates several tables to use for tracking the current state of the trigger. This allows state to be persisted across sessions and for multiple instances of a trigger binding to execute in parallel (for scaling purposes). + +In addition, a schema named `az_func` will be created that the tables will belong to. + +The login the trigger is configured to use must be given permissions to create these tables and schema. If not, then an error will be thrown and the trigger will fail to run. + +If the tables are deleted or modified, then unexpected behavior may occur. To reset the state of the triggers, first stop all currently running functions with trigger bindings and then either truncate or delete the tables. The next time a function with a trigger binding is started, it will recreate the tables as necessary. + +##### az_func.GlobalState + +This table stores information about each function being executed, what table that function is watching and what the [last sync state](https://learn.microsoft.com/sql/relational-databases/track-changes/work-with-change-tracking-sql-server) that has been processed. + +##### az_func.Leases_* + +A `Leases_*` table is created for every unique instance of a function and table. The full name will be in the format `Leases__` where `` is generated from the function ID and `` is the object ID of the table being tracked. Such as `Leases_7d12c06c6ddff24c_1845581613`. + +This table is used to ensure that all changes are processed and that no change is processed more than once. This table consists of two groups of columns: + + * A column for each column in the primary key of the target table - used to identify the row that it maps to in the target table + * A couple columns for tracking the state of each row. These are: + * `_az_func_ChangeVersion` for the change version of the row currently being processed + * `_az_func_AttemptCount` for tracking the number of times that a change has attempted to be processed to avoid getting stuck trying to process a change it's unable to handle + * `_az_func_LeaseExpirationTime` for tracking when the lease on this row for a particular instance is set to expire. This ensures that if an instance exits unexpectedly another instance will be able to pick up and process any changes it had leases for after the expiration time has passed. + +A row is created for every row in the target table that is modified. These are then cleaned up after the changes are processed for a set of changes corresponding to a change tracking sync version. + #### Trigger Samples The trigger binding takes two [arguments](https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerAttribute.cs) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index e52adc093..5ed030ae3 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -331,14 +331,17 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati try { - // What should we do if this fails? It doesn't make sense to retry since it's not a connection based - // thing. We could still try to trigger on the correctly processed changes, but that adds additional - // complication because we don't want to release the leases on the incorrectly processed changes. - // For now, just give up I guess? changes = this.ProcessChanges(); } catch (Exception e) { + // Either there's a bug or we're in a bad state so not much we can do here. We'll try clearing + // our state and retry getting the changes from the top again in case something broke while + // fetching the changes. + // It doesn't make sense to retry processing the changes immediately since this isn't a connection-based issue. + // We could probably send up the changes we were able to process and just skip the ones we couldn't, but given + // that this is not a case we expect would happen during normal execution we'll err on the side of caution for + // now and just retry getting the whole set of changes. this._logger.LogError($"Failed to compose trigger parameter value for table: '{this._userTable.FullName} due to exception: {e.GetType()}. Exception message: {e.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, e, this._telemetryProps); await this.ClearRowsAsync(); From 04fc7c4f3a49dfbc12acc86f877eaf97f22d7a63 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Thu, 13 Oct 2022 13:19:23 -0700 Subject: [PATCH 11/17] Copy latest sql dll to extension bundle in pipeline (#395) * cp sql dll to extension bundle * remove quotes + add windows condition * fix path * add bin to path + add linux * use source folder * use preview path * add overwrite and ignore mkdir errors * fix target folder path * remove ignoreMakeDirErrors * add wait for case sensitivity test * use one task * set cache timeout to 0 * fix target path * pr comments + add timeout var linux * set cache to 0 for one test --- .../template-steps-build-test.yml | 12 +++++++++++ src/SqlAsyncCollector.cs | 21 +++++++++++++++++-- test/Integration/IntegrationTestBase.cs | 10 ++++++++- .../SqlOutputBindingIntegrationTests.cs | 11 +++++++--- 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/builds/azure-pipelines/template-steps-build-test.yml b/builds/azure-pipelines/template-steps-build-test.yml index 40b6b9165..c8dd777f0 100644 --- a/builds/azure-pipelines/template-steps-build-test.yml +++ b/builds/azure-pipelines/template-steps-build-test.yml @@ -32,6 +32,10 @@ steps: displayName: 'Set npm installation path for Windows' condition: and(succeeded(), eq(variables['Agent.OS'], 'Windows_NT')) +- bash: echo "##vso[task.setvariable variable=azureFunctionsExtensionBundlePath]$(func GetExtensionBundlePath)" + displayName: 'Set Azure Functions extension bundle path' + workingDirectory: $(Build.SourcesDirectory)/samples/samples-js + - task: DockerInstaller@0 displayName: Docker Installer inputs: @@ -67,6 +71,14 @@ steps: projects: '${{ parameters.solution }}' arguments: '--configuration ${{ parameters.configuration }} -p:GeneratePackageOnBuild=false -p:Version=${{ parameters.binariesVersion }}' +- task: CopyFiles@2 + displayName: 'Copy Sql extension dll to Azure Functions extension bundle' + inputs: + sourceFolder: $(Build.SourcesDirectory)/src/bin/${{ parameters.configuration }}/netstandard2.0 + contents: Microsoft.Azure.WebJobs.Extensions.Sql.dll + targetFolder: $(azureFunctionsExtensionBundlePath)/bin + overWrite: true + - script: | npm install npm run lint diff --git a/src/SqlAsyncCollector.cs b/src/SqlAsyncCollector.cs index ca6e420b1..07644df75 100644 --- a/src/SqlAsyncCollector.cs +++ b/src/SqlAsyncCollector.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Globalization; using System.Collections.Generic; using System.Data; using System.Linq; @@ -58,6 +59,8 @@ internal class SqlAsyncCollector : IAsyncCollector, IDisposable private const string Collation = "Collation"; + private const int AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES = 10; + private readonly IConfiguration _configuration; private readonly SqlAttribute _attribute; private readonly ILogger _logger; @@ -171,6 +174,20 @@ private async Task UpsertRowsAsync(IEnumerable rows, SqlAttribute attribute, ObjectCache cachedTables = MemoryCache.Default; var tableInfo = cachedTables[cacheKey] as TableInformation; + int timeout = AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES; + string timeoutEnvVar = Environment.GetEnvironmentVariable("AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES"); + if (!string.IsNullOrEmpty(timeoutEnvVar)) + { + if (int.TryParse(timeoutEnvVar, NumberStyles.Integer, CultureInfo.InvariantCulture, out timeout)) + { + this._logger.LogDebugWithThreadId($"Overriding default table info cache timeout with new value {timeout}"); + } + else + { + timeout = AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES; + } + } + if (tableInfo == null) { TelemetryInstance.TrackEvent(TelemetryEventName.TableInfoCacheMiss, props); @@ -178,8 +195,8 @@ private async Task UpsertRowsAsync(IEnumerable rows, SqlAttribute attribute, tableInfo = await TableInformation.RetrieveTableInformationAsync(connection, fullTableName, this._logger, GetColumnNamesFromItem(rows.First())); var policy = new CacheItemPolicy { - // Re-look up the primary key(s) after 10 minutes (they should not change very often!) - AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(10) + // Re-look up the primary key(s) after timeout (default timeout is 10 minutes) + AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(timeout) }; cachedTables.Set(cacheKey, tableInfo, policy); diff --git a/test/Integration/IntegrationTestBase.cs b/test/Integration/IntegrationTestBase.cs index a59f384ac..bfbe1f435 100644 --- a/test/Integration/IntegrationTestBase.cs +++ b/test/Integration/IntegrationTestBase.cs @@ -176,7 +176,7 @@ protected void StartAzurite() /// - The functionName is different than its route.
/// - You can start multiple functions by passing in a space-separated list of function names.
/// - protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null) + protected void StartFunctionHost(string functionName, SupportedLanguages language, bool useTestFolder = false, DataReceivedEventHandler customOutputHandler = null, Dictionary environmentVariables = null) { string workingDirectory = language == SupportedLanguages.CSharp && useTestFolder ? GetPathToBin() : Path.Combine(GetPathToBin(), "SqlExtensionSamples", Enum.GetName(typeof(SupportedLanguages), language)); if (!Directory.Exists(workingDirectory)) @@ -196,6 +196,14 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag UseShellExecute = false }; + if (environmentVariables != null) + { + foreach (KeyValuePair variable in environmentVariables) + { + startInfo.EnvironmentVariables[variable.Key] = variable.Value; + } + } + // Always disable telemetry during test runs startInfo.EnvironmentVariables[TelemetryOptoutEnvVar] = "1"; diff --git a/test/Integration/SqlOutputBindingIntegrationTests.cs b/test/Integration/SqlOutputBindingIntegrationTests.cs index 9891cb526..179810dac 100644 --- a/test/Integration/SqlOutputBindingIntegrationTests.cs +++ b/test/Integration/SqlOutputBindingIntegrationTests.cs @@ -352,10 +352,15 @@ public void AddProductWithIdentity_MissingPrimaryColumn(SupportedLanguages lang) [SqlInlineData()] public void AddProductCaseSensitiveTest(SupportedLanguages lang) { - this.StartFunctionHost(nameof(AddProductParams), lang); + // Set table info cache timeout to 0 minutes so that new collation gets picked up + var environmentVariables = new Dictionary() + { + { "AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES", "0" } + }; + this.StartFunctionHost(nameof(AddProductParams), lang, false, null, environmentVariables); // Change database collation to case sensitive - this.ExecuteNonQuery($"ALTER DATABASE {this.DatabaseName} COLLATE Latin1_General_CS_AS"); + this.ExecuteNonQuery($"ALTER DATABASE {this.DatabaseName} SET Single_User WITH ROLLBACK IMMEDIATE; ALTER DATABASE {this.DatabaseName} COLLATE Latin1_General_CS_AS; ALTER DATABASE {this.DatabaseName} SET Multi_User;"); var query = new Dictionary() { @@ -369,7 +374,7 @@ public void AddProductCaseSensitiveTest(SupportedLanguages lang) Assert.Throws(() => this.SendOutputGetRequest("addproduct-params", query).Wait()); // Change database collation back to case insensitive - this.ExecuteNonQuery($"ALTER DATABASE {this.DatabaseName} COLLATE Latin1_General_CI_AS"); + this.ExecuteNonQuery($"ALTER DATABASE {this.DatabaseName} SET Single_User WITH ROLLBACK IMMEDIATE; ALTER DATABASE {this.DatabaseName} COLLATE Latin1_General_CI_AS; ALTER DATABASE {this.DatabaseName} SET Multi_User;"); this.SendOutputGetRequest("addproduct-params", query).Wait(); From 66a1d06a2fede766c57842d4f297438d2d500a79 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Thu, 13 Oct 2022 13:43:24 -0700 Subject: [PATCH 12/17] Add additional logging when opening connections (#397) --- src/SqlAsyncCollector.cs | 4 +++- src/SqlConverters.cs | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/SqlAsyncCollector.cs b/src/SqlAsyncCollector.cs index 07644df75..b232fc913 100644 --- a/src/SqlAsyncCollector.cs +++ b/src/SqlAsyncCollector.cs @@ -163,7 +163,9 @@ private async Task UpsertRowsAsync(IEnumerable rows, SqlAttribute attribute, var upsertRowsAsyncSw = Stopwatch.StartNew(); using (SqlConnection connection = SqlBindingUtilities.BuildConnection(attribute.ConnectionStringSetting, configuration)) { + this._logger.LogDebugWithThreadId("BEGIN OpenUpsertRowsAsyncConnection"); await connection.OpenAsync(); + this._logger.LogDebugWithThreadId("END OpenUpsertRowsAsyncConnection"); Dictionary props = connection.AsConnectionProps(); string fullTableName = attribute.CommandText; @@ -420,7 +422,7 @@ public TableInformation(IEnumerable primaryKeys, IDictionary BuildItemFromAttributeAsync(SqlAttribute attri using (SqlCommand command = SqlBindingUtilities.BuildCommand(attribute, connection)) { adapter.SelectCommand = command; + this._logger.LogDebugWithThreadId("BEGIN OpenBuildItemFromAttributeAsyncConnection"); await connection.OpenAsync(); + this._logger.LogDebugWithThreadId("END OpenBuildItemFromAttributeAsyncConnection"); Dictionary props = connection.AsConnectionProps(); TelemetryInstance.TrackConvert(type, props); var dataTable = new DataTable(); From 3a4ddd387d0b21f48282c79db3e9e646b94d7b23 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 14 Oct 2022 11:02:53 -0700 Subject: [PATCH 13/17] Fix polling size override using wrong configuration value --- src/TriggerBinding/SqlTableChangeMonitor.cs | 2 +- test/Common/TestUtils.cs | 31 +++++++++++++ .../SqlTriggerBindingIntegrationTests.cs | 43 +++++++++++++++---- 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 5ed030ae3..bdfccca79 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -124,7 +124,7 @@ public SqlTableChangeMonitor( // Check if there's config settings to override the default batch size/polling interval values int? configuredBatchSize = configuration.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize); - int? configuredPollingInterval = configuration.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize); + int? configuredPollingInterval = configuration.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_PollingInterval); this._batchSize = configuredBatchSize ?? this._batchSize; this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs; var monitorStartProps = new Dictionary(telemetryProps) diff --git a/test/Common/TestUtils.cs b/test/Common/TestUtils.cs index ab45410a3..7cef4e51d 100644 --- a/test/Common/TestUtils.cs +++ b/test/Common/TestUtils.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Data; using System.Diagnostics; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -177,5 +178,35 @@ public static async Task TimeoutAfter(this Task task, throw new TimeoutException(message); } } + + /// + /// Creates a DataReceievedEventHandler that will wait for the specified regex and then check that + /// the matched group matches the expected value. + /// + /// The task completion source to signal when the value is receieved + /// The regex. This must have a single group match for the specific value being looked for + /// The name of the value to output if the match fails + /// The value expected to be equal to the matched group from the regex + /// + public static DataReceivedEventHandler CreateOutputReceievedHandler(TaskCompletionSource taskCompletionSource, string regex, string valueName, string expectedValue) + { + return (object sender, DataReceivedEventArgs e) => + { + Match match = Regex.Match(e.Data, regex); + if (match.Success) + { + // We found the line so now check that the group matches our expected value + string actualValue = match.Groups[1].Value; + if (actualValue == expectedValue) + { + taskCompletionSource.SetResult(true); + } + else + { + taskCompletionSource.SetException(new Exception($"Expected {valueName} value of {expectedValue} but got value {actualValue}")); + } + } + }; + } } } diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index baca890af..a8c3ab4a7 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -79,10 +79,22 @@ public async Task BatchSizeOverrideTriggerTest() const int firstId = 1; const int lastId = 40; this.EnableChangeTrackingForTable("Products"); - this.StartFunctionHost(nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, true, environmentVariables: new Dictionary() { - { "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() }, - { "Sql_Trigger_BatchSize", batchSize.ToString() } - }); + var taskCompletionSource = new TaskCompletionSource(); + DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( + taskCompletionSource, + @"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)", + "PollingInterval", + batchSize.ToString()); + this.StartFunctionHost( + nameof(ProductsTriggerWithValidation), + SupportedLanguages.CSharp, + useTestFolder: true, + customOutputHandler: handler, + environmentVariables: new Dictionary() { + { "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() }, + { "Sql_Trigger_BatchSize", batchSize.ToString() } + } + ); await this.WaitForProductChanges( firstId, @@ -92,6 +104,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000)); } /// @@ -100,13 +113,25 @@ await this.WaitForProductChanges( [Fact] public async Task PollingIntervalOverrideTriggerTest() { - const int pollingIntervalMs = 100; const int firstId = 1; const int lastId = 50; + const int pollingIntervalMs = 75; this.EnableChangeTrackingForTable("Products"); - this.StartFunctionHost(nameof(ProductsTriggerWithValidation), SupportedLanguages.CSharp, true, environmentVariables: new Dictionary() { - { "Sql_Trigger_PollingIntervalMs", pollingIntervalMs.ToString() } - }); + var taskCompletionSource = new TaskCompletionSource(); + DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( + taskCompletionSource, + @"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)", + "PollingInterval", + pollingIntervalMs.ToString()); + this.StartFunctionHost( + nameof(ProductsTriggerWithValidation), + SupportedLanguages.CSharp, + useTestFolder: true, + customOutputHandler: handler, + environmentVariables: new Dictionary() { + { "Sql_Trigger_PollingIntervalMs", pollingIntervalMs.ToString() } + } + ); await this.WaitForProductChanges( firstId, @@ -116,9 +141,9 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000)); } - /// /// Verifies that if several changes have happened to the table row since last invocation, then a single net /// change for that row is passed to the user function. From 2e0f3454a282d6d67b0f3aed335b366a931434cd Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 14 Oct 2022 11:22:07 -0700 Subject: [PATCH 14/17] Fixes --- .../SqlTriggerBindingIntegrationTests.cs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index a8c3ab4a7..0579f1527 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -75,15 +75,19 @@ await this.WaitForProductChanges( [Fact] public async Task BatchSizeOverrideTriggerTest() { - const int batchSize = 20; + // Use enough items for the default batch size to require 4 batches but then + // set the batch size to the same value so they can all be processed in one + // batch. The test will only wait for ~1 batch worth of time so will timeout + // if the batch size isn't actually changed + const int batchSize = SqlTableChangeMonitor.DefaultBatchSize * 4; const int firstId = 1; - const int lastId = 40; + const int lastId = batchSize; this.EnableChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( taskCompletionSource, - @"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)", - "PollingInterval", + @"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*", + "BatchSize", batchSize.ToString()); this.StartFunctionHost( nameof(ProductsTriggerWithValidation), @@ -114,7 +118,10 @@ await this.WaitForProductChanges( public async Task PollingIntervalOverrideTriggerTest() { const int firstId = 1; - const int lastId = 50; + // Use enough items to require 5 batches to be processed - the test will + // only wait for the expected time and timeout if the default polling + // interval isn't actually modified. + const int lastId = SqlTableChangeMonitor.DefaultBatchSize * 5; const int pollingIntervalMs = 75; this.EnableChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); From 360587c291c3909db6e3e0577c898b06931c4174 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 14 Oct 2022 12:07:48 -0700 Subject: [PATCH 15/17] Update comments --- test/Common/TestUtils.cs | 2 +- test/Integration/SqlTriggerBindingIntegrationTests.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Common/TestUtils.cs b/test/Common/TestUtils.cs index 7cef4e51d..ae0d0f657 100644 --- a/test/Common/TestUtils.cs +++ b/test/Common/TestUtils.cs @@ -187,7 +187,7 @@ public static async Task TimeoutAfter(this Task task, /// The regex. This must have a single group match for the specific value being looked for /// The name of the value to output if the match fails /// The value expected to be equal to the matched group from the regex - /// + /// The event handler public static DataReceivedEventHandler CreateOutputReceievedHandler(TaskCompletionSource taskCompletionSource, string regex, string valueName, string expectedValue) { return (object sender, DataReceivedEventArgs e) => diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 0579f1527..7fbb9a81c 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -75,7 +75,7 @@ await this.WaitForProductChanges( [Fact] public async Task BatchSizeOverrideTriggerTest() { - // Use enough items for the default batch size to require 4 batches but then + // Use enough items to require 4 batches to be processed but then // set the batch size to the same value so they can all be processed in one // batch. The test will only wait for ~1 batch worth of time so will timeout // if the batch size isn't actually changed From 42d301e8edde7488c39e7d1a6a33283e6a73006d Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 14 Oct 2022 12:12:32 -0700 Subject: [PATCH 16/17] typo --- test/Common/TestUtils.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Common/TestUtils.cs b/test/Common/TestUtils.cs index ae0d0f657..8ba874b9f 100644 --- a/test/Common/TestUtils.cs +++ b/test/Common/TestUtils.cs @@ -183,7 +183,7 @@ public static async Task TimeoutAfter(this Task task, /// Creates a DataReceievedEventHandler that will wait for the specified regex and then check that /// the matched group matches the expected value. /// - /// The task completion source to signal when the value is receieved + /// The task completion source to signal when the value is received /// The regex. This must have a single group match for the specific value being looked for /// The name of the value to output if the match fails /// The value expected to be equal to the matched group from the regex From 450180e43c9b1514be3786425898963ab89f0947 Mon Sep 17 00:00:00 2001 From: Charles Gagnon Date: Fri, 14 Oct 2022 14:26:53 -0700 Subject: [PATCH 17/17] more changes --- test/Integration/SqlTriggerBindingIntegrationTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index 7fbb9a81c..a1f6d4c80 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -108,7 +108,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message"); } /// @@ -122,7 +122,7 @@ public async Task PollingIntervalOverrideTriggerTest() // only wait for the expected time and timeout if the default polling // interval isn't actually modified. const int lastId = SqlTableChangeMonitor.DefaultBatchSize * 5; - const int pollingIntervalMs = 75; + const int pollingIntervalMs = SqlTableChangeMonitor.DefaultPollingIntervalMs / 2; this.EnableChangeTrackingForTable("Products"); var taskCompletionSource = new TaskCompletionSource(); DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler( @@ -148,7 +148,7 @@ await this.WaitForProductChanges( id => $"Product {id}", id => id * 100, this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs)); - await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000)); + await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message"); } ///