Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/SqlAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
batchCount++;
GenerateDataQueryForMerge(tableInfo, batch, out string newDataQuery, out string rowData);
command.CommandText = $"{newDataQuery} {tableInfo.Query};";
this._logger.LogDebugWithThreadId($"UpsertRowsTransactionBatch - Query={command.CommandText}");
par.Value = rowData;
await command.ExecuteNonQueryAsync();
}
Expand Down Expand Up @@ -265,7 +266,7 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
string message2 = $"Encountered exception during upsert and rollback.";
throw new AggregateException(message2, new List<Exception> { ex, ex2 });
}
throw;
throw new InvalidOperationException($"Unexpected error upserting rows", ex);
}
}
}
Expand Down Expand Up @@ -396,7 +397,6 @@ public class TableInformation
public StringComparer Comparer { get; }

/// <summary>
/// T-SQL merge statement generated from primary keys
/// T-SQL merge or insert statement generated from primary keys
/// and column names for a specific table.
/// </summary>
Expand Down Expand Up @@ -540,9 +540,9 @@ WHEN NOT MATCHED THEN
/// <param name="sqlConnection">An open connection with which to query SQL against</param>
/// <param name="fullName">Full name of table, including schema (if exists).</param>
/// <param name="logger">ILogger used to log any errors or warnings.</param>
/// <param name="columnNames">Column names from the object</param>
/// <param name="objectColumnNames">Column names from the object</param>
/// <returns>TableInformation object containing primary keys, column types, etc.</returns>
public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConnection sqlConnection, string fullName, ILogger logger, IEnumerable<string> columnNames)
public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConnection sqlConnection, string fullName, ILogger logger, IEnumerable<string> objectColumnNames)
{
Dictionary<TelemetryPropertyName, string> sqlConnProps = sqlConnection.AsConnectionProps();
TelemetryInstance.TrackEvent(TelemetryEventName.GetTableInfoStart, sqlConnProps);
Expand All @@ -560,13 +560,15 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
var cmdCollation = new SqlCommand(getDatabaseCollationQuery, sqlConnection);
using (SqlDataReader rdr = await cmdCollation.ExecuteReaderAsync())
{
string collation = "";
while (await rdr.ReadAsync())
{
caseSensitive = GetCaseSensitivityFromCollation(rdr[Collation].ToString());
collation = rdr[Collation].ToString();
caseSensitive = GetCaseSensitivityFromCollation(collation);
}
caseSensitiveSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetCaseSensitivity, caseSensitiveSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetCaseSensitivity Duration={caseSensitiveSw.ElapsedMilliseconds}ms");
logger.LogDebugWithThreadId($"END GetCaseSensitivity Collation={collation} CaseSensitive={caseSensitive} Duration={caseSensitiveSw.ElapsedMilliseconds}ms");
}
}
catch (Exception ex)
Expand Down Expand Up @@ -656,7 +658,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
// Match SQL Primary Key column names to POCO property objects. Ensure none are missing.
StringComparison comparison = caseSensitive ? StringComparison.Ordinal : StringComparison.OrdinalIgnoreCase;
IEnumerable<PropertyInfo> primaryKeyProperties = typeof(T).GetProperties().Where(f => primaryKeys.Any(k => string.Equals(k.Name, f.Name, comparison)));
IEnumerable<string> primaryKeysFromObject = columnNames.Where(f => primaryKeys.Any(k => string.Equals(k.Name, f, comparison)));
IEnumerable<string> primaryKeysFromObject = objectColumnNames.Where(f => primaryKeys.Any(k => string.Equals(k.Name, f, comparison)));
IEnumerable<PrimaryKey> missingPrimaryKeysFromItem = primaryKeys
.Where(k => !primaryKeysFromObject.Contains(k.Name, comparer));
bool hasIdentityColumnPrimaryKeys = primaryKeys.Any(k => k.IsIdentity);
Expand All @@ -674,7 +676,8 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
// If any identity columns or columns with default values aren't included in the object then we have to generate a basic insert since the merge statement expects all primary key
// columns to exist. (the merge statement can handle nullable columns though if those exist)
bool usingInsertQuery = (hasIdentityColumnPrimaryKeys || hasDefaultColumnPrimaryKeys) && missingPrimaryKeysFromItem.Any();
IEnumerable<string> bracketedColumnNamesFromItem = columnNames

IEnumerable<string> bracketedColumnNamesFromItem = objectColumnNames
.Where(prop => !primaryKeys.Any(k => k.IsIdentity && string.Equals(k.Name, prop, comparison))) // Skip any identity columns, those should never be updated
.Select(prop => prop.AsBracketQuotedString());
string query = usingInsertQuery ? GetInsertQuery(table, bracketedColumnNamesFromItem) : GetMergeQuery(primaryKeys, table, bracketedColumnNamesFromItem);
Expand All @@ -689,7 +692,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
sqlConnProps.Add(TelemetryPropertyName.QueryType, usingInsertQuery ? "insert" : "merge");
sqlConnProps.Add(TelemetryPropertyName.HasIdentityColumn, hasIdentityColumnPrimaryKeys.ToString());
TelemetryInstance.TrackDuration(TelemetryEventName.GetTableInfoEnd, tableInfoSw.ElapsedMilliseconds, sqlConnProps, durations);
logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeys.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}]");
logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeys.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}] Object columns: [{string.Join(",", objectColumnNames)}]");
return new TableInformation(primaryKeyProperties, columnDefinitionsFromSQL, comparer, query, hasIdentityColumnPrimaryKeys);
}
}
Expand Down