Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/SqlAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ private static void GenerateDataQueryForMerge(TableInformation table, IEnumerabl

public class TableInformation
{
private const string ISO_8061_DATETIME_FORMAT = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff";

public IEnumerable<PropertyInfo> PrimaryKeys { get; }

/// <summary>
Expand Down Expand Up @@ -391,9 +393,13 @@ public TableInformation(IEnumerable<PropertyInfo> primaryKeys, IDictionary<strin
this.Query = query;
this.HasIdentityColumnPrimaryKeys = hasIdentityColumnPrimaryKeys;

// Convert datetime strings to ISO 8061 format to avoid potential errors on the server when converting into a datetime. This
// is the only format that are an international standard.
// https://docs.microsoft.com/previous-versions/sql/sql-server-2008-r2/ms180878(v=sql.105)
this.JsonSerializerSettings = new JsonSerializerSettings
{
ContractResolver = new DynamicPOCOContractResolver(columns, comparer)
ContractResolver = new DynamicPOCOContractResolver(columns, comparer),
DateFormatString = ISO_8061_DATETIME_FORMAT
};
}
public static bool GetCaseSensitivityFromCollation(string collation)
Expand Down
4 changes: 2 additions & 2 deletions src/SqlBindingUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ public static SqlCommand BuildCommand(SqlAttribute attribute, SqlConnection conn
/// </summary>
/// <param name="reader">Used to determine the columns of the table as well as the next SQL row to process</param>
/// <returns>The built dictionary</returns>
public static IReadOnlyDictionary<string, string> BuildDictionaryFromSqlRow(SqlDataReader reader)
public static IReadOnlyDictionary<string, object> BuildDictionaryFromSqlRow(SqlDataReader reader)
{
return Enumerable.Range(0, reader.FieldCount).ToDictionary(reader.GetName, i => reader.GetValue(i).ToString());
return Enumerable.Range(0, reader.FieldCount).ToDictionary(reader.GetName, i => reader.GetValue(i));
}

/// <summary>
Expand Down
30 changes: 15 additions & 15 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable

private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps;

private IReadOnlyList<IReadOnlyDictionary<string, string>> _rows;
private IReadOnlyList<IReadOnlyDictionary<string, object>> _rows;
private int _leaseRenewalCount;
private State _state;

Expand Down Expand Up @@ -121,7 +121,7 @@ public SqlTableChangeMonitor(
this._telemetryProps = telemetryProps;

this._rowsLock = new SemaphoreSlim(1, 1);
this._rows = new List<IReadOnlyDictionary<string, string>>();
this._rows = new List<IReadOnlyDictionary<string, object>>();
this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;

Expand Down Expand Up @@ -222,7 +222,7 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
}
this._logger.LogDebugWithThreadId($"END UpdateTablesPreInvocation Duration={setLastSyncVersionDurationMs}ms");

var rows = new List<IReadOnlyDictionary<string, string>>();
var rows = new List<IReadOnlyDictionary<string, object>>();

// Use the version number to query for new changes.
using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
Expand Down Expand Up @@ -292,7 +292,7 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
{
// If there's an exception in any part of the process, we want to clear all of our data in memory and
// retry checking for changes again.
this._rows = new List<IReadOnlyDictionary<string, string>>();
this._rows = new List<IReadOnlyDictionary<string, object>>();
this._logger.LogError($"Failed to check for changes in table '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetChanges, e, this._telemetryProps);
}
Expand Down Expand Up @@ -477,7 +477,7 @@ private async Task ClearRowsAsync()

this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;
this._rows = new List<IReadOnlyDictionary<string, string>>();
this._rows = new List<IReadOnlyDictionary<string, object>>();

this._logger.LogDebugWithThreadId("ReleaseRowsLock - ClearRows");
this._rowsLock.Release();
Expand Down Expand Up @@ -576,9 +576,9 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
private long RecomputeLastSyncVersion()
{
var changeVersionSet = new SortedSet<long>();
foreach (IReadOnlyDictionary<string, string> row in this._rows)
foreach (IReadOnlyDictionary<string, object> row in this._rows)
{
string changeVersion = row["SYS_CHANGE_VERSION"];
string changeVersion = row["SYS_CHANGE_VERSION"].ToString();
changeVersionSet.Add(long.Parse(changeVersion, CultureInfo.InvariantCulture));
}

Expand All @@ -599,13 +599,13 @@ private IReadOnlyList<SqlChange<T>> ProcessChanges()
{
this._logger.LogDebugWithThreadId("BEGIN ProcessChanges");
var changes = new List<SqlChange<T>>();
foreach (IReadOnlyDictionary<string, string> row in this._rows)
foreach (IReadOnlyDictionary<string, object> row in this._rows)
{
SqlChangeOperation operation = GetChangeOperation(row);

// If the row has been deleted, there is no longer any data for it in the user table. The best we can do
// is populate the row-item with the primary key values of the row.
Dictionary<string, string> item = operation == SqlChangeOperation.Delete
Dictionary<string, object> item = operation == SqlChangeOperation.Delete
? this._primaryKeyColumns.ToDictionary(col => col, col => row[col])
: this._userTableColumns.ToDictionary(col => col, col => row[col]);

Expand All @@ -621,9 +621,9 @@ private IReadOnlyList<SqlChange<T>> ProcessChanges()
/// <param name="row">The (combined) row from the change table and leases table</param>
/// <exception cref="InvalidDataException">Thrown if the value of the "SYS_CHANGE_OPERATION" column is none of "I", "U", or "D"</exception>
/// <returns>SqlChangeOperation.Insert for an insert, SqlChangeOperation.Update for an update, and SqlChangeOperation.Delete for a delete</returns>
private static SqlChangeOperation GetChangeOperation(IReadOnlyDictionary<string, string> row)
private static SqlChangeOperation GetChangeOperation(IReadOnlyDictionary<string, object> row)
{
string operation = row["SYS_CHANGE_OPERATION"];
string operation = row["SYS_CHANGE_OPERATION"].ToString();
switch (operation)
{
case "I": return SqlChangeOperation.Insert;
Expand Down Expand Up @@ -704,14 +704,14 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
/// <param name="rows">Dictionary representing the table rows on which leases should be acquired</param>
/// <returns>The SqlCommand populated with the query and appropriate parameters</returns>
private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, string>> rows)
private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, object>> rows)
{
var acquireLeasesQuery = new StringBuilder();

for (int rowIndex = 0; rowIndex < rows.Count; rowIndex++)
{
string valuesList = string.Join(", ", this._primaryKeyColumns.Select((_, colIndex) => $"@{rowIndex}_{colIndex}"));
string changeVersion = rows[rowIndex]["SYS_CHANGE_VERSION"];
string changeVersion = rows[rowIndex]["SYS_CHANGE_VERSION"].ToString();

acquireLeasesQuery.Append($@"
IF NOT EXISTS (SELECT * FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]})
Expand Down Expand Up @@ -761,7 +761,7 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa

for (int rowIndex = 0; rowIndex < this._rows.Count; rowIndex++)
{
string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"];
string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"].ToString();

releaseLeasesQuery.Append($@"
SELECT @current_change_version = {SqlTriggerConstants.LeasesTableChangeVersionColumnName}
Expand Down Expand Up @@ -843,7 +843,7 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_
/// rebuild the SqlParameters each time.
/// </remarks>
private SqlCommand GetSqlCommandWithParameters(string commandText, SqlConnection connection,
SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, string>> rows)
SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, object>> rows)
{
var command = new SqlCommand(commandText, connection, transaction);

Expand Down
16 changes: 16 additions & 0 deletions test/Common/ProductColumnTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common
{
public class ProductColumnTypes
{
public int ProductID { get; set; }

public DateTime Datetime { get; set; }

public DateTime Datetime2 { get; set; }
}
}
5 changes: 5 additions & 0 deletions test/Database/Tables/ProductsColumnTypes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE [ProductsColumnTypes] (
[ProductId] [int] NOT NULL PRIMARY KEY,
[Datetime] [datetime],
[Datetime2] [datetime2]
)
2 changes: 2 additions & 0 deletions test/GlobalSuppressions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@
[assembly: SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "Unused parameter is required by functions binding", Scope = "member", Target = "~M:Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration.ReservedPrimaryKeyColumnNamesTrigger.Run(System.Collections.Generic.IReadOnlyList{Microsoft.Azure.WebJobs.Extensions.Sql.SqlChange{Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common.Product})")]
[assembly: SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "Unused parameter is required by functions binding", Scope = "member", Target = "~M:Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration.TableNotPresentTrigger.Run(System.Collections.Generic.IReadOnlyList{Microsoft.Azure.WebJobs.Extensions.Sql.SqlChange{Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common.Product})")]
[assembly: SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "Unused parameter is required by functions binding", Scope = "member", Target = "~M:Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration.UnsupportedColumnTypesTrigger.Run(System.Collections.Generic.IReadOnlyList{Microsoft.Azure.WebJobs.Extensions.Sql.SqlChange{Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common.Product})")]
[assembly: SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "<Pending>", Scope = "member", Target = "~M:Microsoft.Azure.WebJobs.Extensions.Sql.Samples.InputBindingSamples.GetProductsColumnTypesSerializationDifferentCulture.Run(Microsoft.AspNetCore.Http.HttpRequest,System.Collections.Generic.IAsyncEnumerable{Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common.ProductColumnTypes},Microsoft.Extensions.Logging.ILogger)~System.Threading.Tasks.Task{Microsoft.AspNetCore.Mvc.IActionResult}")]
[assembly: SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "<Pending>", Scope = "member", Target = "~M:Microsoft.Azure.WebJobs.Extensions.Sql.Samples.InputBindingSamples.GetProductsColumnTypesSerialization.Run(Microsoft.AspNetCore.Http.HttpRequest,System.Collections.Generic.IAsyncEnumerable{Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common.ProductColumnTypes},Microsoft.Extensions.Logging.ILogger)~System.Threading.Tasks.Task{Microsoft.AspNetCore.Mvc.IActionResult}")]
40 changes: 40 additions & 0 deletions test/Integration/SqlInputBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,45 @@ public async void GetProductNamesViewTest(SupportedLanguages lang)

Assert.Equal(expectedResponse, TestUtils.CleanJsonString(actualResponse), StringComparer.OrdinalIgnoreCase);
}

/// <summary>
/// Verifies that serializing an item with various data types works when the language is
/// set to a non-enUS language.
/// </summary>
[Theory]
[SqlInlineData()]
[UnsupportedLanguages(SupportedLanguages.JavaScript)] // Javascript doesn't have the concept of a runtime language used during serialization
public async void GetProductsColumnTypesSerializationDifferentCultureTest(SupportedLanguages lang)
{
this.StartFunctionHost(nameof(GetProductsColumnTypesSerializationDifferentCulture), lang, true);

this.ExecuteNonQuery("INSERT INTO [dbo].[ProductsColumnTypes] VALUES (" +
"999, " + // ProductId
"GETDATE(), " + // Datetime field
"GETDATE())"); // Datetime2 field

await this.SendInputRequest("getproducts-columntypesserializationdifferentculture");

// If we get here the test has succeeded - it'll throw an exception if serialization fails
}

/// <summary>
/// Verifies that serializing an item with various data types works as expected
/// </summary>
[Theory]
[SqlInlineData()]
public async void GetProductsColumnTypesSerializationTest(SupportedLanguages lang)
{
this.StartFunctionHost(nameof(GetProductsColumnTypesSerialization), lang, true);

this.ExecuteNonQuery("INSERT INTO [dbo].[ProductsColumnTypes] VALUES (" +
"999, " + // ProductId
"GETDATE(), " + // Datetime field
"GETDATE())"); // Datetime2 field

await this.SendInputRequest("getproducts-columntypesserialization");

// If we get here the test has succeeded - it'll throw an exception if serialization fails
}
}
}
21 changes: 21 additions & 0 deletions test/Integration/SqlOutputBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ public void AddProductArrayTest(SupportedLanguages lang)
Assert.Equal(2, this.ExecuteScalar("SELECT ProductId FROM Products WHERE Cost = 12"));
}

/// <summary>
/// Test compatability with converting various data types to their respective
/// SQL server types.
/// </summary>
/// <param name="lang">The language to run the test against</param>
[Theory]
[SqlInlineData()]
public void AddProductColumnTypesTest(SupportedLanguages lang)
{
this.StartFunctionHost(nameof(AddProductColumnTypes), lang, true);

var queryParameters = new Dictionary<string, string>()
{
{ "productId", "999" }
};

this.SendOutputGetRequest("addproduct-columntypes", queryParameters).Wait();

// If we get here then the test is successful - an exception will be thrown if there were any problems
}

[Theory]
[SqlInlineData()]
[UnsupportedLanguages(SupportedLanguages.JavaScript)] // Collectors are only available in C#
Expand Down
35 changes: 35 additions & 0 deletions test/Integration/test-csharp/AddProductColumnTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration
{
public static class AddProductColumnTypes
{
/// <summary>
/// This function is used to test compatability with converting various data types to their respective
/// SQL server types.
/// </summary>
[FunctionName(nameof(AddProductColumnTypes))]
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "addproduct-columntypes")] HttpRequest req,
[Sql("dbo.ProductsColumnTypes", ConnectionStringSetting = "SqlConnectionString")] out ProductColumnTypes product)
{
product = new ProductColumnTypes()
{
ProductID = int.Parse(req.Query["productId"]),
Datetime = DateTime.UtcNow,
Datetime2 = DateTime.UtcNow
};

// Items were inserted successfully so return success, an exception would be thrown if there
// was any issues
return new OkObjectResult("Success!");
}
}
}
40 changes: 40 additions & 0 deletions test/Integration/test-csharp/GetProductColumnTypesSerialization.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.Samples.InputBindingSamples
{
public static class GetProductsColumnTypesSerialization
{
/// <summary>
/// This function verifies that serializing an item with various data types
/// works as expected.
/// Note this uses IAsyncEnumerable because IEnumerable serializes the entire table directly,
/// instead of each item one by one (which is where issues can occur)
/// </summary>
[FunctionName(nameof(GetProductsColumnTypesSerialization))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "getproducts-columntypesserialization")]
HttpRequest req,
[Sql("SELECT * FROM [dbo].[ProductsColumnTypes]",
CommandType = System.Data.CommandType.Text,
ConnectionStringSetting = "SqlConnectionString")]
IAsyncEnumerable<ProductColumnTypes> products,
ILogger log)
{
await foreach (ProductColumnTypes item in products)
{
log.LogInformation(JsonSerializer.Serialize(item));
}
return new OkObjectResult(products);
}
}
}
Loading