Skip to content

Commit

Permalink
Move promotion logic inside workers
Browse files Browse the repository at this point in the history
Fix: #32
  • Loading branch information
byroot committed Mar 28, 2023
1 parent 891ff14 commit 30d0b99
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ruby:3.1
FROM ruby:3.2
RUN apt-get update -y && apt-get install -y ragel socat netcat smem apache2-utils
WORKDIR /app
CMD [ "bash" ]
24 changes: 0 additions & 24 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,30 +334,6 @@ By default automatic reforking isn't enabled.

Make sure to read the [fork safety guide](FORK_SAFETY.md) before enabling reforking.

### `mold_selector`

Sets the mold selector implementation.

```ruby
mold_selector do |server|
candidate = server.children.workers.sample # return an random worker
server.logger.info("worker=#{worker.nr} pid=#{worker.pid} selected as new mold")
candidate
end
```

The has access to `server.children` a `Pitchfork::Children` instance.
This object can be used to introspect the state of the cluster and select the most
appropriate worker to be used as the new mold from which workers will be reforked.

The default implementation selects the worker with the least
amount of shared memory. This heuristic aim to select the most
warmed up worker.

This should be considered a very advanced API and it is discouraged
to use it unless you are confident you have a clear understanding
of pitchfork's architecture.

## Rack Features

### `default_middleware`
Expand Down
2 changes: 1 addition & 1 deletion lib/pitchfork.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def self.clean_fork(&block)

%w(
const socket_helper stream_input tee_input mem_info children message http_parser
refork_condition mold_selector configurator tmpio http_response worker http_server
refork_condition configurator tmpio http_response worker http_server
).each do |s|
require_relative "pitchfork/#{s}"
end
5 changes: 0 additions & 5 deletions lib/pitchfork/configurator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class Configurator
server.logger.info("worker=#{worker.nr} ready")
},
:early_hints => false,
:mold_selector => MoldSelector::LeastSharedMemory.new,
:refork_condition => nil,
:check_client_connection => false,
:rewindable_input => true,
Expand Down Expand Up @@ -139,10 +138,6 @@ def after_worker_exit(*args, &block)
set_hook(:after_worker_exit, block_given? ? block : args[0], 3)
end

def mold_selector(*args, &block)
set_hook(:mold_selector, block_given? ? block : args[0], 3)
end

def timeout(seconds)
set_int(:timeout, seconds, 3)
# POSIX says 31 days is the smallest allowed maximum timeout for select()
Expand Down
51 changes: 51 additions & 0 deletions lib/pitchfork/flock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require 'tempfile'

module Pitchfork
class Flock
Error = Class.new(StandardError)

def initialize(name)
@name = name
@file = Tempfile.create([name, '.lock'])
@file.write("#{Process.pid}\n")
@file.flush
@owned = false
end

def at_fork
@owned = false
@file.close
@file = File.open(@file.path, "w")
nil
end

def unlink
File.unlink(@file.path)
rescue Errno::ENOENT
false
end

def try_lock
raise Error, "Pitchfork::Flock(#{@name}) trying to lock an already owned lock" if @owned

if @file.flock(File::LOCK_EX | File::LOCK_NB)
@owned = true
else
false
end
end

def unlock
raise Error, "Pitchfork::Flock(#{@name}) trying to unlock a non-owned lock" unless @owned

begin
if @file.flock(File::LOCK_UN)
@owned = false
true
else
false
end
end
end
end
end
34 changes: 28 additions & 6 deletions lib/pitchfork/http_server.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
require 'pitchfork/flock'

module Pitchfork
# This is the process manager of Pitchfork. This manages worker
Expand All @@ -13,7 +14,7 @@ class HttpServer
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition

attr_reader :logger
include Pitchfork::SocketHelper
Expand Down Expand Up @@ -65,6 +66,8 @@ def initialize(app, options = {})
@respawn = false
@last_check = time_now
@default_middleware = true
@promotion_lock = Flock.new("pitchfork-promotion")

options = options.dup
@ready_pipe = options.delete(:ready_pipe)
@init_listeners = options[:listeners].dup || []
Expand Down Expand Up @@ -318,6 +321,7 @@ def stop(graceful = true)
reap_all_workers
end
kill_each_child(:KILL)
@promotion_lock.unlink
end

def rewindable_input
Expand Down Expand Up @@ -429,17 +433,17 @@ def trigger_refork
end

unless @children.pending_promotion?
@children.refresh
if new_mold = @mold_selector.call(self)
if new_mold = @children.fresh_workers.first
@children.promote(new_mold)
else
logger.error("The mold select didn't return a candidate")
logger.error("No children at all???")
end
else
end
end

def after_fork_internal
@promotion_lock.at_fork
@control_socket[0].close_write # this is master-only, now
@ready_pipe.close if @ready_pipe
Pitchfork::Configurator::RACKUP.clear
Expand Down Expand Up @@ -484,6 +488,7 @@ def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
@promotion_lock.try_lock
mold.after_fork_in_child
build_app!
bind_listeners!
Expand Down Expand Up @@ -721,6 +726,12 @@ def worker_loop(worker)
client = false if client == :wait_readable
if client
case client
when Message::PromoteWorker
if @promotion_lock.try_lock
logger.info("Refork asked by master, promoting ourselves")
worker.tick = time_now.to_i
return worker.promoted!
end
when Message
worker.update(client)
else
Expand All @@ -729,11 +740,21 @@ def worker_loop(worker)
end
worker.tick = time_now.to_i
end
return if worker.mold? # We've been promoted we can exit the loop
end

