Skip to content

Commit

Permalink
Async version of the awesome runner
Browse files Browse the repository at this point in the history
  • Loading branch information
kbrock committed Apr 30, 2024
1 parent 64c7000 commit a2a833c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
66 changes: 62 additions & 4 deletions lib/floe/awesome_runner.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,59 @@
# frozen_string_literal: true

require "concurrent/array"

module Floe
class AwesomeProcess < Thread
attr_reader :result
attr_accessor :error

def initialize(queue, context, *args)
self.report_on_exception = true
@processed = false
@context = context

# Don't like changing the value of context here,
# but want to make sure thread is set before the `queue.push`
# `queue.pop` will look potentially at status, which is through thread
context["thread"] = self

super do
@result = AwesomeSpawn.run(*args)

# this is changing the value of the context
# in the non-main thread
# Potential race condition here
Floe::AwesomeRunner.populate_results!(@context, :result => @result)

# trigger an event
queue.push(["delete", context])
rescue => err
# Shouldn't ever get in here
@error = err

Floe::AwesomeRunner.populate_results!(@context, :error => err)

# trigger an event
queue.push(["delete", context])
end
end
end

class AwesomeRunner < Floe::Runner
SCHEME = "awesome"
SCHEME_PREFIX = "#{SCHEME}://"
SCHEME_OFFSET = SCHEME.length + 3

# only exposed for tests
# use wait instead
attr_reader :queue

def initialize(_options = {})
require "awesome_spawn"

# events triggered
@queue = Queue.new

super
end

Expand All @@ -21,13 +66,18 @@ def run_async!(resource, params = {}, _secrets = {}, _context = {})

runner_context = {}

# TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1)
result = AwesomeSpawn.run(method, :env => params, :params => args)
self.class.populate_results!(runner_context, :result => result)
# NOTE: this adds itself to the runner_context
AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args)

runner_context
end

def status!(runner_context)
# check if it has no output (i.e.: we think it is running) but it is not running
if !runner_context.key?("Output") && !runner_context["thread"]&.alive?
runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"}
runner_context["Error"] = true
end
end

def running?(runner_context)
Expand All @@ -43,9 +93,17 @@ def output(runner_context)
end

def cleanup(runner_context)
runner_context["thread"] = nil
end

def wait(_timeout: nil, _events: %i[create update delete])
def wait(timeout: nil, _events: %i[create update delete])
# TODO: implement whole interface
raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given?

loop do
event_context = @queue.pop
yield event_context if block_given?
end
end

# internal methods
Expand Down
2 changes: 2 additions & 0 deletions spec/awesome_runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n")

subject.run_async!("awesome://ls")
subject.queue.pop
end

it "passes environment variables to command run" do
stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n")

subject.run_async!("awesome://ls", {"FOO" => "BAR"})
subject.queue.pop
end
end

Expand Down

0 comments on commit a2a833c

Please sign in to comment.