Skip to content

Commit

Permalink
Merge pull request #136 from agrare/workflow_payload_validation
Browse files Browse the repository at this point in the history
Validate that the workflow payload is correct
  • Loading branch information
kbrock committed Nov 1, 2023
2 parents 7399807 + 8b0590d commit 97b81d3
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 19 deletions.
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require_relative "floe/workflow/states/choice"
require_relative "floe/workflow/states/fail"
require_relative "floe/workflow/states/map"
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/parallel"
require_relative "floe/workflow/states/pass"
require_relative "floe/workflow/states/succeed"
Expand Down
4 changes: 4 additions & 0 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def initialize(payload, context = nil, credentials = {})
credentials = JSON.parse(credentials) if credentials.kind_of?(String)
context = Context.new(context) unless context.kind_of?(Context)

raise Floe::InvalidWorkflowError, "Missing field \"States\"" if payload["States"].nil?
raise Floe::InvalidWorkflowError, "Missing field \"StartAt\"" if payload["StartAt"].nil?
raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field" unless payload["States"].key?(payload["StartAt"])

@payload = payload
@context = context
@credentials = credentials
Expand Down
3 changes: 3 additions & 0 deletions lib/floe/workflow/path.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def value(payload, context, input = {})

def initialize(payload)
@payload = payload

raise Floe::InvalidWorkflowError, "Path [#{payload}] must be a string" if payload.nil? || !payload.kind_of?(String)
raise Floe::InvalidWorkflowError, "Path [#{payload}] must start with \"$\"" if payload[0] != "$"
end

def value(context, input = {})
Expand Down
4 changes: 4 additions & 0 deletions lib/floe/workflow/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class State
class << self
def build!(workflow, name, payload)
state_type = payload["Type"]
raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil?

begin
klass = Floe::Workflow::States.const_get(state_type)
Expand All @@ -27,6 +28,9 @@ def initialize(workflow, name, payload)
@payload = payload
@type = payload["Type"]
@comment = payload["Comment"]

raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil?
raise Floe::InvalidWorkflowError, "State name [#{name}] must be less than or equal to 80 characters" if name.length > 80
end

def run!(_input = nil)
Expand Down
18 changes: 18 additions & 0 deletions lib/floe/workflow/states/choice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class Choice < Floe::Workflow::State
def initialize(workflow, name, payload)
super

validate_state!

@choices = payload["Choices"].map { |choice| ChoiceRule.build(choice) }
@default = payload["Default"]

Expand All @@ -33,6 +35,22 @@ def running?
def end?
false
end

private

def validate_state!
validate_state_choices!
validate_state_default!
end

def validate_state_choices!
raise Floe::InvalidWorkflowError, "Choice state must have \"Choices\"" unless payload.key?("Choices")
raise Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array" unless payload["Choices"].kind_of?(Array) && !payload["Choices"].empty?
end

def validate_state_default!
raise Floe::InvalidWorkflowError, "\"Default\" not in \"States\"" unless workflow.payload["States"].include?(payload["Default"])
end
end
end
end
Expand Down
14 changes: 14 additions & 0 deletions lib/floe/workflow/states/non_terminal_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module Floe
class Workflow
module States
module NonTerminalMixin
def validate_state_next!
raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end
raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\" for state [#{name}]" if @next && !workflow.payload["States"].key?(@next)
end
end
end
end
end
10 changes: 10 additions & 0 deletions lib/floe/workflow/states/pass.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Floe
class Workflow
module States
class Pass < Floe::Workflow::State
include NonTerminalMixin

attr_reader :end, :next, :result, :parameters, :input_path, :output_path, :result_path

def initialize(workflow, name, payload)
Expand All @@ -17,6 +19,8 @@ def initialize(workflow, name, payload)
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))

validate_state!
end

def start(input)
Expand All @@ -36,6 +40,12 @@ def running?
def end?
@end
end

private

def validate_state!
validate_state_next!
end
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow/states/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Floe
class Workflow
module States
class Task < Floe::Workflow::State
include NonTerminalMixin

attr_reader :credentials, :end, :heartbeat_seconds, :next, :parameters,
:result_selector, :resource, :timeout_seconds, :retry, :catch,
:input_path, :output_path, :result_path
Expand All @@ -25,6 +27,8 @@ def initialize(workflow, name, payload)
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]

validate_state!
end

def start(input)
Expand Down Expand Up @@ -72,6 +76,10 @@ def end?

attr_reader :runner

def validate_state!
validate_state_next!
end

def success?
runner.success?(context.state["RunnerContext"])
end
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow/states/wait.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module Floe
class Workflow
module States
class Wait < Floe::Workflow::State
include NonTerminalMixin

attr_reader :end, :next, :seconds, :input_path, :output_path

def initialize(workflow, name, payload)
Expand All @@ -20,6 +22,8 @@ def initialize(workflow, name, payload)

@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))

validate_state!
end

