Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass ruby objects between states #222

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/floe/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,27 @@ def run(args = ARGV)
opts[:credentials] == "-" ? $stdin.read : opts[:credentials]
elsif opts[:credentials_file_given]
File.read(opts[:credentials_file])
else
{}
end

workflows =
workflows_inputs.each_slice(2).map do |workflow, input|
context = Floe::Workflow::Context.new(opts[:context], :input => input, :credentials => credentials)
context_hash = from_json(workflow, "context", opts[:context])
input = from_json(workflow, "input", input)
break unless !context_hash.nil? && !input.nil?

context = Floe::Workflow::Context.new(context_hash, :input => input, :credentials => credentials)
Floe::Workflow.load(workflow, context)
end
return if workflows.nil?

Floe::Workflow.wait(workflows, &:run_nonblock)

# Display status
workflows.each do |workflow|
puts "", "#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}", "===" if workflows.size > 1
puts workflow.output.inspect
puts workflow.output
end

workflows.all? { |workflow| workflow.context.success? }
Expand Down Expand Up @@ -79,5 +86,13 @@ def parse_options!(args)

return workflows_inputs, opts
end

def from_json(workflow, name, value)
value ? JSON.parse(value) : {}
rescue JSON::ParserError => err
warn "invalid #{name} for #{workflow} -- #{err.message}"
warn "#{name}: #{value}"
nil
end
end
end
21 changes: 16 additions & 5 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,21 @@ class Workflow
include Logging

class << self
# used by providers-workflow
def from_json(payload_str, context_str, credentials_str, name = nil)
payload = JSON.parse(payload_str)
context_hash = JSON.parse(context_str)
credentials = JSON.parse(credentials_str)
context = Context.new(context_hash, :credentials => credentials)

new(payload, context, nil, name)
end

# used by exe/floe
def load(path_or_io, context = nil, credentials = {}, name = nil)
payload = path_or_io.respond_to?(:read) ? path_or_io.read : File.read(path_or_io)
payload = JSON.parse(payload)

# default the name if it is a filename and none was passed in
name ||= path_or_io.respond_to?(:read) ? "stream" : path_or_io.split("/").last.split(".").first

Expand Down Expand Up @@ -87,10 +100,8 @@ def wait(workflows, timeout: nil, &block)

attr_reader :context, :payload, :states, :states_by_name, :start_at, :name, :comment

def initialize(payload, context = nil, credentials = nil, name = nil)
payload = JSON.parse(payload) if payload.kind_of?(String)
credentials = JSON.parse(credentials) if credentials.kind_of?(String)
context = Context.new(context) unless context.kind_of?(Context)
def initialize(payload, context = {}, credentials = nil, name = nil)
context = Context.new(context) unless context.kind_of?(Context)

# backwards compatibility
# caller should really put credentials into context and not pass that variable
Expand Down Expand Up @@ -157,7 +168,7 @@ def status
end

def output
context.output if end?
context.json_output if end?
end

def end?
Expand Down
19 changes: 10 additions & 9 deletions lib/floe/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,18 @@ class Workflow
class Context
attr_accessor :credentials

# @param context [Json|Hash] (default, create another with input and execution params)
# @param context [String|Array|Hash] (default, create another with input and execution params)
# @param input [Hash] (default: {})
def initialize(context = nil, input: nil, credentials: {})
context = JSON.parse(context) if context.kind_of?(String)

input ||= {}
input = JSON.parse(input) if input.kind_of?(String)

@context = context || {}
self["Execution"] ||= {}
self["Execution"]["Input"] ||= input
self["Execution"]["Input"] ||= input || {}
self["State"] ||= {}
self["StateHistory"] ||= []
self["StateMachine"] ||= {}
self["Task"] ||= {}

@credentials = credentials || {}
rescue JSON::ParserError => err
raise Floe::InvalidWorkflowError, err.message
end

