diff --git a/lib/grpc/server/adapters/cowboy/handler.ex b/lib/grpc/server/adapters/cowboy/handler.ex index ae395c40..eb7056dd 100644 --- a/lib/grpc/server/adapters/cowboy/handler.ex +++ b/lib/grpc/server/adapters/cowboy/handler.ex @@ -29,7 +29,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do pid: server_rpc_pid :: pid, handling_timer: timeout_timer_ref :: reference, pending_reader: nil | pending_reader, - access_mode: GRPC.Server.Stream.access_mode() + access_mode: GRPC.Server.Stream.access_mode(), + exception_log_filter: exception_log_filter() } @type init_result :: {:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state} @@ -40,6 +41,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do @type headers :: %{binary() => binary()} + @type exception_log_filter :: {module(), atom()} | nil + @doc """ This function is meant to be called whenever a new request arrives to an existing connection. This handler works mainly with two linked processes. @@ -52,6 +55,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do @spec init(:cowboy_req.req(), state :: init_state) :: init_result def init(req, {endpoint, {_name, server}, route, opts} = state) do http_method = extract_http_method(req) |> String.to_existing_atom() + exception_log_filter = extract_exception_log_filter_opt(opts) with {:ok, access_mode, sub_type, content_type} <- find_content_type_subtype(req), {:ok, codec} <- find_codec(sub_type, content_type, server), @@ -98,7 +102,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do pid: server_rpc_pid, handling_timer: timer_ref, pending_reader: nil, - access_mode: access_mode + access_mode: access_mode, + exception_log_filter: exception_log_filter } } else @@ -110,6 +115,19 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do end end + defp extract_exception_log_filter_opt(opts) do + case opts[:exception_log_filter] do + {module, func_name} when is_atom(module) and is_atom(func_name) -> + {module, func_name} + + nil -> + nil + + invalid -> + raise ArgumentError, "invalid exception log filter: #{inspect(invalid)}" + end + end + defp find_codec(subtype, content_type, server) do if codec = Enum.find(server.__meta__(:codecs), nil, fn c -> c.name() == subtype end) do {:ok, codec} @@ -466,7 +484,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do [req: req] |> ReportException.new(error) - |> log_error() + |> maybe_log_error(state.exception_log_filter) {:stop, req, state} end @@ -493,7 +511,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do [req: req] |> ReportException.new(error, stacktrace) - |> log_error(stacktrace) + |> maybe_log_error(state.exception_log_filter, stacktrace) {:stop, req, state} end @@ -506,7 +524,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do [req: req] |> ReportException.new(reason, stack, kind) - |> log_error(stack) + |> maybe_log_error(state.exception_log_filter, stack) {:stop, req, state} end @@ -517,7 +535,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do [req: req] |> ReportException.new(reason, stacktrace) - |> log_error(stacktrace) + |> maybe_log_error(state.exception_log_filter, stacktrace) {:stop, req, state} end @@ -705,11 +723,31 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do {:wait, ref} end - defp log_error(%ReportException{kind: kind} = exception, stacktrace \\ []) do + defp maybe_log_error(exception, filter, stacktrace \\ []) + + defp maybe_log_error( + %ReportException{} = exception, + {module, func_name}, + stacktrace + ) do + if apply(module, func_name, [exception]) do + log_error(exception, stacktrace) + else + :ok + end + end + + defp maybe_log_error(exception, nil, stacktrace) do + log_error(exception, stacktrace) + end + + defp log_error(%ReportException{kind: kind} = exception, stacktrace) do crash_reason = GRPC.Logger.crash_reason(kind, exception, stacktrace) kind |> Exception.format(exception, stacktrace) |> Logger.error(crash_reason: crash_reason) + + :ok end end diff --git a/lib/grpc/server/supervisor.ex b/lib/grpc/server/supervisor.ex index ea486a1f..946d92cc 100644 --- a/lib/grpc/server/supervisor.ex +++ b/lib/grpc/server/supervisor.ex @@ -41,6 +41,9 @@ defmodule GRPC.Server.Supervisor do * `:endpoint` - defines the endpoint module that will be started. * `:port` - the HTTP port for the endpoint. * `:servers` - the list of servers that will be be started. + * `:exception_log_filter` - a `{module, function :: atom}` tuple that refers to a filter function of arity 1. + This function will be called with a `GRPC.Server.Adapters.ReportException` struct and must return a boolean + indicating whether or not a given exception should be logged or dropped. Defaults to `nil`, which means all exceptions will be logged. * `:adapter_opts` - options for the adapter. Either `:endpoint` or `:servers` must be present, but not both. @@ -62,13 +65,20 @@ defmodule GRPC.Server.Supervisor do end opts = - case Keyword.validate(opts, [:endpoint, :servers, :start_server, :port, :adapter_opts]) do + case Keyword.validate(opts, [ + :endpoint, + :servers, + :start_server, + :port, + :adapter_opts, + :exception_log_filter + ]) do {:ok, _opts} -> opts {:error, _} -> raise ArgumentError, - "just [:endpoint, :servers, :start_server, :port, :adapter_opts] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!" + "just [:endpoint, :servers, :start_server, :port, :adapter_opts, :exception_log_filter] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!" end case validate_cred(opts) do diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs index 36ba357f..12313d6f 100644 --- a/test/grpc/integration/server_test.exs +++ b/test/grpc/integration/server_test.exs @@ -208,6 +208,16 @@ defmodule GRPC.Integration.ServerTest do end end + defmodule ExceptionLogFilter do + def always_allow(_exception) do + true + end + + def never_allow(_exception) do + false + end + end + test "multiple servers works" do run_server([FeatureServer, HelloServer], fn port -> {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") @@ -277,6 +287,94 @@ defmodule GRPC.Integration.ServerTest do assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello" end + test "logs error if exception_log_filter returns true" do + logs = + ExUnit.CaptureLog.capture_log(fn -> + run_server( + [HelloErrorServer], + fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + req = %Helloworld.HelloRequest{name: "unknown error"} + Helloworld.Greeter.Stub.say_hello(channel, req) + end, + 0, + exception_log_filter: {ExceptionLogFilter, :always_allow} + ) + end) + + assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello" + end + + test "does not log error if exception_log_filter returns false" do + logs = + ExUnit.CaptureLog.capture_log(fn -> + run_server( + [HelloErrorServer], + fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + req = %Helloworld.HelloRequest{name: "unknown error"} + Helloworld.Greeter.Stub.say_hello(channel, req) + end, + 0, + exception_log_filter: {TestFalseFilter, :never_allow} + ) + end) + + refute logs =~ "Exception raised while handling /helloworld.Greeter/SayHello" + end + + test "passes RPCErrors to `exception_log_filter" do + defmodule ExceptionFilterMustBeRPCError do + def filter(%GRPC.RPCError{}) do + true + end + end + + run_server( + [HelloErrorServer], + fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + req = %Helloworld.HelloRequest{name: "unknown error"} + Helloworld.Greeter.Stub.say_hello(channel, req) + end, + 0, + exception_log_filter: {ExceptionFilterMustBeRPCError, :filter} + ) + end + + defmodule ExceptionFilterMustBeRaisedError do + def filter(exception) do + data = exception.adapter_extra[:req][:headers]["test-data"] + + {pid, ref} = :erlang.binary_to_term(data) + send(pid, {:exception_log_filter, ref}) + + true + end + end + + test "passes thrown exceptions to `exception_log_filter" do + test_pid = self() + ref = make_ref() + + run_server( + [HelloErrorServer], + fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", + headers: [{"test-data", :erlang.term_to_binary({test_pid, ref})}] + ) + + req = %Helloworld.HelloRequest{name: "raise", duration: 0} + Helloworld.Greeter.Stub.say_hello(channel, req) + end, + 0, + exception_log_filter: {ExceptionFilterMustBeRaisedError, :filter} + ) + + assert_receive {:exception_log_filter, ^ref} + end + test "returns appropriate error for stream requests" do run_server([FeatureErrorServer], fn port -> {:ok, channel} = GRPC.Stub.connect("localhost:#{port}")