Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ the server.
`gRPCStatus` represents the status of a request. It has the following fields:

- `success`: whether the request was completed successfully.
- `grpc_status`: the grpc status code returned
- `message`: any error message if request was not successful

### `gRPCCheck`
Expand Down Expand Up @@ -227,6 +228,7 @@ A `gRPMessageTooLargeException` has the following members:
A `gRPCServiceCallException` is thrown if a gRPC request is not successful.
It has the following members:

- `grpc_status`: grpc status code for this request
- `message`: any error message if request was not successful

## Credits
Expand Down
88 changes: 83 additions & 5 deletions src/curl.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
const GRPC_STATIC_HEADERS = Ref{Ptr{Nothing}}(C_NULL)

const StatusCode = (
OK = (code=0, message="Success"),
CANCELLED = (code=1, message="The operation was cancelled"),
UNKNOWN = (code=2, message="Unknown error"),
INVALID_ARGUMENT = (code=3, message="Client specified an invalid argument"),
DEADLINE_EXCEEDED = (code=4, message="Deadline expired before the operation could complete"),
NOT_FOUND = (code=5, message="Requested entity was not found"),
ALREADY_EXISTS = (code=6, message="Entity already exists"),
PERMISSION_DENIED = (code=7, message="No permission to execute the specified operation"),
RESOURCE_EXHAUSTED = (code=8, message="Resource exhausted"),
FAILED_PRECONDITION = (code=9, message="Operation was rejected because the system is not in a state required for the operation's execution"),
ABORTED = (code=10, message="Operation was aborted"),
OUT_OF_RANGE = (code=11, message="Operation was attempted past the valid range"),
UNIMPLEMENTED = (code=12, message="Operation is not implemented or is not supported/enabled in this service"),
INTERNAL = (code=13, message="Internal error"),
UNAVAILABLE = (code=14, message="The service is currently unavailable"),
DATA_LOSS = (code=15, message="Unrecoverable data loss or corruption"),
UNAUTHENTICATED = (code=16, message="The request does not have valid authentication credentials for the operation")
)

grpc_status_info(code) = StatusCode[code+1]
grpc_status_message(code) = (grpc_status_info(code)).message
grpc_status_code_str(code) = string(propertynames(StatusCode)[code+1])

#=
const SEND_BUFFER_SZ = 1024 * 1024
function buffer_send_data(input::Channel{T}) where T <: ProtoType
Expand Down Expand Up @@ -31,15 +55,46 @@ function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::
end
end

function grpc_headers()
function grpc_timeout_header_val(timeout::Real)
if round(Int, timeout) == timeout
timeout_secs = round(Int64, timeout)
return "$(timeout_secs)S"
end
timeout *= 1000
if round(Int, timeout) == timeout
timeout_millisecs = round(Int64, timeout)
return "$(timeout_millisecs)m"
end
timeout *= 1000
if round(Int, timeout) == timeout
timeout_microsecs = round(Int64, timeout)
return "$(timeout_microsecs)u"
end
timeout *= 1000
timeout_nanosecs = round(Int64, timeout)
return "$(timeout_nanosecs)n"
end

function grpc_headers(; timeout::Real=Inf)
headers = C_NULL
headers = LibCURL.curl_slist_append(headers, "User-Agent: $(Curl.USER_AGENT)")
headers = LibCURL.curl_slist_append(headers, "Content-Type: application/grpc+proto")
headers = LibCURL.curl_slist_append(headers, "Content-Length:")
if timeout !== Inf
headers = LibCURL.curl_slist_append(headers, "grpc-timeout: $(grpc_timeout_header_val(timeout))")
end
headers
end

function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool)
function grpc_request_header(request_timeout::Real)
if request_timeout == Inf
GRPC_STATIC_HEADERS[]
else
grpc_headers(; timeout=request_timeout)
end
end

