Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/SqlAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
/// <param name="configuration"> Used to build up the connection </param>
private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute, IConfiguration configuration)
{
this._logger.LogDebugWithThreadId("BEGIN UpsertRowsAsync");
var upsertRowsAsyncSw = Stopwatch.StartNew();
using (SqlConnection connection = SqlBindingUtilities.BuildConnection(attribute.ConnectionStringSetting, configuration))
{
await connection.OpenAsync();
Expand All @@ -171,7 +173,6 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(10)
};

this._logger.LogInformation($"DB and Table: {connection.Database}.{fullTableName}. Primary keys: [{string.Join(",", tableInfo.PrimaryKeys.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", tableInfo.ColumnDefinitions)}]");
cachedTables.Set(cacheKey, tableInfo, policy);
}
else
Expand All @@ -189,6 +190,7 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
}

TelemetryInstance.TrackEvent(TelemetryEventName.UpsertStart, props);
this._logger.LogDebugWithThreadId("BEGIN UpsertRowsTransaction");
var transactionSw = Stopwatch.StartNew();
int batchSize = 1000;
SqlTransaction transaction = connection.BeginTransaction();
Expand All @@ -209,14 +211,17 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
await command.ExecuteNonQueryAsync();
}
transaction.Commit();
transactionSw.Stop();
upsertRowsAsyncSw.Stop();
var measures = new Dictionary<TelemetryMeasureName, double>()
{
{ TelemetryMeasureName.BatchCount, batchCount },
{ TelemetryMeasureName.TransactionDurationMs, transactionSw.ElapsedMilliseconds },
{ TelemetryMeasureName.CommandDurationMs, commandSw.ElapsedMilliseconds }
};
TelemetryInstance.TrackEvent(TelemetryEventName.UpsertEnd, props, measures);
this._logger.LogInformation($"Upserted {rows.Count()} row(s) into database: {connection.Database} and table: {fullTableName}.");
this._logger.LogDebugWithThreadId($"END UpsertRowsTransaction Duration={transactionSw.ElapsedMilliseconds}ms Upserted {rows.Count()} row(s) into database: {connection.Database} and table: {fullTableName}.");
this._logger.LogDebugWithThreadId($"END UpsertRowsAsync Duration={upsertRowsAsyncSw.ElapsedMilliseconds}ms");
}
catch (Exception ex)
{
Expand Down Expand Up @@ -504,6 +509,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
{
Dictionary<TelemetryPropertyName, string> sqlConnProps = sqlConnection.AsConnectionProps();
TelemetryInstance.TrackEvent(TelemetryEventName.GetTableInfoStart, sqlConnProps);
logger.LogDebugWithThreadId("BEGIN RetrieveTableInformationAsync");
var table = new SqlObject(fullName);

// Get case sensitivity from database collation (default to false if any exception occurs)
Expand All @@ -512,7 +518,9 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
var caseSensitiveSw = Stopwatch.StartNew();
try
{
var cmdCollation = new SqlCommand(GetDatabaseCollationQuery(sqlConnection), sqlConnection);
string getDatabaseCollationQuery = GetDatabaseCollationQuery(sqlConnection);
logger.LogDebugWithThreadId($"BEGIN GetCaseSensitivity Query=\"{getDatabaseCollationQuery}\"");
var cmdCollation = new SqlCommand(getDatabaseCollationQuery, sqlConnection);
using (SqlDataReader rdr = await cmdCollation.ExecuteReaderAsync())
{
while (await rdr.ReadAsync())
Expand All @@ -521,6 +529,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
}
caseSensitiveSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetCaseSensitivity, caseSensitiveSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetCaseSensitivity Duration={caseSensitiveSw.ElapsedMilliseconds}ms");
}
}
catch (Exception ex)
Expand All @@ -539,7 +548,9 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
var columnDefinitionsSw = Stopwatch.StartNew();
try
{
var cmdColDef = new SqlCommand(GetColumnDefinitionsQuery(table), sqlConnection);
string getColumnDefinitionsQuery = GetColumnDefinitionsQuery(table);
logger.LogDebugWithThreadId($"BEGIN GetColumnDefinitions Query=\"{getColumnDefinitionsQuery}\"");
var cmdColDef = new SqlCommand(getColumnDefinitionsQuery, sqlConnection);
using (SqlDataReader rdr = await cmdColDef.ExecuteReaderAsync())
{
while (await rdr.ReadAsync())
Expand All @@ -549,6 +560,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
}
columnDefinitionsSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetColumnDefinitions, columnDefinitionsSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetColumnDefinitions Duration={columnDefinitionsSw.ElapsedMilliseconds}ms");
}

}
Expand All @@ -573,7 +585,9 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
var primaryKeysSw = Stopwatch.StartNew();
try
{
var cmd = new SqlCommand(GetPrimaryKeysQuery(table), sqlConnection);
string getPrimaryKeysQuery = GetPrimaryKeysQuery(table);
logger.LogDebugWithThreadId($"BEGIN GetPrimaryKeys Query=\"{getPrimaryKeysQuery}\"");
var cmd = new SqlCommand(getPrimaryKeysQuery, sqlConnection);
using (SqlDataReader rdr = await cmd.ExecuteReaderAsync())
{
while (await rdr.ReadAsync())
Expand All @@ -583,6 +597,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
}
primaryKeysSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetPrimaryKeys, primaryKeysSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetPrimaryKeys Duration={primaryKeysSw.ElapsedMilliseconds}ms");
}
}
catch (Exception ex)
Expand Down Expand Up @@ -633,6 +648,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
sqlConnProps.Add(TelemetryPropertyName.QueryType, usingInsertQuery ? "insert" : "merge");
sqlConnProps.Add(TelemetryPropertyName.HasIdentityColumn, hasIdentityColumnPrimaryKeys.ToString());
TelemetryInstance.TrackDuration(TelemetryEventName.GetTableInfoEnd, tableInfoSw.ElapsedMilliseconds, sqlConnProps, durations);
logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeyFields.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}]");
return new TableInformation(primaryKeyFields, columnDefinitionsFromSQL, comparer, query, hasIdentityColumnPrimaryKeys);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/SqlBindingConfigProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void Initialize(ExtensionConfigContext context)
TelemetryInstance.Initialize(this._configuration, logger);
#pragma warning disable CS0618 // Fine to use this for our stuff
FluentBindingRule<SqlAttribute> inputOutputRule = context.AddBindingRule<SqlAttribute>();
var converter = new SqlConverter(this._configuration);
var converter = new SqlConverter(this._configuration, logger);
inputOutputRule.BindToInput(converter);
inputOutputRule.BindToInput<string>(typeof(SqlGenericsConverter<string>), this._configuration, logger);
inputOutputRule.BindToCollector<SQLObjectOpenType>(typeof(SqlAsyncCollectorBuilder<>), this._configuration, logger);
Expand Down
24 changes: 20 additions & 4 deletions src/SqlConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry;
Expand All @@ -21,17 +22,20 @@ internal class SqlConverters
internal class SqlConverter : IConverter<SqlAttribute, SqlCommand>
{
private readonly IConfiguration _configuration;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="SqlConverter/>"/> class.
/// </summary>
/// <param name="configuration"></param>
/// <param name="logger">ILogger used to log information and warnings</param>
/// <exception cref="ArgumentNullException">
/// Thrown if the configuration is null
/// </exception>
public SqlConverter(IConfiguration configuration)
public SqlConverter(IConfiguration configuration, ILogger logger)
{
this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
this._logger = logger;
TelemetryInstance.TrackCreate(CreateType.SqlConverter);
}

Expand All @@ -46,10 +50,14 @@ public SqlConverter(IConfiguration configuration)
public SqlCommand Convert(SqlAttribute attribute)
{
TelemetryInstance.TrackConvert(ConvertType.SqlCommand);
this._logger.LogDebugWithThreadId("BEGIN Convert (SqlCommand)");
var sw = Stopwatch.StartNew();
try
{
return SqlBindingUtilities.BuildCommand(attribute, SqlBindingUtilities.BuildConnection(
SqlCommand command = SqlBindingUtilities.BuildCommand(attribute, SqlBindingUtilities.BuildConnection(
attribute.ConnectionStringSetting, this._configuration));
this._logger.LogDebugWithThreadId($"END Convert (SqlCommand) Duration={sw.ElapsedMilliseconds}ms");
return command;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -98,10 +106,14 @@ public SqlGenericsConverter(IConfiguration configuration, ILogger logger)
public async Task<IEnumerable<T>> ConvertAsync(SqlAttribute attribute, CancellationToken cancellationToken)
{
TelemetryInstance.TrackConvert(ConvertType.IEnumerable);
this._logger.LogDebugWithThreadId("BEGIN ConvertAsync (IEnumerable)");
var sw = Stopwatch.StartNew();
try
{
string json = await this.BuildItemFromAttributeAsync(attribute);
return JsonConvert.DeserializeObject<IEnumerable<T>>(json);
IEnumerable<T> result = JsonConvert.DeserializeObject<IEnumerable<T>>(json);
this._logger.LogDebugWithThreadId($"END ConvertAsync (IEnumerable) Duration={sw.ElapsedMilliseconds}ms");
return result;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -129,9 +141,13 @@ public async Task<IEnumerable<T>> ConvertAsync(SqlAttribute attribute, Cancellat
async Task<string> IAsyncConverter<SqlAttribute, string>.ConvertAsync(SqlAttribute attribute, CancellationToken cancellationToken)
{
TelemetryInstance.TrackConvert(ConvertType.Json);
this._logger.LogDebugWithThreadId("BEGIN ConvertAsync (Json)");
var sw = Stopwatch.StartNew();
try
{
return await this.BuildItemFromAttributeAsync(attribute);
string result = await this.BuildItemFromAttributeAsync(attribute);
this._logger.LogDebugWithThreadId($"END ConvertAsync (Json) Duration={sw.ElapsedMilliseconds}ms");
return result;
}
catch (Exception ex)
{
Expand Down
6 changes: 6 additions & 0 deletions src/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Linq;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MoreLinq;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -93,5 +94,10 @@ public static void LowercasePropertyNames(this JObject obj)
property.Replace(new JProperty(property.Name.ToLowerInvariant(), property.Value));
}
}

public static void LogDebugWithThreadId(this ILogger logger, string message, params object[] args)
{
logger.LogDebug($"TID:{Environment.CurrentManagedThreadId} {message}", args);
}
}
}
2 changes: 1 addition & 1 deletion test/Unit/SqlInputBindingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void TestNullConfiguration()
{
Assert.Throws<ArgumentNullException>(() => new SqlBindingConfigProvider(null, loggerFactory.Object));
Assert.Throws<ArgumentNullException>(() => new SqlBindingConfigProvider(config.Object, null));
Assert.Throws<ArgumentNullException>(() => new SqlConverter(null));
Assert.Throws<ArgumentNullException>(() => new SqlConverter(null, logger.Object));
Assert.Throws<ArgumentNullException>(() => new SqlGenericsConverter<string>(null, logger.Object));
}

Expand Down