From f3eb716a6dc08849c9ed92175114f5ed023effda Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 18 Jul 2022 19:55:51 +0530 Subject: [PATCH] prevent connection reuse on errors If the server returns an error or is likely to close connection, it is best to not reuse the connection for future client interactions. With this we now set the `CURLOPT_FORBID_REUSE` flag on the easy handle when we detect such a condition. --- src/curl.jl | 98 +++++++++++++++++++++++++---------------------------- src/grpc.jl | 6 ++-- 2 files changed, 50 insertions(+), 54 deletions(-) diff --git a/src/curl.jl b/src/curl.jl index 9076df1..5cb2412 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -172,6 +172,34 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real) end end +# Prevent reuse of this handle +# Should be called if an error is detected and/or the server is likely to close connection +forbid_reuse(easy::Curl.Easy) = Curl.setopt(easy, CURLOPT_FORBID_REUSE, Clong(1)) + +function get_grpc_status(easy::Curl.Easy) + grpc_status = StatusCode.OK.code + grpc_message = "" + + # parse the grpc headers + @debug("response headers", easy.res_hdrs) + for hdr in easy.res_hdrs + if startswith(hdr, "grpc-status") + grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2)))) + elseif startswith(hdr, "grpc-message") + grpc_message = string(strip(last(split(hdr, ':'; limit=2)))) + end + end + + if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) + grpc_status = StatusCode.DEADLINE_EXCEEDED.code + end + if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) + grpc_message = grpc_status_message(grpc_status) + end + + return grpc_status, grpc_message +end + function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; maxage::Clong = typemax(Clong), keepalive::Clong = 60, @@ -209,62 +237,30 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o nothing end - # do send recv data - if VERSION < v"1.5" - cleaned_up = false - exception = nothing - cleanup_once = (ex)->begin - if !cleaned_up - cleaned_up = true - exception = ex - cleanup() - end - end - - @sync begin - @async try - recv_data(easy, output, max_recv_message_length) - catch ex - cleanup_once(ex) - end - @async try - send_data(easy, input, max_send_message_length) - catch ex - cleanup_once(ex) - end - end + exception = nothing + grpc_status = StatusCode.OK.code + grpc_message = "" - if exception !== nothing - throw(exception) + # do send recv data + try + Base.Experimental.@sync begin + @async recv_data(easy, output, max_recv_message_length) + @async send_data(easy, input, max_send_message_length) end - else - try - Base.Experimental.@sync begin - @async recv_data(easy, output, max_recv_message_length) - @async send_data(easy, input, max_send_message_length) - end - finally # ensure handle is removed - cleanup() + grpc_status, grpc_message = get_grpc_status(easy) + if ((easy.code != CURLE_OK) || (grpc_status != StatusCode.OK.code)) + forbid_reuse(easy) end + catch ex + forbid_reuse(easy) + exception = ex + finally # ensure handle is removed + cleanup() end - @debug("response headers", easy.res_hdrs) - - # parse the grpc headers - grpc_status = StatusCode.OK.code - grpc_message = "" - for hdr in easy.res_hdrs - if startswith(hdr, "grpc-status") - grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2)))) - elseif startswith(hdr, "grpc-message") - grpc_message = string(strip(last(split(hdr, ':'; limit=2)))) - end - end - if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) - grpc_status = StatusCode.DEADLINE_EXCEEDED.code - end - if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) - grpc_message = grpc_status_message(grpc_status) + # throw the unwrapped exception if there was one + if exception !== nothing + throw(exception) end if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code)) diff --git a/src/grpc.jl b/src/grpc.jl index 98b0c96..20821ae 100644 --- a/src/grpc.jl +++ b/src/grpc.jl @@ -19,13 +19,13 @@ end gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing) function gRPCStatus(status_future) try - fetch(status_future) + return fetch(status_future) catch ex task_exception = isa(ex, TaskFailedException) ? ex.task.exception : ex while isa(task_exception, TaskFailedException) task_exception = task_exception.task.exception end - gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception) + return gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception) end end @@ -181,7 +181,7 @@ end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType} outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}()) try - take!(outchannel), status_future + return (take!(outchannel), status_future) catch ex gRPCCheck(status_future) # check for core issue if isa(ex, InvalidStateException)