Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- ### Changed -->

### Fixed

- [Proxy] Fixed Kafka request logger in proxy -- now supports also Apache Avro. [#170](https://github.com/Accenture/reactive-interaction-gateway/issues/170)

<!-- ### Deprecated -->

<!-- ### Removed -->
Expand Down
2 changes: 1 addition & 1 deletion apps/rig_api/lib/rig_api/controllers/apis_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ defmodule RigApi.ApisController do
)

method(:string, "Endpoint HTTP method", required: true, example: "GET")
not_secured(:boolean, "Endpoint Security", default: true, example: true)
secured(:boolean, "Endpoint Security", default: false, example: false)
end
end,
ProxyAPIResponse:
Expand Down
20 changes: 20 additions & 0 deletions apps/rig_inbound_gateway/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,29 @@ config :rig, RigInboundGateway.ApiProxy.Handler.Kinesis,
cors: {:system, "CORS", "*"}

config :rig, RigInboundGateway.RequestLogger.Kafka,
# The list of brokers, given by a comma-separated list of host:port items:
brokers: {:system, :list, "KAFKA_BROKERS", []},
serializer: {:system, "KAFKA_SERIALIZER", nil},
schema_registry_host: {:system, "KAFKA_SCHEMA_REGISTRY_HOST", "localhost:8081"},
# The list of topics to consume messages from:
consumer_topics: [],
# If KAFKA_SSL_ENABLED=0, the KAFKA_SSL_* settings are ignored; otherwise, they're required.
ssl_enabled?: {:system, :boolean, "KAFKA_SSL_ENABLED", false},
# If use_enabled?, the following paths are expected (relative to the `priv` directory):
ssl_ca_certfile: {:system, "KAFKA_SSL_CA_CERTFILE", "ca.crt.pem"},
ssl_certfile: {:system, "KAFKA_SSL_CERTFILE", "client.crt.pem"},
ssl_keyfile: {:system, "KAFKA_SSL_KEYFILE", "client.key.pem"},
# In case the private key is password protected:
ssl_keyfile_pass: {:system, "KAFKA_SSL_KEYFILE_PASS", ""},
# Credentials for SASL/Plain authentication. Example: "plain:myusername:mypassword"
sasl: {:system, "KAFKA_SASL", nil},
log_topic: {:system, "KAFKA_LOG_TOPIC", "rig-request-log"},
log_schema: {:system, "KAFKA_LOG_SCHEMA", ""}

config :rig, RigInboundGateway.RequestLogger.ConfigValidation,
active_loggers: {:system, :list, "REQUEST_LOG", []},
brokers: {:system, :list, "KAFKA_BROKERS", []}

