Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ Azure SQL bindings for Azure Functions are supported for:
- [Python functions](#python-functions)
- [Input Binding Tutorial](#input-binding-tutorial-2)
- [Output Binding Tutorial](#output-binding-tutorial-2)
- [Configuration](#configuration)
- [Trigger Binding Configuration](#trigger-binding-configuration)
- [Sql_Trigger_BatchSize](#sql_trigger_batchsize)
- [Sql_Trigger_PollingIntervalMs](#sql_trigger_pollingintervalms)
- [More Samples](#more-samples)
- [Input Binding](#input-binding)
- [Query String](#query-string)
Expand Down Expand Up @@ -554,6 +558,20 @@ Note: This tutorial requires that a SQL database is setup as shown in [Create a
- Hit 'F5' to run your code. Click the link to upsert the output array values in your SQL table. Your upserted values should launch in the browser.
- Congratulations! You have successfully created your first SQL output binding! Checkout [Output Binding](#Output-Binding) for more information on how to use it and explore on your own!

## Configuration

This section goes over some of the configuration values you can use to customize the SQL bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more.

### Trigger Binding Configuration

#### Sql_Trigger_BatchSize

This controls the number of changes processed at once before being sent to the triggered function.

#### Sql_Trigger_PollingIntervalMs

This controls the delay in milliseconds between processing each batch of changes.

## More Samples

### Input Binding
Expand Down
55 changes: 38 additions & 17 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Azure.WebJobs.Extensions.Sql
{
Expand All @@ -25,17 +26,25 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql
/// <typeparam name="T">POCO class representing the row in the user table</typeparam>
internal sealed class SqlTableChangeMonitor<T> : IDisposable
{
public const int BatchSize = 10;
public const int PollingIntervalInSeconds = 5;
public const int MaxAttemptCount = 5;

// Leases are held for approximately (LeaseRenewalIntervalInSeconds * MaxLeaseRenewalCount) seconds. It is
#region Constants
/// <summary>
/// The maximum number of times we'll attempt to process a change before giving up
/// </summary>
private const int MaxChangeProcessAttemptCount = 5;
/// <summary>
/// The maximum number of times that we'll attempt to renew a lease be
/// </summary>
/// <remarks>
/// Leases are held for approximately (LeaseRenewalIntervalInSeconds * MaxLeaseRenewalCount) seconds. It is
// required to have at least one of (LeaseIntervalInSeconds / LeaseRenewalIntervalInSeconds) attempts to
// renew the lease succeed to prevent it from expiring.
public const int MaxLeaseRenewalCount = 10;
public const int LeaseIntervalInSeconds = 60;
public const int LeaseRenewalIntervalInSeconds = 15;
public const int MaxRetryReleaseLeases = 3;
// </remarks>
private const int MaxLeaseRenewalCount = 10;
private const int LeaseIntervalInSeconds = 60;
private const int LeaseRenewalIntervalInSeconds = 15;
private const int MaxRetryReleaseLeases = 3;
#endregion Constants

private readonly string _connectionString;
private readonly int _userTableId;
private readonly SqlObject _userTable;
Expand All @@ -46,6 +55,14 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
private readonly IReadOnlyList<string> _rowMatchConditions;
private readonly ITriggeredFunctionExecutor _executor;
private readonly ILogger _logger;
/// <summary>
/// Number of changes to process in each iteration of the loop
/// </summary>
private readonly int _batchSize = 10;
/// <summary>
/// Delay in ms between processing each batch of changes
/// </summary>
private readonly int _pollingIntervalInMs = 5000;

private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges = new CancellationTokenSource();
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases = new CancellationTokenSource();
Expand Down Expand Up @@ -87,6 +104,7 @@ public SqlTableChangeMonitor(
IReadOnlyList<string> primaryKeyColumns,
ITriggeredFunctionExecutor executor,
ILogger logger,
IConfiguration configuration,
IDictionary<TelemetryPropertyName, string> telemetryProps)
{
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
Expand All @@ -99,9 +117,11 @@ public SqlTableChangeMonitor(
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));

this._userTableId = userTableId;

// Check if there's config settings to override the default batch size/polling interval values
this._batchSize = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize) ?? this._batchSize;
this._pollingIntervalInMs = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_PollingInterval) ?? this._pollingIntervalInMs;
// Prep search-conditions that will be used besides WHERE clause to match table rows.
this._rowMatchConditions = Enumerable.Range(0, BatchSize)
this._rowMatchConditions = Enumerable.Range(0, this._batchSize)
.Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
.ToList();

Expand All @@ -122,7 +142,7 @@ public void Dispose()
}

/// <summary>
/// Executed once every <see cref="PollingIntervalInSeconds"/> period. If the state of the change monitor is
/// Executed once every <see cref="_pollingIntervalInMs"/> period. If the state of the change monitor is
/// <see cref="State.CheckingForChanges"/>, then the method query the change/leases tables for changes on the
/// user's table. If any are found, the state of the change monitor is transitioned to
/// <see cref="State.ProcessingChanges"/> and the user's function is executed with the found changes. If the
Expand All @@ -131,7 +151,7 @@ public void Dispose()
/// </summary>
private async Task RunChangeConsumptionLoopAsync()
{
this._logger.LogDebugWithThreadId("Starting change consumption loop.");
this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
Copy link
Contributor

@JatinSanghvi JatinSanghvi Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think such information should be logged at debug level as user won't be interested in general in internal workings of the extension. We can log the evaluated batch size and polling interval as separate message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a little unsure about exactly what should be info and what should be debug, but does this sound reasonable?

info - information useful to the user to see during normal execution
debug - detailed information about the inner workings

The reason I changed this to info was because I was thinking that a user would find it useful to know the batch size and polling interval - especially for the default values which they may not know offhand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave this as info for now for the reasons above, and we can come back and do a look over logging in the entire extension pre-GA to make sure everything lines up how we want (I'll make an issue to track)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already mentioned that we should log batch size and polling interval but not the rest of the message.

Users normally use the logs to check the history of their own function invocations. The extension should in general log all the information that might be useful for the user during host/listener startup only. Logging "Starting change consumption loop" even if for one-time will only confuse users and will prompt them to check what the message means for them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, I misread your original comment. What would you suggest the info message with the batch size/polling interval would be then?


try
{
Expand All @@ -153,7 +173,8 @@ private async Task RunChangeConsumptionLoopAsync()
await this.ProcessTableChangesAsync(connection, token);
}
this._logger.LogDebugWithThreadId("END CheckingForChanges");
await Task.Delay(TimeSpan.FromSeconds(PollingIntervalInSeconds), token);
this._logger.LogDebugWithThreadId($"Delaying for {this._pollingIntervalInMs}ms");
await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token);
}
}
}
Expand Down Expand Up @@ -666,7 +687,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti
FROM {SqlTriggerConstants.GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

SELECT TOP {BatchSize}
SELECT TOP {this._batchSize}
{selectList},
c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION,
l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName}, l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName}, l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName}
Expand All @@ -677,7 +698,7 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
(l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NULL AND
(l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} < c.SYS_CHANGE_VERSION) OR
l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND
(l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount})
(l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount})
ORDER BY c.SYS_CHANGE_VERSION ASC;
";

Expand Down Expand Up @@ -798,7 +819,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_
((l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} IS NULL OR
l.{SqlTriggerConstants.LeasesTableChangeVersionColumnName} != c.SYS_CHANGE_VERSION OR
l.{SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName} IS NOT NULL) AND
(l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxAttemptCount}))) AS Changes
(l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} IS NULL OR l.{SqlTriggerConstants.LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount}))) AS Changes

