Skip to content

Commit

Permalink
provide extras opts for sinks and sources implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
boonious committed Mar 1, 2024
1 parent 523225c commit b58c284
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 42 deletions.
5 changes: 5 additions & 0 deletions lib/stow/http/headers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Stow.Http.Headers do
@moduledoc false
defstruct req: [], resp: []
@type t :: %__MODULE__{req: [{binary, binary}], resp: [{binary, binary}]}
end
7 changes: 7 additions & 0 deletions lib/stow/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ defmodule Stow.Pipeline do
use Plug.Builder, unquote(opts)
import Stow.Pipeline
alias Stow.Pipeline

plug(:first)

# Plug always run the first plug
# before halting a pipeline.
# This plug enables halting through a halted conn via `super`
def first(conn, _opts), do: conn
end
end

Expand Down
6 changes: 2 additions & 4 deletions lib/stow/pipeline/source_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ defmodule Stow.Pipeline.SourceSink do
plug(Source)
plug(Sink)

@impl true
def call(conn, opts) do
opts = Keyword.validate!(opts, @fields)

@fields
|> Enum.map(&{&1, Keyword.get(opts, &1)})
|> then(&set_private_fields(conn, &1))
|> maybe_halt_pipeline()
|> case do
%Conn{halted: false} = conn -> super(conn, opts |> Keyword.drop(@fields))
conn -> conn
end
|> super(opts)
end

def set_private_fields(conn, []), do: conn
Expand Down
15 changes: 10 additions & 5 deletions lib/stow/plug/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Stow.Plug.Sink do

import Utils, only: [fetch_uri: 2, update_private: 3]

@plug_opts [:uri, :data, :extra_opts]
@plug_opts [:uri, :data, :extras]
@schemes ["file"]

@impl true
Expand All @@ -27,10 +27,10 @@ defmodule Stow.Plug.Sink do
end

defp validate_opts(opts), do: Keyword.validate!(opts, @plug_opts)
defp add_extra_opts(opts), do: [field: :sink, schemes: @schemes] |> Keyword.merge(opts)

defp parse_uri(conn, opts),
do: add_extra_opts(opts) |> then(&fetch_uri(conn, &1)) |> update_conn(conn)
defp parse_uri(conn, opts) do
fetch_uri(conn, [field: :sink, schemes: @schemes] |> Keyword.merge(opts)) |> update_conn(conn)
end

# opt1: data from plug opts
defp fetch_data(_conn, data) when is_binary(data) or is_list(data), do: {:ok, data}
Expand All @@ -48,10 +48,15 @@ defmodule Stow.Plug.Sink do
(uri.scheme <> "_sink")
|> Macro.camelize()
|> then(fn sink -> Module.concat(Sink, sink) end)
|> apply(:put, [uri, data, Keyword.get(opts, :extra_opts, [])])
# to fix fetch "extras" from private sink field or opts
|> apply(:put, [uri, data, Keyword.get(opts, :extras, [])])
|> update_conn(conn, :ok)
end

# to fix: fetch "extras" from private sink field or opts
# defp fetch_headers(nil, _type), do: nil
# etc

defp update_conn(uri, conn, status \\ nil)

defp update_conn({:ok, uri}, conn, nil) do
Expand Down
32 changes: 16 additions & 16 deletions lib/stow/plug/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Stow.Plug.Source do
import Plug.Conn, only: [halt: 1, resp: 3]
import Utils, only: [fetch_uri: 2, put_headers: 3, set_uri_params: 2, update_private: 3]

@plug_opts [:uri, :req_headers, :resp_headers]
@plug_opts [:uri, :extras]
@schemes ["http", "https"]

@impl true
Expand All @@ -29,29 +29,32 @@ defmodule Stow.Plug.Source do
end

defp validate_opts(opts), do: Keyword.validate!(opts, @plug_opts)
defp add_extra_opts(opts), do: [field: :source, schemes: @schemes] |> Keyword.merge(opts)

defp parse_uri(conn, opts) do
add_extra_opts(opts) |> then(&fetch_uri(conn, &1)) |> update_conn(conn)
fetch_uri(conn, [field: :source, schemes: @schemes] |> Keyword.merge(opts))
|> update_conn(conn)
end