def execution
Expand Down Expand Up @@ -54,10 +47,18 @@ def input
state["Input"]
end

def json_input
input.to_json
end

def output
state["Output"]
end

def json_output
output.to_json
end

def output=(val)
state["Output"] = val
end
Expand Down
4 changes: 2 additions & 2 deletions lib/floe/workflow/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def run_nonblock!(context)
def start(context)
context.state["EnteredTime"] = Time.now.utc.iso8601

logger.info("Running state: [#{long_name}] with input [#{context.input}]...")
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...")
end

def finish(context)
Expand All @@ -65,7 +65,7 @@ def finish(context)
context.state["Duration"] = finished_time - entered_time

level = context.failed? ? :error : :info
logger.public_send(level, "Running state: [#{long_name}] with input [#{context.input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.output}]")
logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]")

0
end
Expand Down
6 changes: 3 additions & 3 deletions lib/floe/workflow/states/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def retry_state!(context, error)
wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}")
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

Expand All @@ -116,7 +116,7 @@ def catch_error!(context, error)

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]")
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")

true
end
Expand All @@ -126,7 +126,7 @@ def fail_workflow!(context, error)
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]")
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end

def parse_error(output)
Expand Down
14 changes: 7 additions & 7 deletions spec/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with a bare workflow and --input" do
Expand All @@ -48,7 +48,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with --workflow and no input" do
Expand All @@ -66,7 +66,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with a bare workflow and --workflow" do
Expand Down Expand Up @@ -94,11 +94,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
{"foo":1}

workflow
===
{"foo"=>2}
{"foo":2}
OUTPUT
end

Expand All @@ -111,11 +111,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
{"foo":1}

workflow
===
{"foo"=>1}
{"foo":1}
OUTPUT
end

Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
end

context "with a simple string input" do
let(:input) { "\"foo\"" }
let(:input) { "foo" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example. I think this is actually invalid input, right?

Copy link
Member Author

@kbrock kbrock Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is invalid input for CLI.

In the CLI / floe from the command line, I want to see "\"foo\"".
And possibly the methods for manageiq-providers-workflow would pass in JSON strings.
But once something is in our inner classes, I want to see ruby.


it "sets the input" do
expect(ctx.execution["Input"]).to eq("foo")
Expand Down
16 changes: 5 additions & 11 deletions spec/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@
expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "State name [#{truncated_state_name}] must be less than or equal to 80 characters")
end

it "raises an exception for invalid context" do
payload = make_payload({"FirstState" => {"Type" => "Success"}})

expect { described_class.new(payload, "invalid context") }.to raise_error(Floe::InvalidWorkflowError, "unexpected token at 'invalid context'")
end

it "raises an exception for invalid resource scheme in a Task state" do
payload = make_payload({"FirstState" => {"Type" => "Task", "Resource" => "invalid://foo", "End" => true}})

Expand Down Expand Up @@ -88,7 +82,7 @@
expect(ctx.ended?).to eq(true)

# final results
expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
end
Expand All @@ -113,7 +107,7 @@
expect(ctx.ended?).to eq(true)

# final results
expect(workflow.output).to eq({"Cause" => "Bad Stuff", "Error" => "Issue"})
expect(workflow.output).to eq('{"Error":"Issue","Cause":"Bad Stuff"}')
expect(workflow.status).to eq("failure")
expect(workflow.end?).to eq(true)
end
Expand Down Expand Up @@ -148,7 +142,7 @@
workflow.start_workflow
workflow.step_nonblock

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
expect(ctx.output).to eq(input)
Expand Down Expand Up @@ -187,7 +181,7 @@
# step_nonblock should return 0 and mark the workflow as completed
expect(workflow.step_nonblock).to eq(0)

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
expect(ctx.output).to eq(input)
Expand Down Expand Up @@ -243,7 +237,7 @@
expect(ctx.running?).to eq(false)
expect(ctx.ended?).to eq(true)

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
end
end

Expand Down