Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn molds instead of promoting workers #42

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Unreleased

- Spawn mold from workers instead of promoting workers (#42).

# 0.2.0

- Remove default middlewares.
Expand Down
54 changes: 32 additions & 22 deletions docs/REFORKING.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,50 +46,60 @@ first time, if you take a warmed up worker out of rotation, and use it to fork n
be shared again, and most of them won't be invalidated anymore.


When you start `pitchfork` it forks the desired amount of workers:
When you start `pitchfork` it forks a `mold` process which loads your application:

```
PID COMMAND
100 \_ pitchfork master
101 \_ pitchfork worker[0] (gen:0)
102 \_ pitchfork worker[1] (gen:0)
103 \_ pitchfork worker[2] (gen:0)
104 \_ pitchfork worker[3] (gen:0)
101 \_ pitchfork mold (gen:0)
```

When a reforking is triggered, one of the workers is selected to become a `mold`, and is taken out of rotation.
When promoted, molds no longer process any incoming HTTP requests, they become inactive:
Once the `mold` is done loading, the `master` asks it to spawn the desired number of workers:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're missing the ``` backticks to start the codeblock

```
PID COMMAND
100 \_ pitchfork master
101 \_ pitchfork mold (gen:1)
102 \_ pitchfork worker[1] (gen:0)
103 \_ pitchfork worker[2] (gen:0)
104 \_ pitchfork worker[3] (gen:0)
101 \_ pitchfork mold (gen:0)
102 \_ pitchfork worker[0] (gen:0)
103 \_ pitchfork worker[1] (gen:0)
104 \_ pitchfork worker[2] (gen:0)
105 \_ pitchfork worker[3] (gen:0)
```

When a new mold has been promoted, `pitchfork` starts a slow rollout of older workers and replace them with fresh workers
When a reforking is triggered, one of the workers is selected to fork a new `mold`.

```
PID COMMAND
100 \_ pitchfork master
101 \_ pitchfork mold (gen:0)
102 \_ pitchfork worker[0] (gen:0)
103 \_ pitchfork worker[1] (gen:0)
104 \_ pitchfork worker[2] (gen:0)
105 \_ pitchfork worker[3] (gen:0)
105 \_ pitchfork mold (gen:1)
```

When that new mold is ready, `pitchfork` terminates the old mold and starts a slow rollout of older workers and replace them with fresh workers
forked from the mold:

```
PID COMMAND
100 \_ pitchfork master
101 \_ pitchfork mold (gen:1)
105 \_ pitchfork worker[0] (gen:1)
102 \_ pitchfork worker[1] (gen:0)
103 \_ pitchfork worker[2] (gen:0)
104 \_ pitchfork worker[3] (gen:0)
102 \_ pitchfork worker[0] (gen:0)
103 \_ pitchfork worker[1] (gen:0)
104 \_ pitchfork worker[2] (gen:0)
105 \_ pitchfork worker[3] (gen:0)
105 \_ pitchfork mold (gen:1)
```

```
PID COMMAND
100 \_ pitchfork master
101 \_ pitchfork mold (gen:1)
105 \_ pitchfork worker[0] (gen:1)
106 \_ pitchfork worker[1] (gen:1)
103 \_ pitchfork worker[2] (gen:0)
104 \_ pitchfork worker[3] (gen:0)
103 \_ pitchfork worker[1] (gen:0)
104 \_ pitchfork worker[2] (gen:0)
105 \_ pitchfork worker[3] (gen:0)
105 \_ pitchfork mold (gen:1)
106 \_ pitchfork worker[0] (gen:1)
```

etc.
Expand Down
15 changes: 11 additions & 4 deletions lib/pitchfork/children.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,36 @@ def register(child)
def register_mold(mold)
@pending_molds[mold.pid] = mold
@children[mold.pid] = mold
@mold = mold
end

def fetch(pid)
@children.fetch(pid)
end

def update(message)
child = @children[message.pid] || (message.nr && @workers[message.nr])
old_nr = child.nr
case message
when Message::MoldSpawned
mold = Worker.new(nil)
mold.update(message)
@pending_molds[mold.pid] = mold
@children[mold.pid] = mold
return mold
end

child = @children[message.pid] || (message.nr && @workers[message.nr])
child.update(message)

if child.mold?
@workers.delete(old_nr)
@pending_molds.delete(child.pid)
@molds[child.pid] = child
@mold = child
end

if child.pid
@children[child.pid] = child
@pending_workers.delete(child.nr)
end

child
end

Expand Down
57 changes: 34 additions & 23 deletions lib/pitchfork/http_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,15 @@ def monitor_loop(sleep = true)
worker = @children.update(message)
# TODO: should we send a message to the worker to acknowledge?
logger.info "worker=#{worker.nr} pid=#{worker.pid} registered"
when Message::WorkerPromoted
when Message::MoldSpawned
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
when Message::MoldReady
old_molds = @children.molds
new_mold = @children.fetch(message.pid)
logger.info("worker=#{new_mold.nr} pid=#{new_mold.pid} promoted to a mold")
@children.update(message)
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
old_molds.each do |old_mold|
logger.info("Terminating old mold pid=#{old_mold.pid}")
logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}")
old_mold.soft_kill(:QUIT)
end
else
Expand Down Expand Up @@ -449,9 +451,6 @@ def spawn_worker(worker, detach:)

after_fork_internal
worker_loop(worker)
if worker.mold?
mold_loop(worker)
end
end

worker
Expand All @@ -461,12 +460,14 @@ def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
mold.pid = Process.pid
@promotion_lock.try_lock
mold.after_fork_in_child
build_app!
bind_listeners!
mold_loop(mold)
end
@promotion_lock.at_fork
@children.register_mold(mold)
end

