Skip to content

Commit

Permalink
Implement run_auto for selected targets (#2)
Browse files Browse the repository at this point in the history
* Add support for running only some tasks in auto_run
* Implemented `TaskManager.inputs` to get the inputs for
  a given list of targets.
* Implemented `TaskManager.stop_watch` and watcher cleanup
* Uncommented skipping run if queued changes are empty in `auto_run`
* Added support for calling `watch` only for the dependencies of
  specific targets
* Simpler one-watcher implementation of watch
* Only react to specific Inotify flags in watch
  • Loading branch information
ralsina committed Jul 2, 2023
1 parent ff55ec1 commit 9a06d11
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 21 deletions.
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## Version 0.3.2

* Add support for running only some tasks in auto_run
* Implemented `TaskManager.inputs` to get the inputs for
a given list of targets.
* Implemented `TaskManager.stop_watch` and watcher cleanup
* Uncommented skipping run if queued changes are empty in `auto_run`
* Added support for calling `watch` only for the dependencies of
specific targets
* Simpler one-watcher implementation of watch
* Only react to specific Inotify flags in watch

## Version 0.3.1

* Added auto_run / auto_stop that control a "watchdog" fiber that
Expand Down
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: croupier
version: 0.3.1
version: 0.3.2
description: A smart task definition and execution library
authors:
- Roberto Alsina <roberto.alsina@gmail.com>
Expand Down
48 changes: 47 additions & 1 deletion spec/croupier_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -703,12 +703,24 @@ describe "TaskManager" do
Fiber.yield
TaskManager.@queued_changes.should eq Set{"input"}
File.open("input2", "w") << "foo"
Fiber.yield
sleep 0.1.seconds # FIXME: this should work with a yield
TaskManager.@queued_changes.should eq Set{"input", "input2"}
end
end
end

describe "inputs" do
it "should list all inputs, including transitive dependencies" do
with_scenario("basic") do
TaskManager.inputs(["output1"]).empty?.should be_true
TaskManager.inputs(["output3"]).should eq Set{"input"}
TaskManager.inputs(["output4"]).should eq Set{"input", "output3"}
TaskManager.inputs(["output5"]).should eq Set{"input2"}
TaskManager.inputs(["output4", "output5"]).should eq Set{"input", "input2", "output3"}
end
end
end

describe "auto_run" do
it "should run tasks when inputs change" do
with_scenario("basic") do
Expand Down Expand Up @@ -801,6 +813,40 @@ describe "TaskManager" do
end
end
end

it "should only run the specified targets" do
with_scenario("basic") do
TaskManager.auto_run(targets: ["output3"])
# At this point output1/3 doesn't exist
File.exists?("output1").should be_false
File.exists?("output3").should be_false

# This triggers building output3
File.open("input", "w") << "bar"
Fiber.yield
TaskManager.auto_stop
# At this point output3 exists, output1 doesn't
File.exists?("output1").should be_false
File.exists?("output3").should be_true
end
end

it "should not be triggered by deps for not specified targets" do
with_scenario("basic") do
TaskManager.auto_run(targets: ["output5"])
sleep 0.2.seconds
# At this point output5 doesn't exist
File.exists?("output5").should be_false
File.exists?("output3").should be_false
# This triggers output3, which is not requested
File.open("input", "w") << "bar"
Fiber.yield
TaskManager.auto_stop
# No outputs created
File.exists?("output5").should be_false
File.exists?("output3").should be_false
end
end
end

describe "dependencies" do
Expand Down
67 changes: 48 additions & 19 deletions src/croupier.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "log"
require "yaml"

module Croupier
VERSION = "0.3.1"
VERSION = "0.3.2"

# A Task is an object that may generate output
#
Expand Down Expand Up @@ -227,6 +227,7 @@ module Croupier
@all_inputs.clear
@graph = Crystalline::Graph::DirectedAdjacencyGraph(String, Set(String)).new
@graph_sorted = [] of String
@queued_changes.clear
end

# Tasks as a dependency graph sorted topologically
Expand Down Expand Up @@ -286,6 +287,20 @@ module Croupier
@all_inputs
end

# The set of all inputs for the given tasks
def inputs(targets : Array(String))
result = Set(String).new
targets.each do |target|
raise "Unknown target #{target}" unless tasks.has_key? target
end

dependencies(targets).each do |task|
result.concat tasks[task].@inputs
end

result
end

# Get a task list of what tasks need to be done to produce `outputs`
# The list is sorted so it can be executed in order
def dependencies(outputs : Array(String))
Expand Down Expand Up @@ -464,15 +479,19 @@ module Croupier
@autorun_control = Channel(Bool).new
end

def auto_run
raise "No inputs to watch, can't auto_run" if all_inputs.empty?
watch
def auto_run(targets : Array(String) = [] of String)
targets = tasks.keys if targets.empty?
# Only want dependencies that are not tasks
inputs = inputs(targets)
raise "No inputs to watch, can't auto_run" if inputs.empty?
watch(targets)
spawn do
loop do
select
when @autorun_control.receive
Log.info { "Stopping automatic run" }
@autorun_control.close
@@watcher.close # Stop watchers
break
else
begin
Expand All @@ -482,10 +501,10 @@ module Croupier
# we can't see the side effects without sleeping in
# the tests.
sleep 0.1.seconds
# next if @queued_changes.empty?
next if @queued_changes.empty?
Log.info { "Detected changes in #{@queued_changes}" }
self.modified += @queued_changes
run_tasks
run_tasks(targets: targets)
# Only clean queued changes after a successful run
@queued_changes.clear
rescue ex
Expand All @@ -500,27 +519,37 @@ module Croupier
end
end

# Internal array of watchers
@@watcher = Inotify::Watcher.new

# Watch for changes in inputs.
# If an input has been changed BEFORE calling this method,
# it will NOT be detected as a change.
#
# Changes are added to queued_changes
def watch
all_inputs.each do |input|
def watch(targets : Array(String) = [] of String)
@@watcher.close
@@watcher = Inotify::Watcher.new
targets = tasks.keys if targets.empty?
target_inputs = inputs(targets)

@@watcher.on_event do |event|
# It's a file we care about, add it to the queue
@queued_changes << event.name.to_s if target_inputs.includes? event.name.to_s
end

watch_flags = LibInotify::IN_CLOSE_WRITE | LibInotify::IN_CREATE | LibInotify::IN_MODIFY

target_inputs.each do |input|
if File.exists? input
Inotify.watch input do |event|
unless event.name.nil?
@queued_changes << event.name.to_s
end
end
@@watcher.watch input, watch_flags
else
# It's a file that doesn't exist. To detect it
# being created, we watch the directory
Inotify.watch((Path[input].parent).to_s) do |event|
if all_inputs.includes? event.name
# It's a file we care about, add it to the queue
@queued_changes << event.name.to_s
end
# being created, we watch the parent directory
# if we are not already watching it.
path = (Path[input].parent).to_s
if !@@watcher.watching.includes?(path)
@@watcher.watch path, watch_flags
end
end
end
Expand Down

0 comments on commit 9a06d11

Please sign in to comment.