IF @unprocessed_changes = 0 AND @current_last_sync_version < {newLastSyncVersion}
BEGIN
Expand Down
8 changes: 6 additions & 2 deletions src/TriggerBinding/SqlTriggerBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Azure.WebJobs.Extensions.Sql
{
Expand All @@ -29,6 +30,7 @@ internal sealed class SqlTriggerBinding<T> : ITriggerBinding
private readonly ParameterInfo _parameter;
private readonly IHostIdProvider _hostIdProvider;
private readonly ILogger _logger;
private readonly IConfiguration _configuration;

private static readonly IReadOnlyDictionary<string, Type> _emptyBindingContract = new Dictionary<string, Type>();
private static readonly IReadOnlyDictionary<string, object> _emptyBindingData = new Dictionary<string, object>();
Expand All @@ -41,13 +43,15 @@ internal sealed class SqlTriggerBinding<T> : ITriggerBinding
/// <param name="parameter">Trigger binding parameter information</param>
/// <param name="hostIdProvider">Provider of unique host identifier</param>
/// <param name="logger">Facilitates logging of messages</param>
public SqlTriggerBinding(string connectionString, string tableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger)
/// <param name="configuration">Provides configuration values</param>
public SqlTriggerBinding(string connectionString, string tableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger, IConfiguration configuration)
{
this._connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
this._tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
this._parameter = parameter ?? throw new ArgumentNullException(nameof(parameter));
this._hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
}

/// <summary>
Expand All @@ -68,7 +72,7 @@ public async Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
_ = context ?? throw new ArgumentNullException(nameof(context), "Missing listener context");

string userFunctionId = await this.GetUserFunctionIdAsync();
return new SqlTriggerListener<T>(this._connectionString, this._tableName, userFunctionId, context.Executor, this._logger);
return new SqlTriggerListener<T>(this._connectionString, this._tableName, userFunctionId, context.Executor, this._logger, this._configuration);
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
4 changes: 2 additions & 2 deletions src/TriggerBinding/SqlTriggerBindingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
Type userType = parameter.ParameterType.GetGenericArguments()[0].GetGenericArguments()[0];
Type bindingType = typeof(SqlTriggerBinding<>).MakeGenericType(userType);

var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger) };
var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger), typeof(IConfiguration) };
ConstructorInfo bindingConstructor = bindingType.GetConstructor(constructorParameterTypes);