# --------------------------------------
# Authorization Token (JWT)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule RigInboundGateway.ApiProxy.Router do
If endpoint needs authentication, it is automatically triggered.
Valid HTTP requests are forwarded to given service and their response is sent back to client.
"""
use Rig.Config, [:logger_modules, :active_loggers]
use Rig.Config, :custom_validation
use Plug.Router
require Logger

Expand All @@ -20,11 +20,27 @@ defmodule RigInboundGateway.ApiProxy.Router do
alias RigInboundGateway.ApiProxy.Handler.Kinesis, as: KinesisHandler
alias RigInboundGateway.ApiProxy.Serializer
alias RigInboundGateway.Proxy
alias RigInboundGateway.RequestLogger.ConfigValidation
alias RigMetrics.ProxyMetrics

plug(:match)
plug(:dispatch)

# Confex callback
defp validate_config!(config) do
active_loggers = Keyword.fetch!(config, :active_loggers)
logger_modules = Keyword.fetch!(config, :logger_modules)

:ok =
ConfigValidation.validate_value_difference(
"REQUEST_LOG",
active_loggers,
Map.keys(logger_modules)
)

%{active_loggers: active_loggers, logger_modules: logger_modules}
end

# Get all incoming HTTP requests, check if they are valid, provide authentication if needed
match _ do
active_apis =
Expand Down Expand Up @@ -70,6 +86,18 @@ defmodule RigInboundGateway.ApiProxy.Router do
"kinesis" -> KinesisHandler
end

%{active_loggers: active_loggers, logger_modules: logger_modules} = config()

Enum.each(active_loggers, fn active_logger ->
logger_module = Map.get(logger_modules, active_logger)

logger_module.log_call(
endpoint,
api,
conn
)
end)

{:ok, body, conn} = BodyReader.read_full_body(conn)

conn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule RigInboundGateway.Application do
supervisor(RigInboundGatewayWeb.Endpoint, _args = []),
supervisor(RigAuth.Blacklist.Sup, _args = []),
supervisor(RigInboundGateway.ApiProxy.Sup, _args = []),
supervisor(RigInboundGateway.ApiProxy.Handler.Kafka, _args = [])
supervisor(RigInboundGateway.ApiProxy.Handler.Kafka, _args = []),
supervisor(RigInboundGateway.RequestLogger.Kafka, _args = [])
]

opts = [strategy: :one_for_one, name: RigInboundGateway.Supervisor]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
defmodule RigInboundGateway.RequestLogger.ConfigValidation do
@moduledoc """
Module responsible for global validation of environment variables and provides utility
functions to validate configuration in respective modules.
"""
use Rig.Config, :custom_validation
require Logger

# Confex callback
defp validate_config!(config) do
active_loggers = Keyword.fetch!(config, :active_loggers)
brokers = Keyword.fetch!(config, :brokers)

if Enum.member?(active_loggers, "kafka") do
validate_dependent_value("REQUEST_LOG", "kafka", "KAFKA_BROKERS", brokers)
end

%{active_loggers: active_loggers, brokers: brokers}
end

# ---

@spec validate_value_difference(String.t(), [String.t(), ...], [String.t(), ...]) ::
:ok | :shutdown
def validate_value_difference(env_var_name, env_var_value, expected_value) do
filtered_env_var_value = Enum.filter(env_var_value, &(&1 != ""))

is_empty? =
MapSet.new(filtered_env_var_value)
|> MapSet.difference(MapSet.new(expected_value))
|> Enum.empty?()

if !is_empty? do
Logger.error(fn ->
"Invalid configuration for=#{env_var_name} expected=#{inspect(expected_value)} found=#{
inspect(filtered_env_var_value)
}"
end)

exit(:shutdown)
end

:ok
end

# ---

@spec validate_dependent_value(String.t(), String.t(), String.t(), [String.t(), ...]) ::
:ok | :shutdown
def validate_dependent_value(
env_var_name,
env_var_value,
depedency_env_var_name,
dependency_env_var_value
) do
is_empty? =
dependency_env_var_value
|> Enum.empty?()

if is_empty? do
Logger.error(fn ->
"Configuration for=#{env_var_name} is set to=#{env_var_value}, but required configuration=#{
depedency_env_var_name
} is empty"
end)

exit(:shutdown)
end

:ok
end
end
Original file line number Diff line number Diff line change
@@ -1,31 +1,20 @@
defmodule RigInboundGateway.RequestLogger.Console do
@moduledoc """
Example request logger implementation.

"""
@behaviour RigInboundGateway.RequestLogger

@impl RigInboundGateway.RequestLogger
@spec log_call(Proxy.endpoint(), Proxy.api_definition(), %Plug.Conn{}) :: :ok
def log_call(
%{"secured" => true} = endpoint,
%{"auth_type" => "jwt"} = api_definition,
endpoint,
api_definition,
_conn
) do
IO.puts("CALL: #{endpoint_desc(endpoint)} => #{api_definition["proxy"]["target_url"]}")
:ok
end

def log_call(endpoint, api_definition, _conn) do
IO.puts(
"UNAUTHENTICATED CALL: #{endpoint_desc(endpoint)} => #{
api_definition["proxy"]["target_url"]
}"
)

:ok
end

defp endpoint_desc(endpoint) do
"[#{endpoint["id"]}] #{endpoint["method"]} #{endpoint["path"]}"
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,96 +1,81 @@
defmodule RigInboundGateway.RequestLogger.Kafka do
@moduledoc """
Kafka request logger implementation.