function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
easy = Curl.Easy()
http_version = (negotiation === :http2) ? CURL_HTTP_VERSION_2_0 :
(negotiation === :http2_tls) ? CURL_HTTP_VERSION_2TLS :
Expand All @@ -48,7 +103,7 @@ function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revoc
Curl.setopt(easy, CURLOPT_HTTP_VERSION, http_version)
Curl.setopt(easy, CURLOPT_PIPEWAIT, Clong(1))
Curl.setopt(easy, CURLOPT_POST, Clong(1))
Curl.setopt(easy, CURLOPT_HTTPHEADER, GRPC_STATIC_HEADERS[])
Curl.setopt(easy, CURLOPT_HTTPHEADER, grpc_request_header(request_timeout))
if !revocation
Curl.setopt(easy, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NO_REVOKE)
end
Expand Down Expand Up @@ -126,7 +181,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation)) do easy
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do easy
# setup the request
Curl.set_url(easy, url)
Curl.set_timeout(easy, request_timeout)
Expand Down Expand Up @@ -188,6 +243,29 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
end
end

(easy.code == CURLE_OK) ? gRPCStatus(true, "") : gRPCStatus(false, Curl.get_curl_errstr(easy))
@debug("response headers", easy.res_hdrs)

# parse the grpc headers
grpc_status = StatusCode.OK.code
grpc_message = ""
for hdr in easy.res_hdrs
if startswith(hdr, "grpc-status")
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
elseif startswith(hdr, "grpc-message")
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
end
end
if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
end
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
grpc_message = grpc_status_message(grpc_status)
end

if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))
gRPCStatus(true, grpc_status, "")
else
gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(easy) : grpc_message)
end
end
end
2 changes: 1 addition & 1 deletion src/gRPCClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using ProtoBuf
import Downloads: Curl
import ProtoBuf: call_method

export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck
export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck, StatusCode

abstract type gRPCException <: Exception end

Expand Down
19 changes: 13 additions & 6 deletions src/grpc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
"""
struct gRPCStatus
success::Bool
grpc_status::Int
message::String
exception::Union{Nothing,Exception}
end

gRPCStatus(success::Bool, message::AbstractString) = gRPCStatus(success, string(message), nothing)
gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing)
function gRPCStatus(status_future)
try
fetch(status_future)
Expand All @@ -24,7 +25,7 @@ function gRPCStatus(status_future)
while isa(task_exception, TaskFailedException)
task_exception = task_exception.task.exception
end
gRPCStatus(false, string(task_exception), task_exception)
gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
end
end

Expand All @@ -39,10 +40,11 @@ It has the following members:
- `message`: any error message if request was not successful
"""
struct gRPCServiceCallException <: gRPCException
grpc_status::Int
message::String
end

Base.show(io::IO, m::gRPCServiceCallException) = print(io, "gRPCServiceCallException - $(m.message)")
Base.show(io::IO, m::gRPCServiceCallException) = print(io, "gRPCServiceCallException: $(m.grpc_status), $(m.message)")

