Skip to content

Commit

Permalink
Add unwrap and wrap plugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Madsen authored and Allen Madsen committed Oct 26, 2018
1 parent 5bf600f commit 316a0ea
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 0 deletions.
38 changes: 38 additions & 0 deletions lib/conduit/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,44 @@ defmodule Conduit.Message do
%{message | created_at: created_at}
end

@fields [
:source,
:destination,
:user_id,
:correlation_id,
:message_id,
:content_type,
:content_encoding,
:created_by,
:created_at
]
@doc """
Returns all non-`nil` fields from the message as a map.
The following fields will be returned:
#{@fields |> Enum.map(&"* `#{inspect(&1)}`") |> Enum.join("\n ")}
## Examples
iex> import Conduit.Message
iex> message =
iex> %Conduit.Message{}
iex> |> put_message_id("1")
iex> |> put_correlation_id("2")
iex> get_fields(message)
%{
message_id: "1",
correlation_id: "2"
}
"""
@spec get_fields(__MODULE__.t()) :: %{atom() => term()}
def get_fields(%__MODULE__{} = message) do
message
|> Map.take(@fields)
|> Enum.filter(fn {_, value} -> value != nil end)
|> Enum.into(%{})
end

@doc """
Returns a header from the message specified by `key`.
Expand Down
70 changes: 70 additions & 0 deletions lib/conduit/plug/unwrap.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule Conduit.Plug.Unwrap do
@moduledoc """
Plug to help unwrap headers and fields from the message body
This can be helpful if your broker doesn't support message headers natively.
By default this plug expects the body to be a map containing 3 string keys:
* `"headers"` - will be put into the message headers
* `"fields" - will be put into the messages fields (i.e. `:message_id`, `:correlation_id`, etc.)
* `"body"` - will replace the current wrapper body
If you want a different wrapping structure for the message, you can pass the the
`:unwrap_fn` option. The wrap function should accept a message and return a message.
## Examples
iex> alias Conduit.Message
iex> defmodule MyPipeline do
iex> use Conduit.Plug.Builder
iex> plug Conduit.Plug.Unwrap
iex> end
iex>
iex> message =
iex> %Message{}
iex> |> Message.put_body(%{
iex> "body" => %{},
iex> "headers" => %{"foo" => "bar"},
iex> "fields" => %{"correlation_id" => "1"}
iex> })
iex> |> MyPipeline.run()
iex> message.body
%{}
iex> Message.get_header(message, "foo")
"bar"
iex> message.correlation_id
"1"
"""
use Conduit.Plug.Builder

require Logger

@doc """
Extracts the headers and fields from the wrapped body of the message
"""
def call(message, next, opts) do
unwrap_fn = Keyword.get(opts, :unwrap_fn, &default_unwrap/1)

message
|> unwrap_fn.()
|> next.()
end

defp default_unwrap(message) do
%{"fields" => fields, "headers" => headers, "body" => body} = message.body

message
# Merge here because lower down code is putting in a routing key, and source
|> put_headers(Map.merge(message.headers, headers))
|> put_body(body)
|> put_content_encoding(fields["content_encoding"])
|> put_content_type(fields["content_type"])
|> put_correlation_id(fields["correlation_id"])
|> put_created_at(fields["created_at"])
|> put_created_by(fields["created_by"])
|> put_message_id(fields["message_id"])
|> put_user_id(fields["user_id"])
end
end
66 changes: 66 additions & 0 deletions lib/conduit/plug/wrap.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Conduit.Plug.Wrap do
@moduledoc """
Plug to help wrap headers and fields into the message body
This can be helpful if your broker doesn't support message headers natively.
By default this plug will update the body of the message to:
%{
"headers" => message.headers,
"fields" => %{}, # message_id, correlation_id, etc.
"body" => message.body
}
If you want a different wrapping structure for the message, you can pass the the
`:wrap_fn` option. The wrap function should accept a message, the fields, the headers,
and body. The return value should be a message.
## Examples
iex> alias Conduit.Message
iex> defmodule MyPipeline do
iex> use Conduit.Plug.Builder
iex> plug Conduit.Plug.Wrap
iex> end
iex>
iex> message =
iex> %Message{}
iex> |> Message.put_correlation_id("1")
iex> |> Message.put_header("foo", "bar")
iex> |> Message.put_body(%{})
iex> |> MyPipeline.run()
iex> message.body
%{
"headers" => %{
"foo" => "bar"
},
"fields" => %{
"correlation_id" => "1"
},
"body" => %{}
}
"""
use Conduit.Plug.Builder

@doc """
Puts headers and fields into the body of the message
"""
def call(%Message{headers: headers, body: body} = message, next, opts) do
wrap_fn = Keyword.get(opts, :wrap_fn, &default_wrap/4)

fields =
message
|> get_fields()
|> Map.new(fn {key, value} -> {to_string(key), value} end)

message
|> wrap_fn.(fields, headers, body)
|> Message.put_private(:wrapped, true)
|> next.()
end

defp default_wrap(message, fields, headers, body) do
put_body(message, %{"fields" => fields, "headers" => headers, "body" => body})
end
end
4 changes: 4 additions & 0 deletions test/conduit/plug/unwrap_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Conduit.Plug.UnwrapTest do
use ExUnit.Case
doctest Conduit.Plug.Unwrap
end
4 changes: 4 additions & 0 deletions test/conduit/plug/wrap_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Conduit.Plug.WrapTest do
use ExUnit.Case
doctest Conduit.Plug.Wrap
end

0 comments on commit 316a0ea

Please sign in to comment.