Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

SqlClient minor memory improvements #34134

Merged
merged 6 commits into from Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
225 changes: 113 additions & 112 deletions src/System.Data.SqlClient/src/System/Data/SqlClient/SqlBulkCopy.cs
Expand Up @@ -1841,7 +1841,10 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok
}
else
{
AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
AsyncHelper.ContinueTaskWithState(writeTask, tcs,
state: tcs,
onSuccess: state => ((TaskCompletionSource<object>)state).SetResult(null)
);
}
}, ctoken); // We do not need to propagate exception, etc, from reconnect task, we just need to wait for it to finish.
return tcs.Task;
Expand Down Expand Up @@ -2153,17 +2156,17 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = nul
private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i)
{
AsyncHelper.ContinueTask(task, source, () =>
{
if (i + 1 < _sortedColumnMappings.Count)
{
CopyColumnsAsync(i + 1, source); //continue from the next column
}
else
{
source.SetResult(null);
if (i + 1 < _sortedColumnMappings.Count)
{
CopyColumnsAsync(i + 1, source); //continue from the next column
}
else
{
source.SetResult(null);
}
}
},
_connection.GetOpenTdsConnection());
);
}

// The notification logic.
Expand Down Expand Up @@ -2257,24 +2260,6 @@ private Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<ob
}
}

private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action)
{
if (task == null)
{
return action();
}
else
{
Debug.Assert(source != null, "source should already be initialized if task is not null");
AsyncHelper.ContinueTask(task, source, () =>
{
TaskCompletionSource<object> newSource = action();
Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists");
});
}
return null;
}

// Copies all the rows in a batch.
// Maintains state machine with state variable: rowSoFar.
// Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
Expand Down Expand Up @@ -2315,7 +2300,7 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
}
resultTask = source.Task;

AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source));
return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled.
}
}
Expand All @@ -2325,19 +2310,20 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
resultTask = source.Task;

AsyncHelper.ContinueTask(task, source, onSuccess: () =>
{
CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment.

Task readTask = ReadFromRowSourceAsync(cts);
if (readTask == null)
{
CopyRowsAsync(i + 1, totalRows, cts, source);
}
else
{
AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment.

Task readTask = ReadFromRowSourceAsync(cts);
if (readTask == null)
{
CopyRowsAsync(i + 1, totalRows, cts, source);
}
else
{
AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source));
}
}
}, connectionToDoom: _connection.GetOpenTdsConnection());
);
return resultTask;
}
}
Expand Down Expand Up @@ -2406,15 +2392,17 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTask(commandTask, source, () =>
{
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
AsyncHelper.ContinueTask(commandTask, source,
() =>
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}
}, _connection.GetOpenTdsConnection());
);
return source.Task;
}
}
Expand Down Expand Up @@ -2462,15 +2450,19 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
{ // First time only
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () =>
{
Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
AsyncHelper.ContinueTask(task, source,
onSuccess: () =>
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));
Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
},
onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: () => CopyBatchesAsyncContinuedOnError(cleanupParser: true)
);

return source.Task;
}
Expand Down Expand Up @@ -2517,22 +2509,25 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTask(writeTask, source, () =>
{
try
{
RunParser();
CommitTransaction();
}
catch (Exception)
AsyncHelper.ContinueTask(writeTask, source,
onSuccess: () =>
{
CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}
try
{
RunParser();
CommitTransaction();
}
catch (Exception)
{
CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}

// Always call back into CopyBatchesAsync
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
// Always call back into CopyBatchesAsync
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
},
onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false)
);
return source.Task;
}
}
Expand Down Expand Up @@ -2651,48 +2646,50 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int
{
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () =>
{
// Bulk copy task is completed at this moment.
if (task.IsCanceled)
AsyncHelper.ContinueTask(task, source,
() =>
{
_localColumnMappings = null;
try
// Bulk copy task is completed at this moment.
if (task.IsCanceled)
{
CleanUpStateObjectOnError();
_localColumnMappings = null;
try
{
CleanUpStateObjectOnError();
}
finally
{
source.SetCanceled();
}
}
finally
else if (task.Exception != null)
{
source.SetCanceled();
source.SetException(task.Exception.InnerException);
}
}
else if (task.Exception != null)
{
source.SetException(task.Exception.InnerException);
}
else
{
_localColumnMappings = null;
try
{
CleanUpStateObjectOnError();
}
finally
else
{
if (source != null)
_localColumnMappings = null;
try
{
if (cts.IsCancellationRequested)
{ // We may get cancellation req even after the entire copy.
source.SetCanceled();
}
else
CleanUpStateObjectOnError();
}
finally
{
if (source != null)
{
source.SetResult(null);
if (cts.IsCancellationRequested)
{ // We may get cancellation req even after the entire copy.
source.SetCanceled();
}
else
{
source.SetResult(null);
}
}
}
}
}
}, _connection.GetOpenTdsConnection());
);
return;
}
else
Expand Down Expand Up @@ -2782,12 +2779,15 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
{
regReconnectCancel = cts.Register(s => ((TaskCompletionSource<object>)s).TrySetCanceled(), cancellableReconnectTS);
}
AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
AsyncHelper.ContinueTaskWithState(reconnectTask, cancellableReconnectTS,
state: cancellableReconnectTS,
onSuccess: (state) => { ((TaskCompletionSource<object>)state).SetResult(null); }
);
// No need to cancel timer since SqlBulkCopy creates specific task source for reconnection.
AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout,
() => { return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout()); }, CancellationToken.None);
AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
() =>
onSuccess: () =>
{
regReconnectCancel.Dispose();
if (_parserLock != null)
Expand All @@ -2799,7 +2799,6 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
connectionToAbort: _connection,
onFailure: (e) => { regReconnectCancel.Dispose(); },
onCancellation: () => { regReconnectCancel.Dispose(); },
exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
Expand Down Expand Up @@ -2850,7 +2849,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio

if (internalResultsTask != null)
{
AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source));
Wraith2 marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
Expand Down Expand Up @@ -2921,17 +2920,19 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
else
{
Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
AsyncHelper.ContinueTask(readTask, source, () =>
{
if (!_hasMoreRowToCopy)
{
source.SetResult(null); // No rows to copy!
}
else
AsyncHelper.ContinueTask(readTask, source,
() =>
{
WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
if (!_hasMoreRowToCopy)
{
source.SetResult(null); // No rows to copy!
}
else
{
WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
}
}
}, _connection.GetOpenTdsConnection());
);
return resultTask;
}
}
Expand Down