Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions lib/grpc/server/adapters/cowboy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
pid: server_rpc_pid :: pid,
handling_timer: timeout_timer_ref :: reference,
pending_reader: nil | pending_reader,
access_mode: GRPC.Server.Stream.access_mode()
access_mode: GRPC.Server.Stream.access_mode(),
exception_log_filter: exception_log_filter()
}
@type init_result ::
{:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state}
Expand All @@ -40,6 +41,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

@type headers :: %{binary() => binary()}

@type exception_log_filter :: {module(), atom()} | nil

@doc """
This function is meant to be called whenever a new request arrives to an existing connection.
This handler works mainly with two linked processes.
Expand All @@ -52,6 +55,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
@spec init(:cowboy_req.req(), state :: init_state) :: init_result
def init(req, {endpoint, {_name, server}, route, opts} = state) do
http_method = extract_http_method(req) |> String.to_existing_atom()
exception_log_filter = extract_exception_log_filter_opt(opts)

with {:ok, access_mode, sub_type, content_type} <- find_content_type_subtype(req),
{:ok, codec} <- find_codec(sub_type, content_type, server),
Expand Down Expand Up @@ -98,7 +102,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
pid: server_rpc_pid,
handling_timer: timer_ref,
pending_reader: nil,
access_mode: access_mode
access_mode: access_mode,
exception_log_filter: exception_log_filter
}
}
else
Expand All @@ -110,6 +115,19 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
end
end

defp extract_exception_log_filter_opt(opts) do
case opts[:exception_log_filter] do
{module, func_name} when is_atom(module) and is_atom(func_name) ->
{module, func_name}

nil ->
nil

invalid ->
raise ArgumentError, "invalid exception log filter: #{inspect(invalid)}"
end
end

defp find_codec(subtype, content_type, server) do
if codec = Enum.find(server.__meta__(:codecs), nil, fn c -> c.name() == subtype end) do
{:ok, codec}
Expand Down Expand Up @@ -466,7 +484,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

[req: req]
|> ReportException.new(error)
|> log_error()
|> maybe_log_error(state.exception_log_filter)

{:stop, req, state}
end
Expand All @@ -493,7 +511,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

[req: req]
|> ReportException.new(error, stacktrace)
|> log_error(stacktrace)
|> maybe_log_error(state.exception_log_filter, stacktrace)

{:stop, req, state}
end
Expand All @@ -506,7 +524,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

[req: req]
|> ReportException.new(reason, stack, kind)
|> log_error(stack)
|> maybe_log_error(state.exception_log_filter, stack)

{:stop, req, state}
end
Expand All @@ -517,7 +535,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

[req: req]
|> ReportException.new(reason, stacktrace)
|> log_error(stacktrace)
|> maybe_log_error(state.exception_log_filter, stacktrace)

{:stop, req, state}
end
Expand Down Expand Up @@ -705,11 +723,31 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
{:wait, ref}
end

defp log_error(%ReportException{kind: kind} = exception, stacktrace \\ []) do
defp maybe_log_error(exception, filter, stacktrace \\ [])

defp maybe_log_error(
%ReportException{} = exception,
{module, func_name},
stacktrace
) do
if apply(module, func_name, [exception]) do
log_error(exception, stacktrace)
else
:ok
end
end

defp maybe_log_error(exception, nil, stacktrace) do
log_error(exception, stacktrace)
end

defp log_error(%ReportException{kind: kind} = exception, stacktrace) do
crash_reason = GRPC.Logger.crash_reason(kind, exception, stacktrace)

kind
|> Exception.format(exception, stacktrace)
|> Logger.error(crash_reason: crash_reason)

:ok
end
end
14 changes: 12 additions & 2 deletions lib/grpc/server/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ defmodule GRPC.Server.Supervisor do
* `:endpoint` - defines the endpoint module that will be started.
* `:port` - the HTTP port for the endpoint.
* `:servers` - the list of servers that will be be started.
* `:exception_log_filter` - a `{module, function :: atom}` tuple that refers to a filter function of arity 1.
This function will be called with a `GRPC.Server.Adapters.ReportException` struct and must return a boolean
indicating whether or not a given exception should be logged or dropped. Defaults to `nil`, which means all exceptions will be logged.
* `:adapter_opts` - options for the adapter.

Either `:endpoint` or `:servers` must be present, but not both.
Expand All @@ -62,13 +65,20 @@ defmodule GRPC.Server.Supervisor do
end

opts =
case Keyword.validate(opts, [:endpoint, :servers, :start_server, :port, :adapter_opts]) do
case Keyword.validate(opts, [
:endpoint,
:servers,
:start_server,
:port,
:adapter_opts,
:exception_log_filter
]) do
{:ok, _opts} ->
opts

{:error, _} ->
raise ArgumentError,
"just [:endpoint, :servers, :start_server, :port, :adapter_opts] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!"
"just [:endpoint, :servers, :start_server, :port, :adapter_opts, :exception_log_filter] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!"
end

case validate_cred(opts) do
Expand Down
98 changes: 98 additions & 0 deletions test/grpc/integration/server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ defmodule GRPC.Integration.ServerTest do
end
end

defmodule ExceptionLogFilter do
def always_allow(_exception) do
true
end

def never_allow(_exception) do
false
end
end

test "multiple servers works" do
run_server([FeatureServer, HelloServer], fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
Expand Down Expand Up @@ -277,6 +287,94 @@ defmodule GRPC.Integration.ServerTest do
assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
end

test "logs error if exception_log_filter returns true" do
logs =
ExUnit.CaptureLog.capture_log(fn ->
run_server(
[HelloErrorServer],
fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
req = %Helloworld.HelloRequest{name: "unknown error"}
Helloworld.Greeter.Stub.say_hello(channel, req)
end,
0,
exception_log_filter: {ExceptionLogFilter, :always_allow}
)
end)

assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
end

test "does not log error if exception_log_filter returns false" do
logs =
ExUnit.CaptureLog.capture_log(fn ->
run_server(
[HelloErrorServer],
fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
req = %Helloworld.HelloRequest{name: "unknown error"}
Helloworld.Greeter.Stub.say_hello(channel, req)
end,
0,
exception_log_filter: {TestFalseFilter, :never_allow}
)
end)

refute logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
end

test "passes RPCErrors to `exception_log_filter" do
defmodule ExceptionFilterMustBeRPCError do
def filter(%GRPC.RPCError{}) do
true
end
end

run_server(
[HelloErrorServer],
fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
req = %Helloworld.HelloRequest{name: "unknown error"}
Helloworld.Greeter.Stub.say_hello(channel, req)
end,
0,
exception_log_filter: {ExceptionFilterMustBeRPCError, :filter}
)
end

defmodule ExceptionFilterMustBeRaisedError do
def filter(exception) do
data = exception.adapter_extra[:req][:headers]["test-data"]

{pid, ref} = :erlang.binary_to_term(data)
send(pid, {:exception_log_filter, ref})

true
end
end

test "passes thrown exceptions to `exception_log_filter" do
test_pid = self()
ref = make_ref()

run_server(
[HelloErrorServer],
fn port ->
{:ok, channel} =
GRPC.Stub.connect("localhost:#{port}",
headers: [{"test-data", :erlang.term_to_binary({test_pid, ref})}]
)

req = %Helloworld.HelloRequest{name: "raise", duration: 0}
Helloworld.Greeter.Stub.say_hello(channel, req)
end,
0,
exception_log_filter: {ExceptionFilterMustBeRaisedError, :filter}
)

assert_receive {:exception_log_filter, ^ref}
end

test "returns appropriate error for stream requests" do
run_server([FeatureErrorServer], fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
Expand Down
Loading