diff --git a/lib/floe/awesome_runner.rb b/lib/floe/awesome_runner.rb index 3246d539..8520d9d5 100644 --- a/lib/floe/awesome_runner.rb +++ b/lib/floe/awesome_runner.rb @@ -1,14 +1,59 @@ # frozen_string_literal: true +require "concurrent/array" + module Floe + class AwesomeProcess < Thread + attr_reader :result + attr_accessor :error + + def initialize(queue, context, *args) + self.report_on_exception = true + @processed = false + @context = context + + # Don't like changing the value of context here, + # but want to make sure thread is set before the `queue.push` + # `queue.pop` will look potentially at status, which is through thread + context["thread"] = self + + super do + @result = AwesomeSpawn.run(*args) + + # this is changing the value of the context + # in the non-main thread + # Potential race condition here + Floe::AwesomeRunner.populate_results!(@context, :result => @result) + + # trigger an event + queue.push(["delete", context]) + rescue => err + # Shouldn't ever get in here + @error = err + + Floe::AwesomeRunner.populate_results!(@context, :error => err) + + # trigger an event + queue.push(["delete", context]) + end + end + end + class AwesomeRunner < Floe::Runner SCHEME = "awesome" SCHEME_PREFIX = "#{SCHEME}://" SCHEME_OFFSET = SCHEME.length + 3 + # only exposed for tests + # use wait instead + attr_reader :queue + def initialize(_options = {}) require "awesome_spawn" + # events triggered + @queue = Queue.new + super end @@ -21,13 +66,18 @@ def run_async!(resource, params = {}, _secrets = {}, _context = {}) runner_context = {} - # TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1) - result = AwesomeSpawn.run(method, :env => params, :params => args) - self.class.populate_results!(runner_context, :result => result) + # NOTE: this adds itself to the runner_context + AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args) + runner_context end def status!(runner_context) + # check if it has no output (i.e.: we think it is running) but it is not running + if !runner_context.key?("Output") && !runner_context["thread"]&.alive? + runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"} + runner_context["Error"] = true + end end def running?(runner_context) @@ -43,9 +93,17 @@ def output(runner_context) end def cleanup(runner_context) + runner_context["thread"] = nil end - def wait(_timeout: nil, _events: %i[create update delete]) + def wait(timeout: nil, _events: %i[create update delete]) + # TODO: implement whole interface + raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given? + + loop do + event_context = @queue.pop + yield event_context if block_given? + end end # internal methods diff --git a/spec/awesome_runner_spec.rb b/spec/awesome_runner_spec.rb index 94bdf923..aafedc32 100644 --- a/spec/awesome_runner_spec.rb +++ b/spec/awesome_runner_spec.rb @@ -20,12 +20,14 @@ stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n") subject.run_async!("awesome://ls") + subject.queue.pop end it "passes environment variables to command run" do stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n") subject.run_async!("awesome://ls", {"FOO" => "BAR"}) + subject.queue.pop end end