"""
gRPCCheck(status; throw_error::Bool=true)
Expand All @@ -56,7 +58,7 @@ gRPCCheck(status_future; throw_error::Bool=true) = gRPCCheck(gRPCStatus(status_f
function gRPCCheck(status::gRPCStatus; throw_error::Bool=true)
if throw_error && !status.success
if status.exception === nothing
throw(gRPCServiceCallException(status.message))
throw(gRPCServiceCallException(status.grpc_status, status.message))
else
throw(status.exception)
end
Expand Down Expand Up @@ -180,8 +182,13 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
try
take!(outchannel), status_future
catch
nothing, status_future
catch ex
gRPCCheck(status_future) # check for core issue
if isa(ex, InvalidStateException)
throw(gRPCServiceCallException("Server closed connection without any response"))
else
rethrow() # throw this error if there's no other issue
end
end
end
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
Expand Down
4 changes: 3 additions & 1 deletion test/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
grpc-go
server.pid
runserver_*
routeguide_*
grpcerrors_*
testservers
84 changes: 84 additions & 0 deletions test/GrpcerrorsClients/GrpcerrorsClients.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
module GrpcerrorsClients
using gRPCClient

include("grpcerrors.jl")
using .grpcerrors

import Base: show

# begin service: grpcerrors.GRPCErrors

export GRPCErrorsBlockingClient, GRPCErrorsClient

struct GRPCErrorsBlockingClient
controller::gRPCController
channel::gRPCChannel
stub::GRPCErrorsBlockingStub

function GRPCErrorsBlockingClient(api_base_url::String; kwargs...)
controller = gRPCController(; kwargs...)
channel = gRPCChannel(api_base_url)
stub = GRPCErrorsBlockingStub(channel)
new(controller, channel, stub)
end
end

struct GRPCErrorsClient
controller::gRPCController
channel::gRPCChannel
stub::GRPCErrorsStub

function GRPCErrorsClient(api_base_url::String; kwargs...)
controller = gRPCController(; kwargs...)
channel = gRPCChannel(api_base_url)
stub = GRPCErrorsStub(channel)
new(controller, channel, stub)
end
end

show(io::IO, client::GRPCErrorsBlockingClient) = print(io, "GRPCErrorsBlockingClient(", client.channel.baseurl, ")")
show(io::IO, client::GRPCErrorsClient) = print(io, "GRPCErrorsClient(", client.channel.baseurl, ")")

import .grpcerrors: SimpleRPC
"""
SimpleRPC

- input: grpcerrors.Data
- output: grpcerrors.Data
"""
SimpleRPC(client::GRPCErrorsBlockingClient, inp::grpcerrors.Data) = SimpleRPC(client.stub, client.controller, inp)
SimpleRPC(client::GRPCErrorsClient, inp::grpcerrors.Data, done::Function) = SimpleRPC(client.stub, client.controller, inp, done)

import .grpcerrors: StreamResponse
"""
StreamResponse

- input: grpcerrors.Data
- output: Channel{grpcerrors.Data}
"""
StreamResponse(client::GRPCErrorsBlockingClient, inp::grpcerrors.Data) = StreamResponse(client.stub, client.controller, inp)
StreamResponse(client::GRPCErrorsClient, inp::grpcerrors.Data, done::Function) = StreamResponse(client.stub, client.controller, inp, done)

import .grpcerrors: StreamRequest
"""
StreamRequest

- input: Channel{grpcerrors.Data}
- output: grpcerrors.Data
"""
StreamRequest(client::GRPCErrorsBlockingClient, inp::Channel{grpcerrors.Data}) = StreamRequest(client.stub, client.controller, inp)
StreamRequest(client::GRPCErrorsClient, inp::Channel{grpcerrors.Data}, done::Function) = StreamRequest(client.stub, client.controller, inp, done)

import .grpcerrors: StreamRequestResponse
"""
StreamRequestResponse

