Skip to content

Commit

Permalink
Use leaveOpen: false on StreamPipeReaderOptions (#6379)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Dec 15, 2023
1 parent b8b0a8e commit 355e3fe
Showing 1 changed file with 124 additions and 132 deletions.
256 changes: 124 additions & 132 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.IO.Abstractions;
using System.IO.Pipelines;
using System.Linq;
using System.Numerics;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
Expand Down Expand Up @@ -117,169 +116,160 @@ private JsonRpcRequest DeserializeObject(JsonElement element)

public async IAsyncEnumerable<JsonRpcResult> ProcessAsync(PipeReader reader, JsonRpcContext context)
{
Stream? stream = null;
(reader, stream) = await RecordRequest(reader);
reader = await RecordRequest(reader);
Stopwatch stopwatch = Stopwatch.StartNew();
// Initializes a buffer to store the data read from the reader.
ReadOnlySequence<byte> buffer = default;
try
{
// Initializes a buffer to store the data read from the reader.
ReadOnlySequence<byte> buffer = default;
try
// Continuously read data from the PipeReader in a loop.
// Can read multiple requests, ends when there is no more requests to read or there is an error in deserialization.
while (true)
{
// Continuously read data from the PipeReader in a loop.
// Can read multiple requests, ends when there is no more requests to read or there is an error in deserialization.
while (true)
// Asynchronously reads data from the PipeReader.
ReadResult readResult = await reader.ReadAsync();
buffer = readResult.Buffer;
// Placeholder for a result in case of deserialization failure.
JsonRpcResult? deserializationFailureResult = null;

// Processes the buffer while it's not empty; before going out to outer loop to get more data.
while (!buffer.IsEmpty)
{
// Asynchronously reads data from the PipeReader.
ReadResult readResult = await reader.ReadAsync();
buffer = readResult.Buffer;
// Placeholder for a result in case of deserialization failure.
JsonRpcResult? deserializationFailureResult = null;

// Processes the buffer while it's not empty; before going out to outer loop to get more data.
while (!buffer.IsEmpty)
JsonDocument? jsonDocument = null;
JsonRpcRequest? model = null;
ArrayPoolList<JsonRpcRequest>? collection = null;
try
{
JsonDocument? jsonDocument = null;
JsonRpcRequest? model = null;
ArrayPoolList<JsonRpcRequest>? collection = null;
try
// Tries to parse the JSON from the buffer.
if (!TryParseJson(ref buffer, out jsonDocument))
{
// Tries to parse the JSON from the buffer.
if (!TryParseJson(ref buffer, out jsonDocument))
{
// More data needs to be read to complete a document
break;
}

// Deserializes the JSON document into a request object or a collection of requests.
(model, collection) = DeserializeObjectOrArray(jsonDocument);
}
catch (BadHttpRequestException e)
{
// Increments failure metric and logs the exception, then stops processing.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}");
yield break;
}
catch (Exception ex)
{
// Handles general exceptions during parsing and validation.
// Sends an error response and stops the stopwatch.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation.", ex);
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
}

// Checks for deserialization failure and yields the result.
if (deserializationFailureResult.HasValue)
{
yield return deserializationFailureResult.Value;
// More data needs to be read to complete a document
break;
}
else

// Deserializes the JSON document into a request object or a collection of requests.
(model, collection) = DeserializeObjectOrArray(jsonDocument);
}
catch (BadHttpRequestException e)
{
// Increments failure metric and logs the exception, then stops processing.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}");
yield break;
}
catch (Exception ex)
{
// Handles general exceptions during parsing and validation.
// Sends an error response and stops the stopwatch.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation.", ex);
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
}

// Checks for deserialization failure and yields the result.
if (deserializationFailureResult.HasValue)
{
yield return deserializationFailureResult.Value;
break;
}
else
{
// Handles a single JSON RPC request.
if (model is not null)
{
// Handles a single JSON RPC request.
if (model is not null)
{
if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}");
if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}");

// Processes the individual request.
JsonRpcResult.Entry result = await HandleSingleRequest(model, context);
result.Response.AddDisposable(() => jsonDocument.Dispose());
// Processes the individual request.
JsonRpcResult.Entry result = await HandleSingleRequest(model, context);
result.Response.AddDisposable(() => jsonDocument.Dispose());

// Returns the result of the processed request.
yield return JsonRpcResult.Single(RecordResponse(result));
}
// Returns the result of the processed request.
yield return JsonRpcResult.Single(RecordResponse(result));
}

// Processes a collection of JSON RPC requests.
if (collection is not null)
{
if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests");

// Checks for authentication and batch size limit.
if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize)
{
if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}.");
JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded");
response.AddDisposable(() => jsonDocument.Dispose());

deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error));
yield return deserializationFailureResult.Value;
break;
}

// Stops the stopwatch and yields the batch processing result.
stopwatch.Stop();
yield return JsonRpcResult.Collection(new JsonRpcBatchResult((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)));
}
// Processes a collection of JSON RPC requests.
if (collection is not null)
{
if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests");

// Handles invalid requests.
if (model is null && collection is null)
// Checks for authentication and batch size limit.
if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize)
{
Metrics.JsonRpcInvalidRequests++;
JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request");
errorResponse.AddDisposable(() => jsonDocument.Dispose());

TraceResult(errorResponse);
stopwatch.Stop();
if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms");
deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}.");
JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded");
response.AddDisposable(() => jsonDocument.Dispose());

deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error));
yield return deserializationFailureResult.Value;
break;
}
}
}

// Checks if the deserialization failed
if (deserializationFailureResult.HasValue)
{
break;
}
// Stops the stopwatch and yields the batch processing result.
stopwatch.Stop();
yield return JsonRpcResult.Collection(new JsonRpcBatchResult((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)));
}

// Checks if the read operation is completed.
if (readResult.IsCompleted)
{
if (buffer.Length > 0 && (buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray()).IndexOfAnyExcept(WhiteSpace()) >= 0)
// Handles invalid requests.
if (model is null && collection is null)
{
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation. Incomplete request");
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
Metrics.JsonRpcInvalidRequests++;
JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request");
errorResponse.AddDisposable(() => jsonDocument.Dispose());

TraceResult(errorResponse);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms");
deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
yield return deserializationFailureResult.Value;
break;
}

break;
}
}

// Advances the reader to the next segment of the buffer.
reader.AdvanceTo(buffer.Start, buffer.End);
buffer = default;
// Checks if the deserialization failed
if (deserializationFailureResult.HasValue)
{
break;
}
}
finally
{
// Advances the reader to the end of the buffer if not null.
if (!buffer.FirstSpan.IsNull())

// Checks if the read operation is completed.
if (readResult.IsCompleted)
{
reader.AdvanceTo(buffer.End);
if (buffer.Length > 0 && (buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray()).IndexOfAnyExcept(WhiteSpace()) >= 0)
{
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation. Incomplete request");
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
yield return deserializationFailureResult.Value;
}

break;
}

// Advances the reader to the next segment of the buffer.
reader.AdvanceTo(buffer.Start, buffer.End);
buffer = default;
}

// Completes the PipeReader's asynchronous reading operation.
await reader.CompleteAsync();
}
finally
{
stream?.Dispose();
// Advances the reader to the end of the buffer if not null.
if (!buffer.FirstSpan.IsNull())
{
reader.AdvanceTo(buffer.End);
}
}

// Completes the PipeReader's asynchronous reading operation.
await reader.CompleteAsync();
}

private static ReadOnlySpan<byte> WhiteSpace() => " \n\r\t"u8;
Expand Down Expand Up @@ -374,7 +364,9 @@ private JsonRpcResult.Entry RecordResponse(JsonRpcResult.Entry result)
return result;
}

private async ValueTask<(PipeReader, Stream?)> RecordRequest(PipeReader reader)
private static readonly StreamPipeReaderOptions _pipeReaderOptions = new StreamPipeReaderOptions(leaveOpen: false);

private async ValueTask<PipeReader> RecordRequest(PipeReader reader)
{
if ((_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Request) != 0)
{
Expand All @@ -388,10 +380,10 @@ private async ValueTask<(PipeReader, Stream?)> RecordRequest(PipeReader reader)
_recorder.RecordRequest(requestString);

memoryStream.Seek(0, SeekOrigin.Begin);
return (PipeReader.Create(memoryStream), memoryStream);
return PipeReader.Create(memoryStream, _pipeReaderOptions);
}

return (reader, null);
return reader;
}

private void TraceResult(JsonRpcResult.Entry response)
Expand Down

0 comments on commit 355e3fe

Please sign in to comment.