diff --git a/src/curl.jl b/src/curl.jl index 9076df1..5cb2412 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -172,6 +172,34 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real) end end +# Prevent reuse of this handle +# Should be called if an error is detected and/or the server is likely to close connection +forbid_reuse(easy::Curl.Easy) = Curl.setopt(easy, CURLOPT_FORBID_REUSE, Clong(1)) + +function get_grpc_status(easy::Curl.Easy) + grpc_status = StatusCode.OK.code + grpc_message = "" + + # parse the grpc headers + @debug("response headers", easy.res_hdrs) + for hdr in easy.res_hdrs + if startswith(hdr, "grpc-status") + grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2)))) + elseif startswith(hdr, "grpc-message") + grpc_message = string(strip(last(split(hdr, ':'; limit=2)))) + end + end + + if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) + grpc_status = StatusCode.DEADLINE_EXCEEDED.code + end + if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) + grpc_message = grpc_status_message(grpc_status) + end + + return grpc_status, grpc_message +end + function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; maxage::Clong = typemax(Clong), keepalive::Clong = 60, @@ -209,62 +237,30 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o nothing end - # do send recv data - if VERSION < v"1.5" - cleaned_up = false - exception = nothing - cleanup_once = (ex)->begin - if !cleaned_up - cleaned_up = true - exception = ex - cleanup() - end - end - - @sync begin - @async try - recv_data(easy, output, max_recv_message_length) - catch ex - cleanup_once(ex) - end - @async try - send_data(easy, input, max_send_message_length) - catch ex - cleanup_once(ex) - end - end + exception = nothing + grpc_status = StatusCode.OK.code + grpc_message = "" - if exception !== nothing - throw(exception) + # do send recv data + try + Base.Experimental.@sync begin + @async recv_data(easy, output, max_recv_message_length) + @async send_data(easy, input, max_send_message_length) end - else - try - Base.Experimental.@sync begin - @async recv_data(easy, output, max_recv_message_length) - @async send_data(easy, input, max_send_message_length) - end - finally # ensure handle is removed - cleanup() + grpc_status, grpc_message = get_grpc_status(easy) + if ((easy.code != CURLE_OK) || (grpc_status != StatusCode.OK.code)) + forbid_reuse(easy) end + catch ex + forbid_reuse(easy) + exception = ex + finally # ensure handle is removed + cleanup() end - @debug("response headers", easy.res_hdrs) - - # parse the grpc headers - grpc_status = StatusCode.OK.code - grpc_message = "" - for hdr in easy.res_hdrs - if startswith(hdr, "grpc-status") - grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2)))) - elseif startswith(hdr, "grpc-message") - grpc_message = string(strip(last(split(hdr, ':'; limit=2)))) - end - end - if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) - grpc_status = StatusCode.DEADLINE_EXCEEDED.code - end - if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) - grpc_message = grpc_status_message(grpc_status) + # throw the unwrapped exception if there was one + if exception !== nothing + throw(exception) end if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code)) diff --git a/src/grpc.jl b/src/grpc.jl index 98b0c96..20821ae 100644 --- a/src/grpc.jl +++ b/src/grpc.jl @@ -19,13 +19,13 @@ end gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing) function gRPCStatus(status_future) try - fetch(status_future) + return fetch(status_future) catch ex task_exception = isa(ex, TaskFailedException) ? ex.task.exception : ex while isa(task_exception, TaskFailedException) task_exception = task_exception.task.exception end - gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception) + return gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception) end end @@ -181,7 +181,7 @@ end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType} outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}()) try - take!(outchannel), status_future + return (take!(outchannel), status_future) catch ex gRPCCheck(status_future) # check for core issue if isa(ex, InvalidStateException)