Expand All @@ -480,7 +481,7 @@ def spawn_missing_workers

if REFORKING_AVAILABLE
unless @children.mold&.spawn_worker(worker)
@logger.error("Failed to send a spawn_woker command")
@logger.error("Failed to send a spawn_worker command")
end
else
spawn_worker(worker, detach: false)
Expand Down Expand Up @@ -648,10 +649,10 @@ def init_worker_process(worker)
readers
end

def init_mold_process(worker)
proc_name "mold (gen: #{worker.generation})"
after_promotion.call(self, worker)
readers = [worker]
def init_mold_process(mold)
proc_name "mold (gen: #{mold.generation})"
after_promotion.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
readers
end
Expand Down Expand Up @@ -687,11 +688,7 @@ def worker_loop(worker)
if client
case client
when Message::PromoteWorker
if @promotion_lock.try_lock
logger.info("Refork asked by master, promoting ourselves")
worker.tick = time_now.to_i
return worker.promoted!
end
spawn_mold(worker.generation)
when Message
worker.update(client)
else
Expand All @@ -706,10 +703,8 @@ def worker_loop(worker)
worker.tick = Pitchfork.time_now(true)
if @refork_condition && !worker.outdated?
if @refork_condition.met?(worker, logger)
if @promotion_lock.try_lock
logger.info("Refork condition met, promoting ourselves")
return worker.promote! # We've been promoted we can exit the loop
else
logger.info("Refork condition met, promoting ourselves")
unless spawn_mold(worker.generation)
# TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
end
end
Expand All @@ -721,13 +716,29 @@ def worker_loop(worker)
end while readers[0]
end

def spawn_mold(current_generation)
return false unless @promotion_lock.try_lock

begin
Pitchfork.fork_sibling do
mold = Worker.new(nil, pid: Process.pid, generation: current_generation)
mold.promote!
mold.start_promotion(@control_socket[1])
mold_loop(mold)
end
ensure
@promotion_lock.at_fork # We let the spawned mold own the lock
end
end

def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
mold.declare_promotion(@control_socket[1])
@promotion_lock.unlock
ready = readers.dup

mold.finish_promotion(@control_socket[1])

begin
mold.tick = Pitchfork.time_now(true)
while sock = ready.shift
Expand Down
3 changes: 2 additions & 1 deletion lib/pitchfork/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class Message
SpawnWorker = Message.new(:nr)
WorkerSpawned = Message.new(:nr, :pid, :generation, :pipe)
PromoteWorker = Message.new(:generation)
WorkerPromoted = Message.new(:nr, :pid, :generation)
MoldSpawned = Message.new(:nr, :pid, :generation, :pipe)
MoldReady = Message.new(:nr, :pid, :generation)

SoftKill = Message.new(:signum)
end
Expand Down
24 changes: 15 additions & 9 deletions lib/pitchfork/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def exiting?
@exiting
end

def pending?
@master.nil?
end

def outdated?
CURRENT_GENERATION_DROP[0] > @generation
end
Expand All @@ -50,22 +54,24 @@ def update(message)
message.class.members.each do |member|
send("#{member}=", message.public_send(member))
end

case message
when Message::WorkerPromoted
promoted!
end
end

def register_to_master(control_socket)
create_socketpair!
message = Message::WorkerSpawned.new(@nr, Process.pid, generation, @master)
message = Message::WorkerSpawned.new(@nr, @pid, generation, @master)
control_socket.sendmsg(message)
@master.close
end

def declare_promotion(control_socket)
message = Message::WorkerPromoted.new(@nr, Process.pid, generation)
def start_promotion(control_socket)
create_socketpair!
message = Message::MoldSpawned.new(@nr, @pid, generation, @master)
control_socket.sendmsg(message)
@master.close
end

def finish_promotion(control_socket)
message = Message::MoldReady.new(@nr, @pid, generation)
control_socket.sendmsg(message)
CURRENT_GENERATION_DROP[0] = @generation
end
Expand Down Expand Up @@ -198,7 +204,7 @@ def create_socketpair!
end

def after_fork_in_child
@master.close
@master&.close
end

private
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_boot.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_boot_broken_after_promotion
listen "#{addr}:#{port}"
worker_processes 2
refork_after [50, 100, 1000]
after_promotion do |_server, _worker|
after_promotion do |_server, _mold|
raise "Oops"
end
RUBY
Expand Down
28 changes: 28 additions & 0 deletions test/integration/test_reforking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,33 @@ def test_reforking

assert_clean_shutdown(pid)
end

def test_reforking_broken_after_promotion_hook
addr, port = unused_port

pid = spawn_server(app: File.join(ROOT, "test/integration/env.ru"), config: <<~CONFIG)
listen "#{addr}:#{port}"
worker_processes 2
refork_after [5, 5]
after_promotion do |_server, mold|
raise "Oops" if mold.generation > 0
end
CONFIG

assert_healthy("http://#{addr}:#{port}")
assert_stderr "worker=0 gen=0 ready"
assert_stderr "worker=1 gen=0 ready"

9.times do
assert_equal true, healthy?("http://#{addr}:#{port}")
end

assert_stderr "Refork condition met, promoting ourselves", timeout: 3
assert_stderr(/mold pid=\d+ gen=1 reaped/)

assert_equal true, healthy?("http://#{addr}:#{port}")

assert_clean_shutdown(pid)
end
end
end
2 changes: 1 addition & 1 deletion test/integration_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def assert_exited(pid, exitstatus, timeout = 4)
break if status = Process.wait2(pid, Process::WNOHANG)
sleep 0.5
end
refute_nil status
assert status, "process pid=#{pid} didn't exit in #{timeout} seconds"
assert_equal exitstatus, status[1].exitstatus
end

Expand Down
Loading