Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [POC] Awesome runner #178

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions examples/awesome.asl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"Comment": "Directory Listing",
"StartAt": "a",
"States": {
"a":{
"Type": "Pass",
"Next": "b"
},
"b": {
"Type": "Wait",
"Seconds": 1,
"Next": "ls"
},
"ls": {
"Type": "Task",
"Resource": "awesome://ls -l Gemfile",
"Comment": "awesome://ls -l $FILENAME",
"Next": "c",
"Parameters": {
"FILENAME" : "Gemfile"
}
},
"c": {
"Type": "Succeed"
}
}
}
130 changes: 130 additions & 0 deletions lib/floe/awesome_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# frozen_string_literal: true

require "concurrent/array"

module Floe
class AwesomeProcess < Thread
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have preferred not to use a thread per process, and not to build this around an AwesomeSpawn-centric model.

attr_reader :result
attr_accessor :error

# Starts a thread that runs a command through AwesomeSpawn and reports
# completion on +queue+.
#
# @param queue [#push] receives a single ["delete", context] event once the
#   command has finished (successfully or not)
# @param context [Hash] runner context; this thread is stored under "thread"
#   and the outcome is written into it by Floe::AwesomeRunner.populate_results!
# @param args arguments forwarded unchanged to AwesomeSpawn.run
def initialize(queue, context, *args)
  self.report_on_exception = true
  @context = context

  # Don't like changing the value of context here,
  # but the thread must be registered before the event is pushed:
  # whoever pops the event may look at the status, which goes through the thread.
  context["thread"] = self

  super do
    begin
      @result = AwesomeSpawn.run(*args)

      # This mutates the context from the non-main thread
      # (potential race condition with readers of the context).
      Floe::AwesomeRunner.populate_results!(@context, :result => @result)
    rescue => err
      # Shouldn't ever get in here
      @error = err

      Floe::AwesomeRunner.populate_results!(@context, :error => err)
    end

    # Trigger the event exactly once, outside the rescued region, so a failure
    # while pushing can no longer overwrite an already recorded result.
    queue.push(["delete", @context])
  end
end
end

class AwesomeRunner < Floe::Runner
SCHEME = "awesome"
SCHEME_PREFIX = "#{SCHEME}://"
SCHEME_OFFSET = SCHEME.length + 3

# only exposed for tests
# use wait instead
attr_reader :queue

# Creates the runner and the queue on which process events are delivered.
#
# @param _options [Hash] not read here; forwarded to the superclass by the bare `super`
def initialize(_options = {})
# awesome_spawn is loaded when a runner is instantiated, not when this file is required
require "awesome_spawn"

# events pushed by AwesomeProcess threads and consumed by #wait
@queue = Queue.new

super
end

# @return [Hash] runner_context
# Starts the command named by +resource+ in a background AwesomeProcess.
#
# The resource has the form "awesome://<command> [args...]"; the text after
# the scheme prefix is split on whitespace, the first word being the command
# and the remaining words its parameters.
#
# @param resource [String] resource uri starting with SCHEME_PREFIX
# @param params [Hash] passed to the process as its :env
# @return [Hash] runner_context (AwesomeProcess stores itself in it under "thread")
# @raise [ArgumentError] if the resource is nil, has another scheme,
#   or names no command
def run_async!(resource, params = {}, _secrets = {}, _context = {})
  raise ArgumentError, "Invalid resource" unless resource&.start_with?(SCHEME_PREFIX)

  command, *command_args = resource.delete_prefix(SCHEME_PREFIX).split

  # "awesome://" with nothing after it would otherwise spawn a thread for a nil command
  raise ArgumentError, "Invalid resource" if command.nil?

  runner_context = {}

  # NOTE: this adds itself to the runner_context
  AwesomeProcess.new(@queue, runner_context, command, :env => params, :params => command_args)

  runner_context
end

# Marks the context as failed when it has no output although its thread is
# no longer running (i.e. we think it is running but it is not).
#
# @param runner_context [Hash] context returned by #run_async!
# @return [true, nil] true when the context was marked as failed, nil otherwise
def status!(runner_context)
  # Sample liveness BEFORE looking for output: a thread that is no longer
  # alive has already written whatever it was going to write. Checking in the
  # opposite order lets a thread that finishes between the two checks get its
  # result overwritten with the error below.
  thread_alive = runner_context["thread"]&.alive?
  return if thread_alive || runner_context.key?("Output")

  runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"}
  runner_context["Error"] = true
end

# A task counts as running for as long as its context holds no output.
#
# @param runner_context [Hash]
# @return [Boolean]
def running?(runner_context)
  runner_context["Output"] ? false : true
end

# A task succeeded unless its context carries a truthy "Error" entry.
#
# @param runner_context [Hash]
# @return [Boolean]
def success?(runner_context)
  runner_context["Error"] ? false : true
end

# Returns whatever is stored under "Output" in the context (nil when nothing is stored).
#
# @param runner_context [Hash]
def output(runner_context)
runner_context["Output"]
end

# Drops the context's reference to its thread.
#
# @param runner_context [Hash]
# @return [nil]
def cleanup(runner_context)
  runner_context.store("thread", nil)
end

# Yields every event popped from the queue to the given block, forever.
#
# @param timeout [nil] must be nil; timeouts are not supported
# @param _events not read by this implementation
# @yield [event_context] the value popped from the queue
# @raise [RuntimeError] when no block is given or a timeout is requested
def wait(timeout: nil, _events: %i[create update delete])
  # TODO: implement whole interface
  unsupported = !timeout.nil? || !block_given?
  raise "wait needs a block and doesn't support timeout" if unsupported

  # Blocks on the queue between events; the loop itself has no exit condition.
  loop { yield @queue.pop }
