Skip to content

Commit

Permalink
Keep going (#3)
Browse files Browse the repository at this point in the history
* Added tests for proc exception re-raising
* Implemented keep_going for serial task running
* Implemented keep_going for concurrent task running
  • Loading branch information
ralsina committed Jul 3, 2023
1 parent dffc5f6 commit 81a06eb
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Version 0.3.3

* Implemented keep-going flag

## Version 0.3.2

* Add support for running only some tasks in auto_run
Expand Down
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: croupier
version: 0.3.2
version: 0.3.3
description: A smart task definition and execution library
authors:
- Roberto Alsina <roberto.alsina@gmail.com>
Expand Down
49 changes: 48 additions & 1 deletion spec/croupier_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ describe "Task" do
end
end

# Running a task whose proc raises must surface an exception whose message
# names the task and carries the original error text.
it "should fail if the proc raises an exception" do
  with_scenario("empty") do
    failing_task = Task.new("output2", proc: TaskProc.new { raise "foo" })
    expect_raises(Exception, "Task 052cd9c::output2 failed: foo") do
      failing_task.run
    end
  end
end

it "should record hash for outputs in the TaskManager" do
with_scenario("empty") do
t = Task.new(
Expand Down Expand Up @@ -461,7 +471,7 @@ describe "TaskManager" do

# Run the same tests for parallel and serial execution of tasks
[false, true].each do |parallel|
describe "run_task, parallel = #{parallel}" do
describe "run_tasks, parallel = #{parallel}" do
it "should run all stale tasks when run_all is false" do
with_scenario("basic", to_create: {"input" => "foo", "input2" => "bar"}) do
TaskManager.tasks["output5"].stale = false
Expand Down Expand Up @@ -538,6 +548,43 @@ describe "TaskManager" do
end
end

# The failure of a single task's proc must propagate out of
# TaskManager.run_tasks, for whichever execution mode `parallel` selects.
it "should fail if a proc raises an exception" do
  with_scenario("empty") do
    Task.new(["output2"], proc: TaskProc.new { raise "foo" })
    expect_raises(Exception, "Task 052cd9c::output2 failed: foo") do
      TaskManager.run_tasks(parallel: parallel)
    end
  end
end

# Serial-only check: this is very hard to assert on parallel execution
unless parallel
  it "should abort when a proc raises an exception" do
    with_scenario("empty") do
      Task.new(["output2"], proc: TaskProc.new { raise "foo" })
      Task.new(["output1"])
      expect_raises(Exception, "Task 052cd9c::output2 failed: foo") do
        TaskManager.run_tasks
      end
      # The second task's output must not exist after the aborted run
      File.exists?("output1").should be_false
    end
  end
end

# With keep_going enabled, one failing task must not prevent the other
# task from running.
it "should not abort when a proc raises an exception with keep_going flag" do
with_scenario("empty") do
Task.new(["output2"], proc: TaskProc.new { raise "foo" })
Task.new(["output1"], proc: TaskProc.new { "foo" })
# Even though a proc raises an exception, it's caught
TaskManager.run_tasks(parallel: parallel, keep_going: true)
# The second task must still have run and produced its output
File.exists?("output1").should be_true
end
end

it "should handle a task that generates multiple outputs" do
with_scenario("empty") do
p = TaskProc.new { ["foo", "bar"] }
Expand Down
60 changes: 44 additions & 16 deletions src/croupier.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "log"
require "yaml"

module Croupier
VERSION = "0.3.2"
VERSION = "0.3.3"

# A Task is an object that may generate output
#
Expand Down Expand Up @@ -107,7 +107,11 @@ module Croupier
call_results = Array(String | Nil).new
@procs.each do |proc|
Fiber.yield
result = proc.call
begin
result = proc.call
rescue ex
raise "Task #{self} failed: #{ex}"
end
if result.nil?
call_results << nil
elsif result.is_a?(String)
Expand Down Expand Up @@ -380,33 +384,49 @@ module Croupier
#
# If `run_all` is true, run non-stale tasks too
# If `dry_run` is true, only log what would be done, but don't do it
def run_tasks(run_all : Bool = false, dry_run : Bool = false, parallel : Bool = false)
# If `parallel` is true, run tasks in parallel
# If `keep_going` is true, keep going even if a task fails
def run_tasks(
run_all : Bool = false,
dry_run : Bool = false,
parallel : Bool = false,
keep_going : Bool = false
)
mark_stale_inputs
# Only the second element returned by sorted_task_graph is used here.
_, tasks = sorted_task_graph
check_dependencies
# Delegates to the overload that takes the task list as its first argument.
# NOTE(review): the two calls below appear to be the before/after lines of
# the diff (change markers are missing in this capture); the call that
# forwards keep_going is presumably the current one -- confirm against the
# repository.
run_tasks(tasks, run_all, dry_run, parallel)
run_tasks(tasks, run_all, dry_run, parallel, keep_going)
end

# Run the tasks needed to create or update the requested targets
# run_all will run all tasks, not just the ones that are stale
# dry_run will only log what would be done, but not actually do it
#
# If `run_all` is true, run non-stale tasks too
# If `dry_run` is true, only log what would be done, but don't do it
# If `parallel` is true, run tasks in parallel
# If `keep_going` is true, keep going even if a task fails
def run_tasks(
targets : Array(String),
run_all : Bool = false,
dry_run : Bool = false,
parallel : Bool = false
parallel : Bool = false,
keep_going : Bool = false
)
# NOTE(review): change markers are missing in this capture, so several
# lines here appear twice (before/after); the variants that mention
# keep_going are presumably the current ones -- confirm against the
# repository.
mark_stale_inputs
# Expand the requested targets through `dependencies` before running.
tasks = dependencies(targets)
# Pick the concurrent or the serial helper according to `parallel`.
if parallel
_run_tasks_parallel(tasks, run_all, dry_run)
_run_tasks_parallel(tasks, run_all, dry_run, keep_going)
else
_run_tasks(tasks, run_all, dry_run)
_run_tasks(tasks, run_all, dry_run, keep_going)
end
end

# Helper to run tasks
def _run_tasks(outputs, run_all : Bool = false, dry_run : Bool = false)
# Internal helper to run tasks serially
def _run_tasks(
outputs,
run_all : Bool = false,
dry_run : Bool = false,
keep_going : Bool = false
)
# NOTE(review): part of this body is elided by the diff view and change
# markers are missing; the comments below describe only the visible lines.
finished = Set(Task).new
outputs.each do |output|
# Outputs that have no registered task are skipped.
next unless tasks.has_key?(output)
Expand All @@ -415,13 +435,18 @@ module Croupier
# Run only when forced by run_all, when the task is stale, or when it is
# flagged always_run.
next unless run_all || t.stale? || t.@always_run

Log.debug { "Running task for #{output}" }
t.run unless dry_run
begin
t.run unless dry_run
rescue ex
# Every failure is logged; it is re-raised unless keep_going is set.
Log.error { "Error running task for #{output}: #{ex}" }
raise ex unless keep_going
end
# Reached also when a failure was swallowed under keep_going, so a failed
# task is still added to `finished`.
finished << t
end
save_run
end

# Run all stale tasks as concurrently as possible.
# Internal helper to run tasks concurrently
#
# Whenever a task is ready, launch it in a separate fiber
# This is only concurrency, not parallelism, but on tests
Expand All @@ -431,18 +456,20 @@ module Croupier
def _run_tasks_parallel(
targets : Array(String) = [] of String,
run_all : Bool = false,
dry_run : Bool = false
dry_run : Bool = false,
keep_going : Bool = false # when true, the collected errors are not raised at the end
)
# NOTE(review): parts of this body are elided by the diff view and change
# markers are missing, so some lines appear in both their old and new form;
# the comments below describe only the visible lines.
mark_stale_inputs

# An empty target list means "every known task output".
targets = tasks.keys if targets.empty?
_tasks = dependencies(targets)
finished_tasks = Set(Task).new
failed_tasks = Set(Task).new
# Error messages are collected here instead of being raised immediately.
errors = [] of String

loop do
# Candidates for this round: stale tasks that have neither finished nor
# failed (the failed_tasks check is in the longer of the two reject lines).
stale_tasks = (_tasks.map { |t| tasks[t] }).select(&.stale?).reject { |t|
finished_tasks.includes?(t)
finished_tasks.includes?(t) || failed_tasks.includes?(t)
}

# Nothing left to schedule: leave the loop.
break if stale_tasks.empty?
Expand All @@ -456,6 +483,7 @@ module Croupier
begin
t.run unless dry_run
rescue ex
# Remember the failing task and keep its message for the final report.
failed_tasks << t
errors << ex.message.to_s
ensure
# Task is done, do not run again
Expand All @@ -466,7 +494,7 @@ module Croupier
end
sleep(0.001)
end
# All collected messages are raised together as one exception; in the
# variant with the trailing `unless keep_going` this is skipped when
# keep_going is set.
raise errors.join("\n") unless errors.empty?
raise errors.join("\n") unless errors.empty? unless keep_going
# FIXME It's losing outputs for some reason
save_run
end
Expand Down

0 comments on commit 81a06eb

Please sign in to comment.