defp get_req_headers(conn, opts) do
get_in(conn.private, [:stow, Access.key!(:source)]) |> private_headers(:req) ||
Keyword.get(opts, :req_headers, [])
fetch_headers(conn.private.stow.source, :req) ||
fetch_headers(Keyword.get(opts, :extras), :req) || []
end

defp private_headers(nil, _type), do: nil
defp private_headers(%Source{req_headers: h}, :req) when h != [], do: h
defp private_headers(%Source{resp_headers: h}, :resp) when h != [], do: h
defp private_headers(%Source{}, _), do: nil
# to fix: refactor this to the plug utils module
defp fetch_headers(nil, _type), do: nil
defp fetch_headers(%Source{extras: %{headers: %{req: h}}}, :req) when h != [], do: h
defp fetch_headers(%Source{extras: %{headers: %{resp: h}}}, :resp) when h != [], do: h
defp fetch_headers(%{headers: %{req: h}}, :req) when h != [], do: h
defp fetch_headers(%{headers: %{resp: h}}, :resp) when h != [], do: h
defp fetch_headers(%{}, _), do: nil

defp source_data(conn, uri, opts) do
normalise_scheme_name(conn.scheme)
|> Kernel.<>("_source")
|> Macro.camelize()
|> then(fn source -> Module.concat(Source, source) end)
|> apply(:get, [conn, opts |> Keyword.drop(@plug_opts)])
|> update_conn(conn, uri, Keyword.get(opts, :resp_headers, []))
|> update_conn(conn, uri, fetch_headers(Keyword.get(opts, :extras), :resp) || [])
end

defp normalise_scheme_name(scheme) when scheme in [:http, :https], do: "http"
Expand All @@ -62,8 +65,8 @@ defmodule Stow.Plug.Source do
source = get_in(conn.private, [:stow, Access.key!(:source)])

headers = [
req_headers: private_headers(source, :req) || [],
resp_headers: private_headers(source, :resp) || []
req_headers: fetch_headers(source, :req) || [],
resp_headers: fetch_headers(source, :resp) || []
]

{:ok, uri, update_private(conn, :source, Source.new(uri |> to_string(), headers))}
Expand All @@ -74,10 +77,7 @@ defmodule Stow.Plug.Source do
end

defp update_conn({:ok, {200, headers, body}}, conn, uri, resp_headers) do
private_headers =
get_in(conn.private, [:stow, Access.key!(:source)]) |> private_headers(:resp)

opts_headers = private_headers || resp_headers
opts_headers = fetch_headers(conn.private.stow.source, :resp) || resp_headers

conn
|> resp(200, body)
Expand Down
7 changes: 4 additions & 3 deletions lib/stow/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Stow.Sink do
Behaviour for writing and deleting data.
"""

defstruct [:uri, :status]
defstruct [:uri, :status, extras: []]

@type t :: %__MODULE__{
uri: binary() | nil,
status: nil | :ok | {:error, term()}
status: nil | :ok | {:error, term()},
extras: keyword()
}

@type uri :: URI.t()
Expand All @@ -17,7 +18,7 @@ defmodule Stow.Sink do
@callback delete(uri(), options()) :: {:ok, uri()} | {:error, File.posix()}
@callback put(uri(), data, options()) :: {:ok, uri()} | {:error, File.posix()}

def new(uri), do: %__MODULE__{uri: uri}
def new(uri, extras \\ []), do: %__MODULE__{uri: uri, extras: extras}
def done(uri), do: %__MODULE__{uri: uri, status: :ok}
def failed(error), do: %__MODULE__{status: error}
end
15 changes: 10 additions & 5 deletions lib/stow/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ defmodule Stow.Source do
"""

alias Stow.Http.Client, as: HttpClient
alias Stow.Http.Headers
alias Stow.Source.Extras

defstruct [:uri, :status, req_headers: [], resp_headers: []]
defstruct [:uri, :status, extras: %Extras{}]

@type t :: %__MODULE__{
uri: binary() | nil,
req_headers: [tuple()],
resp_headers: [tuple()],
extras: Extras.t(),
status: nil | :ok | {:error, term()}
}

