Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,13 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
}
this._logger.LogDebugWithThreadId($"END UpdateTablesPreInvocation Duration={setLastSyncVersionDurationMs}ms");

var rows = new List<IReadOnlyDictionary<string, string>>();

// Use the version number to query for new changes.
using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
{
this._logger.LogDebugWithThreadId($"BEGIN GetChanges Query={getChangesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
var rows = new List<IReadOnlyDictionary<string, string>>();

using (SqlDataReader reader = await getChangesCommand.ExecuteReaderAsync(token))
{
Expand All @@ -232,15 +233,14 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
}
}

this._rows = rows;
getChangesDurationMs = commandSw.ElapsedMilliseconds;
}
this._logger.LogDebugWithThreadId($"END GetChanges Duration={getChangesDurationMs}ms ChangedRows={this._rows.Count}");
this._logger.LogDebugWithThreadId($"END GetChanges Duration={getChangesDurationMs}ms ChangedRows={rows.Count}");

// If changes were found, acquire leases on them.
if (this._rows.Count > 0)
if (rows.Count > 0)
{
using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction))
using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
{
this._logger.LogDebugWithThreadId($"BEGIN AcquireLeases Query={acquireLeasesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
Expand All @@ -252,6 +252,9 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo

transaction.Commit();

// Set the rows for processing, now since the leases are acquired.
this._rows = rows;

var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.SetLastSyncVersionDurationMs] = setLastSyncVersionDurationMs,
Expand Down Expand Up @@ -576,6 +579,7 @@ private long RecomputeLastSyncVersion()
string changeVersion = row["SYS_CHANGE_VERSION"];
changeVersionSet.Add(long.Parse(changeVersion, CultureInfo.InvariantCulture));
}

// If there are more than one version numbers in the set, return the second highest one. Otherwise, return
// the only version number in the set.
long lastSyncVersion = changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0);
Expand Down Expand Up @@ -696,15 +700,16 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
/// </summary>
/// <param name="connection">The connection to add to the returned SqlCommand</param>
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
/// <param name="rows">Dictionary representing the table rows on which leases should be acquired</param>
/// <returns>The SqlCommand populated with the query and appropriate parameters</returns>
private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction)
private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, string>> rows)
{
var acquireLeasesQuery = new StringBuilder();

for (int rowIndex = 0; rowIndex < this._rows.Count; rowIndex++)
for (int rowIndex = 0; rowIndex < rows.Count; rowIndex++)
{
string valuesList = string.Join(", ", this._primaryKeyColumns.Select((_, colIndex) => $"@{rowIndex}_{colIndex}"));
string changeVersion = this._rows[rowIndex]["SYS_CHANGE_VERSION"];
string changeVersion = rows[rowIndex]["SYS_CHANGE_VERSION"];

acquireLeasesQuery.Append($@"
IF NOT EXISTS (SELECT * FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {this._rowMatchConditions[rowIndex]})
Expand All @@ -720,7 +725,7 @@ IF NOT EXISTS (SELECT * FROM {this._leasesTableName} WITH (TABLOCKX) WHERE {this
");
}

return this.GetSqlCommandWithParameters(acquireLeasesQuery.ToString(), connection, transaction);
return this.GetSqlCommandWithParameters(acquireLeasesQuery.ToString(), connection, transaction, rows);
}

/// <summary>
Expand All @@ -738,7 +743,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection)
WHERE {matchCondition};
";

return this.GetSqlCommandWithParameters(renewLeasesQuery, connection, null);
return this.GetSqlCommandWithParameters(renewLeasesQuery, connection, null, this._rows);
}

/// <summary>
Expand Down Expand Up @@ -771,7 +776,7 @@ private SqlCommand BuildReleaseLeasesCommand(SqlConnection connection, SqlTransa
");
}

return this.GetSqlCommandWithParameters(releaseLeasesQuery.ToString(), connection, transaction);
return this.GetSqlCommandWithParameters(releaseLeasesQuery.ToString(), connection, transaction, this._rows);
}

/// <summary>
Expand Down Expand Up @@ -827,19 +832,21 @@ FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @current_last_
/// <param name="commandText">SQL query string</param>
/// <param name="connection">The connection to add to the returned SqlCommand</param>
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
/// <param name="rows">Dictionary representing the table rows</param>
/// <remarks>
/// Ideally, we would have a map that maps from rows to a list of SqlCommands populated with their primary key
/// values. The issue with this is that SQL doesn't seem to allow adding parameters to one collection when they
/// are part of another. So, for example, since the SqlParameters are part of the list in the map, an exception
/// is thrown if they are also added to the collection of a SqlCommand. The expected behavior seems to be to
/// rebuild the SqlParameters each time.
/// </remarks>
private SqlCommand GetSqlCommandWithParameters(string commandText, SqlConnection connection, SqlTransaction transaction)
private SqlCommand GetSqlCommandWithParameters(string commandText, SqlConnection connection,
SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, string>> rows)
{
var command = new SqlCommand(commandText, connection, transaction);

SqlParameter[] parameters = Enumerable.Range(0, this._rows.Count)
.SelectMany(rowIndex => this._primaryKeyColumns.Select((col, colIndex) => new SqlParameter($"@{rowIndex}_{colIndex}", this._rows[rowIndex][col])))
SqlParameter[] parameters = Enumerable.Range(0, rows.Count)
.SelectMany(rowIndex => this._primaryKeyColumns.Select((col, colIndex) => new SqlParameter($"@{rowIndex}_{colIndex}", rows[rowIndex][col])))
.ToArray();

command.Parameters.AddRange(parameters);
Expand Down