- input: Channel{grpcerrors.Data}
- output: Channel{grpcerrors.Data}
"""
StreamRequestResponse(client::GRPCErrorsBlockingClient, inp::Channel{grpcerrors.Data}) = StreamRequestResponse(client.stub, client.controller, inp)
StreamRequestResponse(client::GRPCErrorsClient, inp::Channel{grpcerrors.Data}, done::Function) = StreamRequestResponse(client.stub, client.controller, inp, done)

# end service: grpcerrors.GRPCErrors

end # module GrpcerrorsClients
4 changes: 4 additions & 0 deletions test/GrpcerrorsClients/grpcerrors.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module grpcerrors
const _ProtoBuf_Top_ = @static isdefined(parentmodule(@__MODULE__), :_ProtoBuf_Top_) ? (parentmodule(@__MODULE__))._ProtoBuf_Top_ : parentmodule(@__MODULE__)
include("grpcerrors_pb.jl")
end
77 changes: 77 additions & 0 deletions test/GrpcerrorsClients/grpcerrors_pb.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# syntax: proto3
using ProtoBuf
import ProtoBuf.meta

mutable struct Data <: ProtoType
__protobuf_jl_internal_meta::ProtoMeta
__protobuf_jl_internal_values::Dict{Symbol,Any}
__protobuf_jl_internal_defaultset::Set{Symbol}

function Data(; kwargs...)
obj = new(meta(Data), Dict{Symbol,Any}(), Set{Symbol}())
values = obj.__protobuf_jl_internal_values
symdict = obj.__protobuf_jl_internal_meta.symdict
for nv in kwargs
fldname, fldval = nv
fldtype = symdict[fldname].jtyp
(fldname in keys(symdict)) || error(string(typeof(obj), " has no field with name ", fldname))
values[fldname] = isa(fldval, fldtype) ? fldval : convert(fldtype, fldval)
end
obj
end
end # mutable struct Data
const __meta_Data = Ref{ProtoMeta}()
function meta(::Type{Data})
ProtoBuf.metalock() do
if !isassigned(__meta_Data)
__meta_Data[] = target = ProtoMeta(Data)
allflds = Pair{Symbol,Union{Type,String}}[:mode => Int32, :param => Int32]
meta(target, Data, allflds, ProtoBuf.DEF_REQ, ProtoBuf.DEF_FNUM, ProtoBuf.DEF_VAL, ProtoBuf.DEF_PACK, ProtoBuf.DEF_WTYPES, ProtoBuf.DEF_ONEOFS, ProtoBuf.DEF_ONEOF_NAMES)
end
__meta_Data[]
end
end
function Base.getproperty(obj::Data, name::Symbol)
if name === :mode
return (obj.__protobuf_jl_internal_values[name])::Int32
elseif name === :param
return (obj.__protobuf_jl_internal_values[name])::Int32
else
getfield(obj, name)
end
end

# service methods for GRPCErrors
const _GRPCErrors_methods = MethodDescriptor[
MethodDescriptor("SimpleRPC", 1, Data, Data),
MethodDescriptor("StreamResponse", 2, Data, Channel{Data}),
MethodDescriptor("StreamRequest", 3, Channel{Data}, Data),
MethodDescriptor("StreamRequestResponse", 4, Channel{Data}, Channel{Data})
] # const _GRPCErrors_methods
const _GRPCErrors_desc = ServiceDescriptor("grpcerrors.GRPCErrors", 1, _GRPCErrors_methods)

GRPCErrors(impl::Module) = ProtoService(_GRPCErrors_desc, impl)

mutable struct GRPCErrorsStub <: AbstractProtoServiceStub{false}
impl::ProtoServiceStub
GRPCErrorsStub(channel::ProtoRpcChannel) = new(ProtoServiceStub(_GRPCErrors_desc, channel))
end # mutable struct GRPCErrorsStub

mutable struct GRPCErrorsBlockingStub <: AbstractProtoServiceStub{true}
impl::ProtoServiceBlockingStub
GRPCErrorsBlockingStub(channel::ProtoRpcChannel) = new(ProtoServiceBlockingStub(_GRPCErrors_desc, channel))
end # mutable struct GRPCErrorsBlockingStub

SimpleRPC(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Data, done::Function) = call_method(stub.impl, _GRPCErrors_methods[1], controller, inp, done)
SimpleRPC(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Data) = call_method(stub.impl, _GRPCErrors_methods[1], controller, inp)

StreamResponse(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Data, done::Function) = call_method(stub.impl, _GRPCErrors_methods[2], controller, inp, done)
StreamResponse(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Data) = call_method(stub.impl, _GRPCErrors_methods[2], controller, inp)

StreamRequest(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Channel{Data}, done::Function) = call_method(stub.impl, _GRPCErrors_methods[3], controller, inp, done)
StreamRequest(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Channel{Data}) = call_method(stub.impl, _GRPCErrors_methods[3], controller, inp)

StreamRequestResponse(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Channel{Data}, done::Function) = call_method(stub.impl, _GRPCErrors_methods[4], controller, inp, done)
StreamRequestResponse(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Channel{Data}) = call_method(stub.impl, _GRPCErrors_methods[4], controller, inp)

export Data, GRPCErrors, GRPCErrorsStub, GRPCErrorsBlockingStub, SimpleRPC, StreamResponse, StreamRequest, StreamRequestResponse
Loading