object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, parameter, this._hostIdProvider, this._logger };
object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, parameter, this._hostIdProvider, this._logger, this._configuration };
var triggerBinding = (ITriggerBinding)bindingConstructor.Invoke(constructorParameterValues);

return Task.FromResult(triggerBinding);
Expand Down
3 changes: 3 additions & 0 deletions src/TriggerBinding/SqlTriggerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ internal static class SqlTriggerConstants
public const string LeasesTableChangeVersionColumnName = "_az_func_ChangeVersion";
public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount";
public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";

public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize";
public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs";
}
}
9 changes: 7 additions & 2 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Azure.WebJobs.Extensions.Sql
{
Expand All @@ -34,6 +35,7 @@ internal sealed class SqlTriggerListener<T> : IListener
private readonly string _userFunctionId;
private readonly ITriggeredFunctionExecutor _executor;
private readonly ILogger _logger;
private readonly IConfiguration _configuration;

private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps = new Dictionary<TelemetryPropertyName, string>();

Expand All @@ -48,13 +50,15 @@ internal sealed class SqlTriggerListener<T> : IListener
/// <param name="userFunctionId">Unique identifier for the user function</param>
/// <param name="executor">Defines contract for triggering user function</param>
/// <param name="logger">Facilitates logging of messages</param>
public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger)
/// <param name="configuration">Provides configuration values</param>
public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger, IConfiguration configuration)
{
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
this._userTable = !string.IsNullOrEmpty(tableName) ? new SqlObject(tableName) : throw new ArgumentNullException(nameof(tableName));
this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
}

public void Cancel()
Expand Down Expand Up @@ -122,6 +126,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
primaryKeyColumns.Select(col => col.name).ToList(),
this._executor,
this._logger,
this._configuration,
this._telemetryProps);

this._listenerState = ListenerStarted;
Expand Down Expand Up @@ -467,7 +472,7 @@ PRIMARY KEY ({primaryKeys})
}

/// <summary>
/// Clears the current telemetry property dictionary and initializes the default initial properties.
/// Clears the current telemetry property dictionary and initializes the default initial properties.
/// </summary>
private void InitializeTelemetryProps()
{
Expand Down
5 changes: 5 additions & 0 deletions src/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,10 @@ public static void LogDebugWithThreadId(this ILogger logger, string message, par
{
logger.LogDebug($"TID:{Environment.CurrentManagedThreadId} {message}", args);
}

public static void LogInformationWithThreadId(this ILogger logger, string message, params object[] args)
{
logger.LogInformation($"TID:{Environment.CurrentManagedThreadId} {message}", args);
}
}
}
3 changes: 2 additions & 1 deletion test/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
# Disabled
dotnet_diagnostic.CA1309.severity = silent # Use ordinal StringComparison - this isn't important for tests and just adds clutter
dotnet_diagnostic.CA1305.severity = silent # Specify IFormatProvider - this isn't important for tests and just adds clutter
dotnet_diagnostic.CA1707.severity = silent # Identifiers should not contain underscores - this helps make test names more readable
dotnet_diagnostic.CA1707.severity = silent # Identifiers should not contain underscores - this helps make test names more readable
dotnet_diagnostic.CA2201.severity = silent # Do not raise reserved exception types - tests can throw whatever they want
Loading