Skip to content

Commit

Permalink
Port dotnet#603 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cheenamalhotra committed May 25, 2021
1 parent d5b7a5b commit c0f261d
Showing 1 changed file with 80 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2171,45 +2171,45 @@ internal bool TryGetBytesInternalSequential(int i, byte[] buffer, int index, int
{
tdsReliabilitySection.Start();
#endif //DEBUG
if ((_sharedState._columnDataBytesRemaining == 0) || (length == 0))
{
// No data left or nothing requested, return 0
bytesRead = 0;
return true;
}
else
if ((_sharedState._columnDataBytesRemaining == 0) || (length == 0))
{
// No data left or nothing requested, return 0
bytesRead = 0;
return true;
}
else
{
// if plp columns, do partial reads. Don't read the entire value in one shot.
if (_metaData[i].metaType.IsPlp)
{
// if plp columns, do partial reads. Don't read the entire value in one shot.
if (_metaData[i].metaType.IsPlp)
// Read in data
bool result = _stateObj.TryReadPlpBytes(ref buffer, index, length, out bytesRead);
_columnDataBytesRead += bytesRead;
if (!result)
{
// Read in data
bool result = _stateObj.TryReadPlpBytes(ref buffer, index, length, out bytesRead);
_columnDataBytesRead += bytesRead;
if (!result)
{
return false;
}

// Query for number of bytes left
ulong left;
if (!_parser.TryPlpBytesLeft(_stateObj, out left))
{
_sharedState._columnDataBytesRemaining = -1;
return false;
}
_sharedState._columnDataBytesRemaining = (long)left;
return true;
return false;
}
else

// Query for number of bytes left
ulong left;
if (!_parser.TryPlpBytesLeft(_stateObj, out left))
{
// Read data (not exceeding the total amount of data available)
int bytesToRead = (int)Math.Min((long)length, _sharedState._columnDataBytesRemaining);
bool result = _stateObj.TryReadByteArray(buffer, index, bytesToRead, out bytesRead);
_columnDataBytesRead += bytesRead;
_sharedState._columnDataBytesRemaining -= bytesRead;
return result;
_sharedState._columnDataBytesRemaining = -1;
return false;
}
_sharedState._columnDataBytesRemaining = (long)left;
return true;
}
else
{
// Read data (not exceeding the total amount of data available)
int bytesToRead = (int)Math.Min((long)length, _sharedState._columnDataBytesRemaining);
bool result = _stateObj.TryReadByteArray(buffer, index, bytesToRead, out bytesRead);
_columnDataBytesRead += bytesRead;
_sharedState._columnDataBytesRemaining -= bytesRead;
return result;
}
}
#if DEBUG
}
finally
Expand Down Expand Up @@ -4156,7 +4156,7 @@ private bool TryReadColumnHeader(int i)
{
tdsReliabilitySection.Start();
#endif //DEBUG
return TryReadColumnInternal(i, readHeaderOnly: true);
return TryReadColumnInternal(i, readHeaderOnly: true);
#if DEBUG
}
finally
Expand Down Expand Up @@ -5074,14 +5074,25 @@ private Task<int> GetBytesAsyncReadDataStage(GetBytesAsyncCallContext context, b
SetTimeout(_defaultTimeoutMilliseconds);

// Try to read without any continuations (all the data may already be in the stateObj's buffer)
if (!TryGetBytesInternalSequential(context.columnIndex, context.buffer, context.index, context.length, out bytesRead))
bool filledBuffer = context._reader.TryGetBytesInternalSequential(
context.columnIndex,
context.buffer,
context.index + context.totalBytesRead,
context.length - context.totalBytesRead,
out bytesRead
);
context.totalBytesRead += bytesRead;
Debug.Assert(context.totalBytesRead <= context.length, "Read more bytes than required");

if (!filledBuffer)
{
// This will be the 'state' for the callback
int totalBytesRead = bytesRead;

if (!isContinuation)
{
// This is the first async operation which is happening - setup the _currentTask and timeout
Debug.Assert(context._source == null, "context._source should not be non-null when trying to change to async");
source = new TaskCompletionSource<int>();
Task original = Interlocked.CompareExchange(ref _currentTask, source.Task, null);
if (original != null)
Expand All @@ -5090,6 +5101,7 @@ private Task<int> GetBytesAsyncReadDataStage(GetBytesAsyncCallContext context, b
return source.Task;
}

context._source = source;
// Check if cancellation due to close is requested (this needs to be done after setting _currentTask)
if (_cancelAsyncOnCloseToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -5118,7 +5130,7 @@ private Task<int> GetBytesAsyncReadDataStage(GetBytesAsyncCallContext context, b
}
else
{
Debug.Assert(context._source != null, "context.source shuld not be null when continuing");
Debug.Assert(context._source != null, "context._source shuld not be null when continuing");
// setup for cleanup\completing
retryTask.ContinueWith(
continuationAction: AAsyncCallContext<int>.s_completeCallback,
Expand Down Expand Up @@ -5177,42 +5189,42 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)
{
_stateObj._shouldHaveEnoughData = true;
#endif
if (_sharedState._dataReady)
{
// Clean off current row
CleanPartialReadReliable();
}
if (_sharedState._dataReady)
{
// Clean off current row
CleanPartialReadReliable();
}

// If there a ROW token ready (as well as any metadata for the row)
if (_stateObj.IsRowTokenReady())
{
// Read the ROW token
bool result = TryReadInternal(true, out more);
Debug.Assert(result, "Should not have run out of data");
// If there a ROW token ready (as well as any metadata for the row)
if (_stateObj.IsRowTokenReady())
{
// Read the ROW token
bool result = TryReadInternal(true, out more);
Debug.Assert(result, "Should not have run out of data");

rowTokenRead = true;
if (more)
rowTokenRead = true;
if (more)
{
// Sequential mode, nothing left to do
if (IsCommandBehavior(CommandBehavior.SequentialAccess))
{
// Sequential mode, nothing left to do
if (IsCommandBehavior(CommandBehavior.SequentialAccess))
{
return ADP.TrueTask;
}
// For non-sequential, check if we can read the row data now
else if (WillHaveEnoughData(_metaData.Length - 1))
{
// Read row data
result = TryReadColumn(_metaData.Length - 1, setTimeout: true);
Debug.Assert(result, "Should not have run out of data");
return ADP.TrueTask;
}
return ADP.TrueTask;
}
else
// For non-sequential, check if we can read the row data now
else if (WillHaveEnoughData(_metaData.Length - 1))
{
// No data left, return
return ADP.FalseTask;
// Read row data
result = TryReadColumn(_metaData.Length - 1, setTimeout: true);
Debug.Assert(result, "Should not have run out of data");
return ADP.TrueTask;
}
}
else
{
// No data left, return
return ADP.FalseTask;
}
}
#if DEBUG
}
finally
Expand Down Expand Up @@ -5373,8 +5385,8 @@ override public Task<bool> IsDBNullAsync(int i, CancellationToken cancellationTo
{
_stateObj._shouldHaveEnoughData = true;
#endif
ReadColumnHeader(i);
return _data[i].IsNull ? ADP.TrueTask : ADP.FalseTask;
ReadColumnHeader(i);
return _data[i].IsNull ? ADP.TrueTask : ADP.FalseTask;
#if DEBUG
}
finally
Expand Down Expand Up @@ -5510,7 +5522,7 @@ override public Task<T> GetFieldValueAsync<T>(int i, CancellationToken cancellat
{
_stateObj._shouldHaveEnoughData = true;
#endif
return Task.FromResult(GetFieldValueInternal<T>(i));
return Task.FromResult(GetFieldValueInternal<T>(i));
#if DEBUG
}
finally
Expand Down

0 comments on commit c0f261d

Please sign in to comment.