# timeout so we can .tick and keep parent from SIGKILL-ing us
worker.tick = time_now.to_i
if @refork_condition && !worker.outdated?
if @refork_condition.met?(worker, logger)
if @promotion_lock.try_lock
logger.info("Refork condition met, promoting ourselves")
return worker.promoted! # We've been promoted we can exit the loop
else
# TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
end
end
end

waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
Expand All @@ -743,7 +764,8 @@ def worker_loop(worker)
def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
mold.acknowlege_promotion(@control_socket[1])
mold.declare_promotion(@control_socket[1])
@promotion_lock.unlock
ready = readers.dup

begin
Expand Down
29 changes: 0 additions & 29 deletions lib/pitchfork/mold_selector.rb

This file was deleted.

6 changes: 3 additions & 3 deletions lib/pitchfork/refork_condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def initialize(request_counts)
@limits = request_counts
end

def met?(children, logger)
if limit = @limits[children.last_generation]
if worker = children.fresh_workers.find { |w| w.requests_count >= limit }
def met?(worker, logger)
if limit = @limits[worker.generation]
if worker.requests_count >= limit
logger.info("worker=#{worker.nr} pid=#{worker.pid} processed #{worker.requests_count} requests, triggering a refork")
return true
end
Expand Down
27 changes: 13 additions & 14 deletions lib/pitchfork/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ module Pitchfork
class Worker
# :stopdoc:
EXIT_SIGNALS = [:QUIT, :TERM]
@generation = 0
attr_accessor :nr, :pid, :generation
attr_reader :master
attr_reader :master, :requests_count

def initialize(nr, pid: nil, generation: 0)
@nr = nr
Expand All @@ -23,6 +22,7 @@ def initialize(nr, pid: nil, generation: 0)
@mold = false
@to_io = @master = nil
@exiting = false
@requests_count = 0
if nr
build_raindrops(nr)
else
Expand All @@ -42,13 +42,17 @@ def exiting?
@exiting
end

def outdated?
CURRENT_GENERATION_DROP[0] > @generation
end

def update(message)
message.class.members.each do |member|
send("#{member}=", message.public_send(member))
end

case message
when Message::WorkerPromoted, Message::PromoteWorker
when Message::WorkerPromoted
promoted!
end
end
Expand All @@ -60,9 +64,10 @@ def register_to_master(control_socket)
@master.close
end

def acknowlege_promotion(control_socket)
def declare_promotion(control_socket)
message = Message::WorkerPromoted.new(@nr, Process.pid, generation)
control_socket.sendmsg(message)
CURRENT_GENERATION_DROP[0] = @generation
end

def promote(generation)
Expand Down Expand Up @@ -170,15 +175,11 @@ def tick # :nodoc:
end

def reset
@requests_drop[@drop_offset] = 0
end

def requests_count
@requests_drop[@drop_offset]
@requests_count = 0
end

def increment_requests_count
@requests_drop.incr(@drop_offset)
@requests_count += 1
end

# called in both the master (reaping worker) and worker (SIGQUIT handler)
Expand Down Expand Up @@ -216,9 +217,9 @@ def send_message_nonblock(message)
end

MOLD_DROP = Raindrops.new(1)
CURRENT_GENERATION_DROP = Raindrops.new(1)
PER_DROP = Raindrops::PAGE_SIZE / Raindrops::SIZE
TICK_DROPS = []
REQUEST_DROPS = []

class << self
# Since workers are created from another process, we have to
Expand All @@ -229,7 +230,6 @@ class << self
def preallocate_drops(workers_count)
0.upto(workers_count / PER_DROP) do |i|
TICK_DROPS[i] = Raindrops.new(PER_DROP)
REQUEST_DROPS[i] = Raindrops.new(PER_DROP)
end
end
end
Expand All @@ -238,8 +238,7 @@ def build_raindrops(drop_nr)
drop_index = drop_nr / PER_DROP
@drop_offset = drop_nr % PER_DROP
@tick_drop = TICK_DROPS[drop_index] ||= Raindrops.new(PER_DROP)
@requests_drop = REQUEST_DROPS[drop_index] ||= Raindrops.new(PER_DROP)
@tick_drop[@drop_offset] = @requests_drop[@drop_offset] = 0
@tick_drop[@drop_offset] = 0
end
end
end
4 changes: 2 additions & 2 deletions test/integration/test_boot.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_boot_minimal

pid = spawn_server(app: APP, config: <<~RUBY)
listen "#{addr}:#{port}"
worker_processes 2 # this should be >= nr_cpus
worker_processes 2
refork_after [50, 100, 1000]
RUBY

Expand All @@ -75,7 +75,7 @@ def test_boot_broken_after_promotion

pid = spawn_server(app: APP, config: <<~RUBY)
listen "#{addr}:#{port}"
worker_processes 2 # this should be >= nr_cpus
worker_processes 2
refork_after [50, 100, 1000]
after_promotion do |_server, _worker|
raise "Oops"
Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# Some tests watch a log file or a pid file to spring up to check state
# Can't rely on inotify on non-Linux and logging to a pipe makes things
# more complicated
DEFAULT_TRIES = 1000
DEFAULT_TRIES = 100
DEFAULT_RES = 0.2

require 'minitest/autorun'
Expand Down
Loading

0 comments on commit 30d0b99

Please sign in to comment.