def start(input)
Expand All @@ -41,6 +45,10 @@ def end?

private

def validate_state!
validate_state_next!
end

def please_hold(input)
wait(
:seconds => @seconds_path ? @seconds_path.value(context, input).to_i : @seconds,
Expand Down
9 changes: 9 additions & 0 deletions spec/workflow/path_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
RSpec.describe Floe::Workflow::Path do
it "raises an exception if the payload isn't a string" do
expect { described_class.new(nil) }.to raise_error(Floe::InvalidWorkflowError, "Path [] must be a string")
expect { described_class.new(0) }.to raise_error(Floe::InvalidWorkflowError, "Path [0] must be a string")
end

it "raises an exception if the first character isn't a $" do
expect { described_class.new("foo") }.to raise_error(Floe::InvalidWorkflowError, "Path [foo] must start with \"$\"")
end

describe "#value" do
context "referencing the global context" do
it "with a missing value" do
Expand Down
20 changes: 20 additions & 0 deletions spec/workflow/states/choice_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@
)
end

it "raises an exception if Choices is missing" do
payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}}
expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Choice state must have \"Choices\"")
end

it "raises an exception if Choices is not an array" do
payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => {}, "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}}
expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array")
end

it "raises an exception if Choices is an empty array" do
payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => [], "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}}
expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array")
end

it "raises an exception if Default isn't a valid state" do
payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => [{"Variable" => "$.foo", "NumericEquals" => 1, "Next" => "FirstMatchState"}], "Default" => "MissingState"}}}
expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Default\" not in \"States\"")
end

it "#end?" do
expect(state.end?).to eq(false)
end
Expand Down
26 changes: 13 additions & 13 deletions spec/workflow/states/task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

describe "Input" do
context "with no InputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) }

it "passes the whole context to the resource" do
expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "hello, world!")
Expand All @@ -23,7 +23,7 @@
end

context "with an InputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo"}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo", "End" => true}}) }

it "filters the context passed to the resource" do
expect_run_async({"bar" => "baz"}, :output => nil)
Expand All @@ -33,7 +33,7 @@
end

context "with Parameters" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) }

it "passes the interpolated parameters to the resource" do
expect_run_async({"var1" => "baz"}, :output => nil)
Expand All @@ -44,7 +44,7 @@
end

describe "Output" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) }

it "uses the last line as output if it is JSON" do
expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "ABCD\nHELLO\n{\"response\":[\"192.168.1.2\"]}")
Expand Down Expand Up @@ -73,7 +73,7 @@
end

context "ResultSelector" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}, "End" => true}}) }

it "filters the results" do
expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "ABCD\nHELLO\n{\"response\":[\"192.168.1.2\"],\"exit_code\":0}")
Expand All @@ -85,7 +85,7 @@
end

context "ResultPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs"}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs", "End" => true}}) }

it "inserts the response into the input" do
expect_run_async(input, :output => "[\"192.168.1.2\"]")
Expand All @@ -101,7 +101,7 @@
end

context "OutputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path, "End" => true}}) }

context "with the default '$'" do
let(:output_path) { "$" }
Expand Down Expand Up @@ -134,7 +134,7 @@
end

describe "Retry" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2, "End" => true}}) }

context "with specific errors" do
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}] }
Expand All @@ -150,7 +150,7 @@
end

context "with multiple retriers" do
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"]}] }
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"], "End" => true}] }

it "resets the retrier if a different exception is raised" do
expect_run_async(input, :error => "States.Timeout")
Expand Down Expand Up @@ -209,7 +209,7 @@
end

context "with a Catch" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) }

it "retry preceeds catch" do
expect_run_async(input, :error => "States.Timeout")
Expand All @@ -233,7 +233,7 @@

describe "Catch" do
context "with specific errors" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}], "End" => true}}) }

it "catches the exception" do
expect_run_async(input, :output => "States.Timeout", :success => false)
Expand All @@ -255,7 +255,7 @@
end

context "with a State.ALL catcher" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) }

it "catches a more specific exception" do
expect_run_async(input, :output => "States.Timeout", :success => false)
Expand All @@ -278,7 +278,7 @@

describe "#end?" do
it "with a normal state" do
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "ChoiceState"}})
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}})
state = workflow.current_state
expect(state.end?).to be false
end
Expand Down
8 changes: 4 additions & 4 deletions spec/workflow/states/wait_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

describe "#running?" do
context "with seconds" do
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -36,7 +36,7 @@

context "with secondsPath" do
let(:input) { {"expire" => "1"} }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -52,7 +52,7 @@

context "with timestamp" do
let(:expiry) { Time.now.utc + 1 }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -69,7 +69,7 @@
context "with timestamp" do
let(:expiry) { Time.now.utc + 1 }
let(:input) { {"expire" => expiry.iso8601} }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand Down
Loading

0 comments on commit 97b81d3

Please sign in to comment.