Skip to content

Commit

Permalink
Implement :error_message option
Browse files Browse the repository at this point in the history
  • Loading branch information
zorbash committed Mar 31, 2018
1 parent 592151f commit d89060f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 9 deletions.
23 changes: 16 additions & 7 deletions lib/opus/pipeline/stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule Opus.Pipeline.Stage do
def with_retries({module, %{retry_times: _times, stage_id: _id} = opts}, fun) do
result = fun.()

case result |> handle_run({:_, :_, :_}) do
case result |> handle_run(%{stage: {:_, :_, :_, :_}, input: :_}) do
{:halt, _} -> with_retries({module, opts}, fun, %{failures: 1})
_ -> result
end
Expand All @@ -95,7 +95,7 @@ defmodule Opus.Pipeline.Stage do

result = fun.()

case {failures < times, result |> handle_run({:_, :_, :_})} do
case {failures < times, result |> handle_run(%{stage: {:_, :_, :_, :_}, input: :_})} do
{true, {:halt, _}} ->
with_retries({module, put_in(stage[:delays], delays)}, fun, %{failures: failures + 1})

Expand All @@ -104,17 +104,26 @@ defmodule Opus.Pipeline.Stage do
end
end

def handle_run(:error, {module, name, input}),
def handle_run(:error, %{stage: {module, _type, name, %{error_message: message}}, input: input}),
do:
{:halt,
{:error,
%PipelineError{error: "Pipeline failed", pipeline: module, stage: name, input: input}}}
%PipelineError{error: message, pipeline: module, stage: name, input: input}}}

def handle_run({:error, e}, {module, name, input}),
def handle_run(:error, %{stage: {module, _type, name, _opts}, input: input}),
do:
{:halt,
{:error,
%PipelineError{error: "stage failed", pipeline: module, stage: name, input: input}}}

def handle_run({:error, _}, %{stage: {module, _type, name, %{error_message: message}}, input: input}),
do: {:halt, {:error, %PipelineError{error: message, pipeline: module, stage: name, input: input}}}

def handle_run({:error, e}, %{stage: {module, _type, name, _opts}, input: input}),
do: {:halt, {:error, %PipelineError{error: e, pipeline: module, stage: name, input: input}}}

def handle_run(:stage_skipped, {_module, _name, input}), do: {:cont, input}
def handle_run(res, {_module, _name, _input}), do: {:cont, res}
def handle_run(:stage_skipped, %{stage: _stage, input: input}), do: {:cont, input}
def handle_run(res, %{stage: _stage, input: _input}), do: {:cont, res}

defp do_run({module, type, name, %{with: :anonymous, stage_id: id} = opts}, input) do
callback = (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :with end)).name
Expand Down
4 changes: 2 additions & 2 deletions lib/opus/pipeline/stage/step.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Opus.Pipeline.Stage.Step do

@behaviour Stage

def run({module, _, name, _opts} = stage, input) do
stage |> Stage.maybe_run(input) |> Stage.handle_run({module, name, input})
def run(stage, input) do
stage |> Stage.maybe_run(input) |> Stage.handle_run(%{stage: stage, input: input})
end
end
23 changes: 23 additions & 0 deletions test/opus/pipeline/stage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,29 @@ defmodule Opus.Pipeline.StageTest do
end
end

describe ":error_message option, with a message" do
defmodule ErrorMessagePipeline do
use Opus.Pipeline

step :double, error_message: :failed_to_double
step :maybe_fail

def double(n) when is_number(n), do: n * 2
def maybe_fail(10), do: raise "this will fail"
def maybe_fail(n), do: n
end

alias ErrorMessagePipeline, as: Subject

test "when the stage fails, the original error message is replaced with the :error_message option" do
assert {:error, %Opus.PipelineError{error: :failed_to_double}} = Subject.call(:not_a_number)
end

test "when the stage fails and no :error_message option is set, returns the original error" do
assert {:error, %Opus.PipelineError{error: %RuntimeError{message: "this will fail"}}} = Subject.call(5)
end
end

def time_diff(t2, t1) do
:timer.now_diff(t2, t1) / 1000
end
Expand Down

0 comments on commit d89060f

Please sign in to comment.