Commit

Merge branch 'main' of github.com:ralsina/croupier
ralsina committed Jun 30, 2023
2 parents 51ca2dd + 0001311 commit 2e550b8
Showing 6 changed files with 206 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -1,5 +1,10 @@
# Changelog

## Version 0.3.1

* Added auto_run / auto_stop that control a "watchdog" fiber that
automatically runs tasks if their dependencies change.

## Version 0.3.0

* Removed name parameter
7 changes: 4 additions & 3 deletions TODO.md
@@ -3,15 +3,16 @@
## Things it may make sense to add

* Instrument the concurrent runner using [Fiber Metrics](https://github.com/didactic-drunk/fiber_metrics.cr)
* Once it works fine with files, generalize to a k/v store using [kiwi](ihttps://github.com/crystal-community/kiwi)
* Once it works fine with files, generalize to a k/v store using [kiwi](https://github.com/crystal-community/kiwi)
* Use state machines for tasks (see veelenga/aasm.cr)
* Implement -k -i make options (keep going / ignore errors)
* Add a faster stale input check using file dates instead of hashes (like make)
* Add directory dependencies (depend on all files in the tree)
* Add wildcard dependencies (depend on all files / tasks matching a pattern)
* Implement failed state for tasks
* Implement a "watchdog" mode
* Implement -k -i make options (keep going / ignore errors)
* Decide what to do in auto_run when no task has inputs

* ~~Implement a "watchdog" mode~~
* ~~Rationalize id/name/output thing~~
* ~~Make it fast again :-)~~ [Sort of]
* ~~Implement the missing parts of the parallel runner~~
4 changes: 3 additions & 1 deletion shard.yml
@@ -1,5 +1,5 @@
name: croupier
version: 0.2.4
version: 0.3.1
description: A smart task definition and execution library
authors:
- Roberto Alsina <roberto.alsina@gmail.com>
@@ -11,6 +11,8 @@ license: MIT
dependencies:
crystalline:
github: ralsina/crystalline
inotify:
github: petoem/inotify.cr

# Not the same crystalline as above :-)
crystalline:
110 changes: 109 additions & 1 deletion spec/croupier_spec.cr
@@ -17,7 +17,7 @@ def with_scenario(
"dummy" => TaskProc.new { "" },
"counter" => TaskProc.new {
x += 1
""
x.to_s
},
"output2" => TaskProc.new {
x += 1
@@ -686,6 +686,114 @@ describe "TaskManager" do
end
end

describe "watch" do
it "should always start with no queued changes" do
with_scenario("basic", to_create: {"input" => "foo"}) do
TaskManager.watch
Fiber.yield
TaskManager.@queued_changes.empty?.should be_true
end
end

it "should queue changed inputs" do
with_scenario("basic", to_create: {"input" => "foo"}) do
TaskManager.watch
File.open("input", "w") << "bar"
# We need to yield or else the watch callbacks never run
Fiber.yield
TaskManager.@queued_changes.should eq Set{"input"}
File.open("input2", "w") << "foo"
Fiber.yield
TaskManager.@queued_changes.should eq Set{"input", "input2"}
end
end
end

describe "auto_run" do
it "should run tasks when inputs change" do
with_scenario("basic") do
TaskManager.auto_run
# We need to yield or else the watch callbacks never run
Fiber.yield
# At this point output3 doesn't exist
File.exists?("output3").should be_false
# We create input, which is output3's dependency
File.open("input", "w") << "bar"
Fiber.yield
# Tasks are not runnable (missing input2)
File.exists?("output3").should be_false
# We create input2, the input that was still missing
File.open("input2", "w") << "bar"
Fiber.yield
TaskManager.auto_stop
# And now output3 should exist
File.exists?("output3").should be_true
end
end

it "should not re-raise exceptions" do
with_scenario("empty") do
x = 0
error_proc = TaskProc.new { x += 1; raise "boom" }
Task.new(output: "t1", inputs: ["i"], proc: error_proc)
TaskManager.auto_run
Fiber.yield
File.open("i", "w") << "foo"
# We need to yield or else the watch callbacks never run
Fiber.yield
# auto_run logs all errors and continues, because it's
# normal to have failed runs in auto mode
TaskManager.auto_stop
# It should have run
(x > 0).should be_true
end
end

it "should not run when no inputs have changed" do
with_scenario("empty") do
x = 0
counter = TaskProc.new { x += 1; x.to_s }
Task.new(output: "t1", inputs: ["i"], proc: counter)
TaskManager.auto_run
# We need to yield or else the watch callbacks never run
Fiber.yield
TaskManager.auto_stop
# It should never have run
x.should eq 0
end
end

it "should run only when inputs have changed" do
with_scenario("empty") do
x = 0
counter = TaskProc.new { x += 1; x.to_s }
Task.new(output: "t1", inputs: ["i"], proc: counter)
TaskManager.auto_run
Fiber.yield
File.open("i", "w") << "foo"
Fiber.yield
TaskManager.auto_stop
# It should have run only once
x.should eq 1
end
end

it "should run tasks without outputs" do
with_scenario("empty") do
x = 0
counter = TaskProc.new { x += 1; x.to_s }
Task.new(id: "t1", inputs: ["i"], proc: counter)
TaskManager.auto_run
Fiber.yield
File.open("i", "w") << "foo"
Fiber.yield
TaskManager.auto_stop
# It should have run only once
x.should eq 1
end
end
end

describe "dependencies" do
it "should report all tasks required to produce an output" do
with_scenario("basic", to_create: {"input" => "foo", "input2" => "bar"}) do
2 changes: 1 addition & 1 deletion spec/testcases/basic/tasks.yml
@@ -8,7 +8,7 @@ output1:
always_run: false
no_save: false
stale: true
procs: "dummy"
procs: "counter"
output2:
id: 052cd9c6f04c7451518f3f12a3f48caad072ec2a
name: name
94 changes: 84 additions & 10 deletions src/croupier.cr
@@ -1,12 +1,13 @@
# Croupier describes a task graph and lets you operate on it
require "digest/sha1"
require "yaml"
require "./topo_sort"
require "crystalline"
require "digest/sha1"
require "inotify"
require "log"
require "./topo_sort"
require "yaml"

module Croupier
VERSION = "0.2.4"
VERSION = "0.3.1"

# A Task is an object that may generate output
#
@@ -154,7 +155,6 @@ module Croupier
return true if @always_run || @inputs.empty?
# Tasks don't get stale twice
return false unless @stale

@outputs.any? { |output| !File.exists?(output) } ||
# Any input file is modified
@inputs.any? { |input| TaskManager.modified.includes? input } ||
@@ -203,7 +203,7 @@ module Croupier
end
end

struct TaskManagerType
class TaskManagerType
# Registry of all tasks
property tasks = {} of String => Croupier::Task
# Registry of modified files, which will make tasks stale
@@ -215,6 +215,8 @@ module Croupier
# SHA1 of input files as of ending this run
property next_run = {} of String => String

@queued_changes : Set(String) = Set(String).new

# Remove all tasks and everything else (good for tests)
def cleanup
modified.clear
@@ -393,12 +395,13 @@ module Croupier
finished = Set(Task).new
outputs.each do |output|
next unless tasks.has_key?(output)
next if finished.includes?(tasks[output])
next unless run_all || tasks[output].stale? || tasks[output].@always_run
t = tasks[output]
next if finished.includes?(t)
next unless run_all || t.stale? || t.@always_run

Log.debug { "Running task for #{output}" }
tasks[output].run unless dry_run
finished << tasks[output]
t.run unless dry_run
finished << t
end
save_run
end
@@ -452,6 +455,77 @@ module Croupier
# FIXME It's losing outputs for some reason
save_run
end

@autorun_control = Channel(Bool).new

def auto_stop
@autorun_control.send true
@autorun_control.receive?
@autorun_control = Channel(Bool).new
end

def auto_run
# TODO consider how to handle task trees with no inputs
# should they run? Once? Infinite times?
watch
spawn do
loop do
select
when @autorun_control.receive
Log.info { "Stopping automatic run" }
@autorun_control.close
break
else
begin
# Sleeping early avoids a race condition in tests.
# If we slept late, we would likely get the stop order
# and break out of the loop without running, so the
# tests could not see the side effects unless they
# slept too.
sleep 0.1.seconds
# next if @queued_changes.empty?
Log.info { "Detected changes in #{@queued_changes}" }
self.modified += @queued_changes
run_tasks
# Only clean queued changes after a successful run
@queued_changes.clear
rescue ex
# Sometimes we can't run because not all dependencies
# are there yet or whatever. We'll try again later
unless ex.message.to_s.starts_with?("Can't run: Unknown inputs")
Log.warn { "Automatic run failed (will retry): #{ex.message}" }
end
end
end
end
end
end

# Watch for changes in inputs.
# If an input has been changed BEFORE calling this method,
# it will NOT be detected as a change.
#
# Changes are added to queued_changes
def watch
all_inputs.each do |input|
if File.exists? input
Inotify.watch input do |event|
unless event.name.nil?
@queued_changes << event.name.to_s
end
end
else
# It's a file that doesn't exist. To detect it
# being created, we watch the directory
Inotify.watch((Path[input].parent).to_s) do |event|
if all_inputs.includes? event.name
# It's a file we care about, add it to the queue
@queued_changes << event.name.to_s
end
end
end
end
end
end

# The global task manager (singleton)
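
The stop handshake between `auto_run` and `auto_stop` above can be sketched in isolation. This is a minimal illustration that uses only the Crystal standard library, not Croupier itself: the names `control` and `runs` are hypothetical, and the counter increment stands in for the real `run_tasks` call.

```crystal
# Hypothetical stand-ins: `control` plays the role of @autorun_control,
# and `runs` counts loop iterations in place of calling run_tasks.
control = Channel(Bool).new
runs = 0

spawn do
  loop do
    select
    when control.receive
      # Stop order received: closing the channel tells the caller
      # that the worker has really finished.
      control.close
      break
    else
      # Nothing pending on the channel: do one round of work.
      sleep 0.1.seconds
      runs += 1
    end
  end
end

# Let the worker fiber run for a while.
sleep 0.35.seconds

# Same two steps as auto_stop: send the stop order, then wait until
# the worker closes the channel (receive? returns nil once closed).
control.send true
control.receive?

puts "worker stopped after #{runs} iterations"
```

Because the worker closes the channel before exiting, the `receive?` call only returns once the loop is done, which is presumably why `auto_stop` replaces the closed channel with a fresh one afterwards.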