Expand All @@ -22,8 +23,12 @@ defmodule Stow.Source do
def new(uri, headers \\ []) do
%__MODULE__{
uri: uri,
req_headers: Keyword.get(headers, :req_headers, []),
resp_headers: Keyword.get(headers, :resp_headers, [])
extras: %Extras{
headers: %Headers{
req: Keyword.get(headers, :req_headers, []),
resp: Keyword.get(headers, :resp_headers, [])
}
}
}
end

Expand Down
6 changes: 6 additions & 0 deletions lib/stow/source/extras.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule Stow.Source.Extras do
@moduledoc false
alias Stow.Http.Headers
defstruct headers: %Headers{}
@type t :: %__MODULE__{headers: Headers.t()}
end
6 changes: 5 additions & 1 deletion test/stow/pipeline/source_sink_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ defmodule Stow.Pipeline.SourceSinkTest do
assert %Conn{} = conn = SourceSink.call(context.conn, source: src_uri, sink: sink_uri)

assert %Pipeline{
source: %Source{status: :ok, uri: ^src_uri, req_headers: [], resp_headers: []},
source: %Source{
status: :ok,
uri: ^src_uri,
extras: %{headers: %{req: [], resp: []}}
},
sink: %Sink{uri: ^sink_uri, status: :ok}
} = conn.private.stow
end
Expand Down
10 changes: 6 additions & 4 deletions test/stow/plug/sink_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Stow.Plug.SinkTest do

defmodule FileSinkTestPlug do
use Plug.Builder
plug(Sink, uri: "file:/path/to/file", data: "test data")
plug(Sink, uri: "file:/path/to/file", data: "test data", extras: [modes: [:compressed]])
end

setup :verify_on_exit!
Expand Down Expand Up @@ -41,7 +41,7 @@ defmodule Stow.Plug.SinkTest do
dir = path |> Path.dirname()

FileIO |> expect(:exists?, fn ^dir -> true end)
FileIO |> expect(:write, fn ^path, ^data, [] -> :ok end)
FileIO |> expect(:write, fn ^path, ^data, [:compressed] -> :ok end)

assert %Conn{} = conn = __MODULE__.FileSinkTestPlug.call(context.conn, [])
assert %SinkStruct{uri: ^uri_s, status: :ok} = conn.private.stow.sink
Expand All @@ -61,8 +61,10 @@ defmodule Stow.Plug.SinkTest do

test "via connection private params", %{data: data, path: path, uri_s: uri_s} = context do
conn = resp(context.conn, 200, data)
conn = update_private(conn, :sink, SinkStruct.new(uri_s))
conn = update_private(conn, :sink, SinkStruct.new(uri_s, modes: :compressed))

# to fix, extras opts in private sink field.
# FileIO |> expect(:write, fn ^path, ^data, [:compressed] -> :ok end)
FileIO |> expect(:write, fn ^path, ^data, [] -> :ok end)

assert %Conn{} = conn = Sink.call(conn, [])
Expand All @@ -77,7 +79,7 @@ defmodule Stow.Plug.SinkTest do
opts = [
uri: context.uri_s,
data: data,
extra_opts: [base_dir: base_dir, modes: [:compressed]]
extras: [base_dir: base_dir, modes: [:compressed]]
]

FileIO |> expect(:exists?, fn ^dir -> true end)
Expand Down
16 changes: 12 additions & 4 deletions test/stow/plug/source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ defmodule Stow.Plug.SourceTest do

plug(Source,
uri: "http://localhost/path/to/source?foo=bar",
req_headers: [{"accept", "text/html"}, {"accept-charset", "utf-8"}],
resp_headers: [{"server", "apache/2.4.1 (unix)"}, {"cache-control", "max-age=604800"}]
extras: %{
headers: %{
req: [{"accept", "text/html"}, {"accept-charset", "utf-8"}],
resp: [{"server", "apache/2.4.1 (unix)"}, {"cache-control", "max-age=604800"}]
}
}
)
end

Expand Down Expand Up @@ -56,8 +60,12 @@ defmodule Stow.Plug.SourceTest do
conn =
Source.call(conn,
uri: uri,
req_headers: context.req_headers,
resp_headers: context.resp_headers
extras: %{
headers: %{
req: context.req_headers,
resp: context.resp_headers
}
}
)

assert %Conn{} = conn
Expand Down

0 comments on commit b58c284

Please sign in to comment.