"""
use Rig.Config, [:log_topic, :log_schema]
require Logger
alias Rig.Kafka, as: RigKafka
alias RigAuth.Jwt.Utils, as: Jwt
use Rig.KafkaConsumerSetup, [:log_topic, :log_schema, :serializer]

alias RigInboundGateway.RequestLogger
@behaviour RequestLogger
alias UUID

@behaviour RigInboundGateway.RequestLogger
require Logger

# ---

@spec kafka_handler(any()) ::
:ok
| {:error, %{:__exception__ => true, :__struct__ => atom(), optional(atom()) => any()},
any()}
def kafka_handler(_message), do: :ok

# ---

@spec validate(any()) :: {:ok, any()}
def validate(conf), do: {:ok, conf}

@impl RigInboundGateway.RequestLogger
@spec log_call(Proxy.endpoint(), Proxy.api_definition(), %Plug.Conn{}) :: :ok
@impl RequestLogger
def log_call(
%{"secured" => true} = endpoint,
%{"auth_type" => "jwt"} = api_definition,
endpoint,
_api_definition,
conn
) do
claims = extract_claims!(conn)
username = Map.fetch!(claims, "username")

jti =
case Map.get(claims, "jti") do
nil ->
Logger.warn("jti not found in claims (#{inspect(claims)})")
nil
%{
serializer: serializer
} = config()

jti ->
jti
contenttype =
case serializer do
"avro" -> "avro/binary"
_ -> "application/json"
end

message = %{
id: UUID.uuid4(),
username: username,
jti: jti,
type: "PROXY_API_CALL",
version: "1.0",
timestamp: Timex.now() |> Timex.to_unix(),
level: 0,
payload: %{
endpoint: inspect(endpoint),
api_definition: inspect(api_definition),
request_path: conn.request_path,
remote_ip: conn.remote_ip |> format_ip
kafka_message =
%{
id: UUID.uuid4(),
time: Timex.now() |> Timex.format!("{RFC3339}"),
source: "/rig",
type: "com.rig.proxy.api.call",
contenttype: contenttype,
specversion: "0.2",
data: %{
endpoint: endpoint,
request_path: conn.request_path,
remote_ip: conn.remote_ip |> format_ip
}
}
}

message_json = message |> Poison.encode!()
conf = config()
# If topic does not exist, it will be created automatically, provided the server is
# configured that way. However, this call then returns with {:error, :LeaderNotAvailable},
# as at that point there won't be a partition leader yet.
RigKafka.produce(conf.log_topic, conf.log_schema, _key = username, _plaintext = message_json)
rescue
err ->
case err do
%KeyError{key: "username", term: claims} ->
Logger.warn("""
A username is required for publishing to the right Kafka topic, \
but no such field is found in the given claims: #{inspect(claims)}
""")

_ ->
Logger.error("""
Failed to log API call: #{inspect(err)}
endpoint=#{inspect(endpoint)}
api_definition=#{inspect(api_definition)}
""")
end
|> Poison.encode!()

{:error, err}
produce("partition", kafka_message)
end

def log_call(_endpoint, _api_definition, _conn) do
# Unauthenticated calls are not sent to Kafka.
:ok
# ---

defp produce(server \\ __MODULE__, key, plaintext) do
GenServer.cast(server, {:produce, key, plaintext})
end

@spec extract_claims!(%Plug.Conn{}) :: Jwt.claims()
defp extract_claims!(conn) do
# we assume there is exactly one valid token:
[token] =
conn
|> Plug.Conn.get_req_header("authorization")
|> Stream.filter(&Jwt.valid?/1)
|> Enum.take(1)

{:ok, claims} = Jwt.decode(token)
claims
# ---

@impl GenServer
def handle_cast({:produce, key, plaintext}, %{kafka_config: kafka_config} = state) do
%{log_topic: topic, log_schema: schema} = config()
RigKafka.produce(kafka_config, topic, schema, key, plaintext)
{:noreply, state}
end

# ---

@spec format_ip({integer, integer, integer, integer}) :: String.t()
defp format_ip(ip_tuple) do
ip_tuple
Expand Down
2 changes: 1 addition & 1 deletion apps/rig_inbound_gateway/test/rig/proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule RigInboundGateway.ProxyTest do
%{
"id" => id <> "1",
"method" => "GET",
"not_secured" => true,
"secured" => false,
"path" => "/foo"
}
]
Expand Down
Loading