diff --git a/CHANGELOG.md b/CHANGELOG.md index 88dc5028..9cea622d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Unreleased +- Remove `mold_selector`. The promotion logic has been moved inside workers (#38). - Add the `after_promotion` callback. - Removed the `before_fork` callback. - Fork workers and molds with a clean stack to allow more generations. (#30) diff --git a/Dockerfile b/Dockerfile index e2d2e519..caa0a822 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ruby:3.1 +FROM ruby:3.2 RUN apt-get update -y && apt-get install -y ragel socat netcat smem apache2-utils WORKDIR /app CMD [ "bash" ] diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 41b08b05..3f9a2cf7 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -334,30 +334,6 @@ By default automatic reforking isn't enabled. Make sure to read the [fork safety guide](FORK_SAFETY.md) before enabling reforking. -### `mold_selector` - -Sets the mold selector implementation. - -```ruby -mold_selector do |server| - candidate = server.children.workers.sample # return an random worker - server.logger.info("worker=#{worker.nr} pid=#{worker.pid} selected as new mold") - candidate -end -``` - -The has access to `server.children` a `Pitchfork::Children` instance. -This object can be used to introspect the state of the cluster and select the most -appropriate worker to be used as the new mold from which workers will be reforked. - -The default implementation selects the worker with the least -amount of shared memory. This heuristic aim to select the most -warmed up worker. - -This should be considered a very advanced API and it is discouraged -to use it unless you are confident you have a clear understanding -of pitchfork's architecture. - ## Rack Features ### `default_middleware` diff --git a/lib/pitchfork.rb b/lib/pitchfork.rb index fc768a1e..f2862e84 100644 --- a/lib/pitchfork.rb +++ b/lib/pitchfork.rb @@ -171,7 +171,7 @@ def self.clean_fork(&block) %w( const socket_helper stream_input tee_input mem_info children message http_parser - refork_condition mold_selector configurator tmpio http_response worker http_server + refork_condition configurator tmpio http_response worker http_server ).each do |s| require_relative "pitchfork/#{s}" end diff --git a/lib/pitchfork/configurator.rb b/lib/pitchfork/configurator.rb index 2d5c7137..37b5efc1 100644 --- a/lib/pitchfork/configurator.rb +++ b/lib/pitchfork/configurator.rb @@ -51,10 +51,9 @@ class Configurator end }, :after_worker_ready => lambda { |server, worker| - server.logger.info("worker=#{worker.nr} ready") + server.logger.info("worker=#{worker.nr} gen=#{worker.generation} ready") }, :early_hints => false, - :mold_selector => MoldSelector::LeastSharedMemory.new, :refork_condition => nil, :check_client_connection => false, :rewindable_input => true, @@ -139,10 +138,6 @@ def after_worker_exit(*args, &block) set_hook(:after_worker_exit, block_given? ? block : args[0], 3) end - def mold_selector(*args, &block) - set_hook(:mold_selector, block_given? ? block : args[0], 3) - end - def timeout(seconds) set_int(:timeout, seconds, 3) # POSIX says 31 days is the smallest allowed maximum timeout for select() diff --git a/lib/pitchfork/flock.rb b/lib/pitchfork/flock.rb new file mode 100644 index 00000000..ad38cb71 --- /dev/null +++ b/lib/pitchfork/flock.rb @@ -0,0 +1,51 @@ +require 'tempfile' + +module Pitchfork + class Flock + Error = Class.new(StandardError) + + def initialize(name) + @name = name + @file = Tempfile.create([name, '.lock']) + @file.write("#{Process.pid}\n") + @file.flush + @owned = false + end + + def at_fork + @owned = false + @file.close + @file = File.open(@file.path, "w") + nil + end + + def unlink + File.unlink(@file.path) + rescue Errno::ENOENT + false + end + + def try_lock + raise Error, "Pitchfork::Flock(#{@name}) trying to lock an already owned lock" if @owned + + if @file.flock(File::LOCK_EX | File::LOCK_NB) + @owned = true + else + false + end + end + + def unlock + raise Error, "Pitchfork::Flock(#{@name}) trying to unlock a non-owned lock" unless @owned + + begin + if @file.flock(File::LOCK_UN) + @owned = false + true + else + false + end + end + end + end +end diff --git a/lib/pitchfork/http_server.rb b/lib/pitchfork/http_server.rb index cc11056d..484be3e6 100644 --- a/lib/pitchfork/http_server.rb +++ b/lib/pitchfork/http_server.rb @@ -1,5 +1,6 @@ # -*- encoding: binary -*- require 'pitchfork/pitchfork_http' +require 'pitchfork/flock' module Pitchfork # This is the process manager of Pitchfork. This manages worker @@ -13,7 +14,7 @@ class HttpServer :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints - attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector + attr_writer :after_worker_exit, :after_worker_ready, :refork_condition attr_reader :logger include Pitchfork::SocketHelper @@ -65,6 +66,8 @@ def initialize(app, options = {}) @respawn = false @last_check = time_now @default_middleware = true + @promotion_lock = Flock.new("pitchfork-promotion") + options = options.dup @ready_pipe = options.delete(:ready_pipe) @init_listeners = options[:listeners].dup || [] @@ -266,7 +269,7 @@ def monitor_loop(sleep = true) end if @respawn maintain_worker_count - automatically_refork_workers if REFORKING_AVAILABLE + restart_outdated_workers if REFORKING_AVAILABLE end master_sleep(sleep_time) if sleep @@ -318,6 +321,7 @@ def stop(graceful = true) reap_all_workers end kill_each_child(:KILL) + @promotion_lock.unlink end def rewindable_input @@ -429,17 +433,17 @@ def trigger_refork end unless @children.pending_promotion? - @children.refresh - if new_mold = @mold_selector.call(self) + if new_mold = @children.fresh_workers.first @children.promote(new_mold) else - logger.error("The mold select didn't return a candidate") + logger.error("No children at all???") end else end end def after_fork_internal + @promotion_lock.at_fork @control_socket[0].close_write # this is master-only, now @ready_pipe.close if @ready_pipe Pitchfork::Configurator::RACKUP.clear @@ -484,6 +488,7 @@ def spawn_initial_mold mold = Worker.new(nil) mold.create_socketpair! mold.pid = Pitchfork.clean_fork do + @promotion_lock.try_lock mold.after_fork_in_child build_app! bind_listeners! @@ -533,34 +538,21 @@ def maintain_worker_count @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) } end - def automatically_refork_workers + def restart_outdated_workers # If we're already in the middle of forking a new generation, we just continue - if @children.mold - # We don't shutdown any outdated worker if any worker is already being spawned - # or a worker is exiting. Workers are only reforked one by one to minimize the - # impact on capacity. - # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at - # a time. - return if @children.pending_workers? - return if @children.workers.any?(&:exiting?) - - if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation } - logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting") - outdated_worker.soft_kill(:QUIT) - return # That's all folks - end - end + return unless @children.mold - # If all workers are alive and well, we can consider reforking a new generation - if @refork_condition - @children.refresh - if @refork_condition.met?(@children, logger) - logger.info("Refork condition met, scheduling a promotion") - unless @sig_queue.include?(:USR2) - @sig_queue << :USR2 - awaken_master - end - end + # We don't shutdown any outdated worker if any worker is already being spawned + # or a worker is exiting. Workers are only reforked one by one to minimize the + # impact on capacity. + # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at + # a time. + return if @children.pending_workers? + return if @children.workers.any?(&:exiting?) + + if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation } + logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting") + outdated_worker.soft_kill(:QUIT) end end @@ -721,6 +713,12 @@ def worker_loop(worker) client = false if client == :wait_readable if client case client + when Message::PromoteWorker + if @promotion_lock.try_lock + logger.info("Refork asked by master, promoting ourselves") + worker.tick = time_now.to_i + return worker.promoted! + end when Message worker.update(client) else @@ -729,11 +727,21 @@ def worker_loop(worker) end worker.tick = time_now.to_i end - return if worker.mold? # We've been promoted we can exit the loop end # timeout so we can .tick and keep parent from SIGKILL-ing us worker.tick = time_now.to_i + if @refork_condition && !worker.outdated? + if @refork_condition.met?(worker, logger) + if @promotion_lock.try_lock + logger.info("Refork condition met, promoting ourselves") + return worker.promote! # We've been promoted we can exit the loop + else + # TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock + end + end + end + waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved rescue => e Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] @@ -743,7 +751,8 @@ def worker_loop(worker) def mold_loop(mold) readers = init_mold_process(mold) waiter = prep_readers(readers) - mold.acknowlege_promotion(@control_socket[1]) + mold.declare_promotion(@control_socket[1]) + @promotion_lock.unlock ready = readers.dup begin diff --git a/lib/pitchfork/mold_selector.rb b/lib/pitchfork/mold_selector.rb deleted file mode 100644 index 274ce338..00000000 --- a/lib/pitchfork/mold_selector.rb +++ /dev/null @@ -1,29 +0,0 @@ -# frozen_string_literal: true - -module Pitchfork - module MoldSelector - class LeastSharedMemory - def call(server) - workers = server.children.fresh_workers - if workers.empty? - server.logger.info("No current generation workers yet") - return - end - candidate = workers.shift - - workers.each do |worker| - if worker.meminfo.shared_memory < candidate.meminfo.shared_memory - # We suppose that a worker with a lower amount of shared memory - # has warmed up more caches & such, hence is closer to stabilize - # making it a better candidate. - candidate = worker - end - end - parent_meminfo = server.children.mold&.meminfo || MemInfo.new(Process.pid) - cow_efficiency = candidate.meminfo.cow_efficiency(parent_meminfo) - server.logger.info("worker=#{candidate.nr} pid=#{candidate.pid} selected as new mold shared_memory_kb=#{candidate.meminfo.shared_memory} cow=#{cow_efficiency.round(1)}%") - candidate - end - end - end -end diff --git a/lib/pitchfork/refork_condition.rb b/lib/pitchfork/refork_condition.rb index 80d9403e..7941096b 100644 --- a/lib/pitchfork/refork_condition.rb +++ b/lib/pitchfork/refork_condition.rb @@ -7,9 +7,9 @@ def initialize(request_counts) @limits = request_counts end - def met?(children, logger) - if limit = @limits[children.last_generation] - if worker = children.fresh_workers.find { |w| w.requests_count >= limit } + def met?(worker, logger) + if limit = @limits[worker.generation] + if worker.requests_count >= limit logger.info("worker=#{worker.nr} pid=#{worker.pid} processed #{worker.requests_count} requests, triggering a refork") return true end diff --git a/lib/pitchfork/worker.rb b/lib/pitchfork/worker.rb index c7f5d062..ec531fdf 100644 --- a/lib/pitchfork/worker.rb +++ b/lib/pitchfork/worker.rb @@ -12,9 +12,8 @@ module Pitchfork class Worker # :stopdoc: EXIT_SIGNALS = [:QUIT, :TERM] - @generation = 0 attr_accessor :nr, :pid, :generation - attr_reader :master + attr_reader :master, :requests_count def initialize(nr, pid: nil, generation: 0) @nr = nr @@ -23,6 +22,7 @@ def initialize(nr, pid: nil, generation: 0) @mold = false @to_io = @master = nil @exiting = false + @requests_count = 0 if nr build_raindrops(nr) else @@ -42,13 +42,17 @@ def exiting? @exiting end + def outdated? + CURRENT_GENERATION_DROP[0] > @generation + end + def update(message) message.class.members.each do |member| send("#{member}=", message.public_send(member)) end case message - when Message::WorkerPromoted, Message::PromoteWorker + when Message::WorkerPromoted promoted! end end @@ -60,9 +64,10 @@ def register_to_master(control_socket) @master.close end - def acknowlege_promotion(control_socket) + def declare_promotion(control_socket) message = Message::WorkerPromoted.new(@nr, Process.pid, generation) control_socket.sendmsg(message) + CURRENT_GENERATION_DROP[0] = @generation end def promote(generation) @@ -73,6 +78,11 @@ def spawn_worker(new_worker) send_message_nonblock(Message::SpawnWorker.new(new_worker.nr)) end + def promote! + @generation += 1 + promoted! + end + def promoted! @mold = true @nr = nil @@ -170,15 +180,11 @@ def tick # :nodoc: end def reset - @requests_drop[@drop_offset] = 0 - end - - def requests_count - @requests_drop[@drop_offset] + @requests_count = 0 end def increment_requests_count - @requests_drop.incr(@drop_offset) + @requests_count += 1 end # called in both the master (reaping worker) and worker (SIGQUIT handler) @@ -216,9 +222,9 @@ def send_message_nonblock(message) end MOLD_DROP = Raindrops.new(1) + CURRENT_GENERATION_DROP = Raindrops.new(1) PER_DROP = Raindrops::PAGE_SIZE / Raindrops::SIZE TICK_DROPS = [] - REQUEST_DROPS = [] class << self # Since workers are created from another process, we have to @@ -229,7 +235,6 @@ class << self def preallocate_drops(workers_count) 0.upto(workers_count / PER_DROP) do |i| TICK_DROPS[i] = Raindrops.new(PER_DROP) - REQUEST_DROPS[i] = Raindrops.new(PER_DROP) end end end @@ -238,8 +243,7 @@ def build_raindrops(drop_nr) drop_index = drop_nr / PER_DROP @drop_offset = drop_nr % PER_DROP @tick_drop = TICK_DROPS[drop_index] ||= Raindrops.new(PER_DROP) - @requests_drop = REQUEST_DROPS[drop_index] ||= Raindrops.new(PER_DROP) - @tick_drop[@drop_offset] = @requests_drop[@drop_offset] = 0 + @tick_drop[@drop_offset] = 0 end end end diff --git a/test/integration/test_boot.rb b/test/integration/test_boot.rb index 31048823..38dc123d 100644 --- a/test/integration/test_boot.rb +++ b/test/integration/test_boot.rb @@ -62,7 +62,7 @@ def test_boot_minimal pid = spawn_server(app: APP, config: <<~RUBY) listen "#{addr}:#{port}" - worker_processes 2 # this should be >= nr_cpus + worker_processes 2 refork_after [50, 100, 1000] RUBY @@ -75,7 +75,7 @@ def test_boot_broken_after_promotion pid = spawn_server(app: APP, config: <<~RUBY) listen "#{addr}:#{port}" - worker_processes 2 # this should be >= nr_cpus + worker_processes 2 refork_after [50, 100, 1000] after_promotion do |_server, _worker| raise "Oops" diff --git a/test/integration/test_reforking.rb b/test/integration/test_reforking.rb new file mode 100644 index 00000000..d91d3d4f --- /dev/null +++ b/test/integration/test_reforking.rb @@ -0,0 +1,39 @@ +require 'integration_test_helper' + +class ReforkingTest < Pitchfork::IntegrationTest + if Pitchfork::HttpServer::REFORKING_AVAILABLE + def test_reforking + addr, port = unused_port + + pid = spawn_server(app: File.join(ROOT, "test/integration/env.ru"), config: <<~CONFIG) + listen "#{addr}:#{port}" + worker_processes 2 + refork_after [5, 5] + CONFIG + + assert_healthy("http://#{addr}:#{port}") + assert_stderr "worker=0 gen=0 ready" + assert_stderr "worker=1 gen=0 ready" + + 9.times do + assert_equal true, healthy?("http://#{addr}:#{port}") + end + + assert_stderr "Refork condition met, promoting ourselves", timeout: 3 + assert_stderr "Terminating old mold pid=" + assert_stderr "worker=0 gen=1 ready" + assert_stderr "worker=1 gen=1 ready" + + File.truncate("stderr.log", 0) + + 9.times do + assert_equal true, healthy?("http://#{addr}:#{port}") + end + + assert_stderr "worker=0 gen=2 ready", timeout: 3 + assert_stderr "worker=1 gen=2 ready" + + assert_clean_shutdown(pid) + end + end +end diff --git a/test/integration_test_helper.rb b/test/integration_test_helper.rb index 0b4c9dde..47cf7976 100644 --- a/test/integration_test_helper.rb +++ b/test/integration_test_helper.rb @@ -96,9 +96,23 @@ def unused_port private - def assert_stderr(pattern) + def assert_stderr(pattern, timeout: 1) + wait_stderr?(pattern, timeout) + assert_match(pattern, read_stderr) + end + + def read_stderr # We have to strip because file truncation is not always atomic. - assert_match(pattern, File.read("stderr.log").strip) + File.read("stderr.log").strip + end + + def wait_stderr?(pattern, timeout) + pattern = Regexp.new(Regexp.escape(pattern)) if String === pattern + (timeout * 10).times do + return true if pattern.match?(read_stderr) + sleep 0.1 + end + false end def assert_clean_shutdown(pid, timeout = 4) diff --git a/test/test_helper.rb b/test/test_helper.rb index 6a9616ad..177d9342 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -15,7 +15,7 @@ # Some tests watch a log file or a pid file to spring up to check state # Can't rely on inotify on non-Linux and logging to a pipe makes things # more complicated -DEFAULT_TRIES = 1000 +DEFAULT_TRIES = 100 DEFAULT_RES = 0.2 require 'minitest/autorun' diff --git a/test/unit/test_flock.rb b/test/unit/test_flock.rb new file mode 100644 index 00000000..a5f76fcf --- /dev/null +++ b/test/unit/test_flock.rb @@ -0,0 +1,61 @@ +require 'test_helper' + +class TestFlock < Pitchfork::Test + def setup + @flock = Pitchfork::Flock.new("test") + end + + def teardown + @flock.unlink + end + + def test_try_lock + assert_equal true, @flock.try_lock + assert_raises Pitchfork::Flock::Error do + @flock.try_lock + end + end + + def test_unlock + assert_raises Pitchfork::Flock::Error do + @flock.unlock + end + assert_equal true, @flock.try_lock + assert_equal true, @flock.unlock + end + + def test_at_fork + @flock.try_lock + + parent_rd, child_wr = IO.pipe + child_rd, parent_wr = IO.pipe + pid = fork do + error = begin + @flock.try_lock + rescue => e + e + end + child_wr.write(Marshal.dump(error)) + @flock.at_fork + child_wr.write(Marshal.dump(@flock.try_lock)) + + child_rd.read("next\n".bytesize) + child_wr.write(Marshal.dump(@flock.try_lock)) + child_rd.read("next\n".bytesize) # block forever + end + + error = Marshal.load(parent_rd) + assert_instance_of Pitchfork::Flock::Error, error + assert_match "trying to lock an already owned lock", error.message + assert_equal false, Marshal.load(parent_rd) + + assert_equal true, @flock.unlock + parent_wr.write("lock\n") + assert_equal true, Marshal.load(parent_rd) + assert_equal false, @flock.try_lock + + Process.kill('KILL', pid) + Process.wait(pid) + assert_equal true, @flock.try_lock + end +end