Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 47 additions & 51 deletions src/curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,34 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real)
end
end

# Prevent reuse of this handle
# Should be called if an error is detected and/or the server is likely to close connection
forbid_reuse(easy::Curl.Easy) = Curl.setopt(easy, CURLOPT_FORBID_REUSE, Clong(1))

function get_grpc_status(easy::Curl.Easy)
grpc_status = StatusCode.OK.code
grpc_message = ""

# parse the grpc headers
@debug("response headers", easy.res_hdrs)
for hdr in easy.res_hdrs
if startswith(hdr, "grpc-status")
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
elseif startswith(hdr, "grpc-message")
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
end
end

if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
end
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
grpc_message = grpc_status_message(grpc_status)
end

return grpc_status, grpc_message
end

function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
maxage::Clong = typemax(Clong),
keepalive::Clong = 60,
Expand Down Expand Up @@ -209,62 +237,30 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
nothing
end

# do send recv data
if VERSION < v"1.5"
cleaned_up = false
exception = nothing
cleanup_once = (ex)->begin
if !cleaned_up
cleaned_up = true
exception = ex
cleanup()
end
end

@sync begin
@async try
recv_data(easy, output, max_recv_message_length)
catch ex
cleanup_once(ex)
end
@async try
send_data(easy, input, max_send_message_length)
catch ex
cleanup_once(ex)
end
end
exception = nothing
grpc_status = StatusCode.OK.code
grpc_message = ""

if exception !== nothing
throw(exception)
# do send recv data
try
Base.Experimental.@sync begin
@async recv_data(easy, output, max_recv_message_length)
@async send_data(easy, input, max_send_message_length)
end
else
try
Base.Experimental.@sync begin
@async recv_data(easy, output, max_recv_message_length)
@async send_data(easy, input, max_send_message_length)
end
finally # ensure handle is removed
cleanup()
grpc_status, grpc_message = get_grpc_status(easy)
if ((easy.code != CURLE_OK) || (grpc_status != StatusCode.OK.code))
forbid_reuse(easy)
end
catch ex
forbid_reuse(easy)
exception = ex
finally # ensure handle is removed
cleanup()
end

@debug("response headers", easy.res_hdrs)

# parse the grpc headers
grpc_status = StatusCode.OK.code
grpc_message = ""
for hdr in easy.res_hdrs
if startswith(hdr, "grpc-status")
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
elseif startswith(hdr, "grpc-message")
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
end
end
if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
end
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
grpc_message = grpc_status_message(grpc_status)
# throw the unwrapped exception if there was one
if exception !== nothing
throw(exception)
end

if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))
Expand Down
6 changes: 3 additions & 3 deletions src/grpc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ end
gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing)
function gRPCStatus(status_future)
try
fetch(status_future)
return fetch(status_future)
catch ex
task_exception = isa(ex, TaskFailedException) ? ex.task.exception : ex
while isa(task_exception, TaskFailedException)
task_exception = task_exception.task.exception
end
gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
return gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
end
end

Expand Down Expand Up @@ -181,7 +181,7 @@ end
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
try
take!(outchannel), status_future
return (take!(outchannel), status_future)
catch ex
gRPCCheck(status_future) # check for core issue
if isa(ex, InvalidStateException)
Expand Down