end

# internal methods

# Picks the text describing a command result: its error text when that is
# present and non-empty, otherwise its output.
#
# @param command_result [#error, #output]
# @return [String]
def self.command_error_cause(command_result)
  error_text = command_result.error
  return command_result.output.to_s if error_text.nil? || error_text.empty?

  error_text.to_s
end

# Writes the outcome of a command into the runner context.
#
# On failure "Output" becomes {"Error" => "States.TaskFailed", "Cause" => <text>}
# and "Error" is set to true; on success "Output" becomes
# {"Result" => <output lines>}.
#
# @param runner_context [Hash] context to update in place
# @param result [#failure?, #output, #error, nil] finished command result
# @param error [Exception, String, nil] failure that prevented a result
# @return [Hash] the same runner_context
# @raise [ArgumentError] when neither result nor error is given
def self.populate_results!(runner_context, result: nil, error: nil)
  # Without this guard the success branch fails with NoMethodError on nil
  raise ArgumentError, "either result or error is required" if result.nil? && error.nil?

  error ||= command_error_cause(result) if result&.failure?

  if error
    # Store the cause as text, like the command-failure path does,
    # even when the error is an exception object
    runner_context["Output"] = {"Error" => "States.TaskFailed", "Cause" => error.to_s}
    runner_context["Error"] = true
  else
    runner_context["Output"] = {"Result" => result.output.chomp.split("\n")}
  end

  runner_context
end
end
end

# Runs when this file is loaded: registers one AwesomeRunner instance under the "awesome" scheme.
Floe::Runner.register_scheme(Floe::AwesomeRunner::SCHEME, Floe::AwesomeRunner.new)
1 change: 1 addition & 0 deletions lib/floe/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ def initialize
require "optimist"
require "floe"
require "floe/container_runner"
require "floe/awesome_runner"
require "logger"

Floe.logger = Logger.new($stdout)
Expand Down
4 changes: 4 additions & 0 deletions lib/floe/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def for_resource(resource)
scheme = resource.split("://").first
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
end

# Lists every registered runner; entries registered as a Proc are called
# to obtain the runner.
#
# @return [Array]
def runners
  @runners.values.map do |entry|
    next entry unless entry.kind_of?(Proc)

    entry.call
  end
end
end

# Run a command asynchronously and create a runner_context
Expand Down
23 changes: 14 additions & 9 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")

run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_thread = Thread.new do
loop do
Runner.for_resource("docker").wait do |event, runner_context|
queue.push([event, runner_context])
run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_threads =
Runner.runners.map do |runner|
next unless runner.respond_to?(:wait)

Thread.new do
loop do
runner.wait do |event, runner_context|
queue.push([event, runner_context])
end
end
end
end
end

loop do
ready = workflows.select(&:step_nonblock_ready?)
Expand Down Expand Up @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
wait_threads.compact.map(&:kill)
end
end

Expand Down
80 changes: 80 additions & 0 deletions spec/awesome_runner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require_relative "../lib/floe/awesome_runner"

# Specs for Floe::AwesomeRunner.
# NOTE(review): stub_good_run and the :uses_awesome_spawn tag are not defined in this file;
# presumably they come from the shared spec helpers - confirm.
RSpec.describe Floe::AwesomeRunner, :uses_awesome_spawn => true do
let(:subject) { described_class.new(runner_options) }
let(:runner_options) { {} }
# only referenced by the commented-out examples below
let(:container_id) { SecureRandom.hex }

# let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) }

describe "#run_async!" do
it "raises an exception without a resource" do
expect { subject.run_async!(nil) }.to raise_error(ArgumentError, "Invalid resource")
end

it "raises an exception for an invalid resource uri" do
expect { subject.run_async!("arn:abcd:efgh") }.to raise_error(ArgumentError, "Invalid resource")
end

it "calls command run with the command name" do
stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n")

subject.run_async!("awesome://ls")
# blocks until the background process pushes its event onto the queue
subject.queue.pop
end

it "passes environment variables to command run" do
stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n")

subject.run_async!("awesome://ls", {"FOO" => "BAR"})
# blocks until the background process pushes its event onto the queue
subject.queue.pop
end
end

# NOTE(review): the commented-out examples below use "container_ref"/"container_state" keys,
# which AwesomeRunner does not read; they look carried over from a container runner spec - confirm.

# describe "#status!" do
# let(:runner_context) { {"container_ref" => container_id} }

# it "returns the updated container_state" do
# stub_good_run!("ls", :params => ["inspect", container_id], :output => "[{\"State\": {\"Running\": true}}]")

# subject.status!(runner_context)

# expect(runner_context).to include("container_state" => {"Running" => true})
# end
# end

describe "#running?" do
# it "returns true when running" do
# runner_context = {"container_ref" => container_id, "container_state" => {"Running" => true}}
# expect(subject.running?(runner_context)).to be_truthy
# end

# it "returns false when not running" do
# runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}}
# expect(subject.running?(runner_context)).to be_falsey
# end
end

describe "#success?" do
# it "returns true when successful" do
# runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}}
# expect(subject.success?(runner_context)).to be_truthy
# end

# it "returns false when not successful" do
# runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 1}}
# expect(subject.success?(runner_context)).to be_falsey
# end
end

describe "#output" do
let(:runner_context) { {"Output" => ["output1", "output2"]} }

it "returns log output" do
expect(subject.output(runner_context)).to eq(["output1", "output2"])
end
end

# describe "#cleanup" do
# end
end