Skip to content

Commit

Permalink
Merge pull request #38 from Shopify/worker-auto-promotion
Browse files Browse the repository at this point in the history
Move promotion logic inside workers
  • Loading branch information
casperisfine committed Mar 28, 2023
2 parents 891ff14 + 7239fba commit 105b87a
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Unreleased

- Remove `mold_selector`. The promotion logic has been moved inside workers (#38).
- Add the `after_promotion` callback.
- Removed the `before_fork` callback.
- Fork workers and molds with a clean stack to allow more generations. (#30)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ruby:3.1
FROM ruby:3.2
RUN apt-get update -y && apt-get install -y ragel socat netcat smem apache2-utils
WORKDIR /app
CMD [ "bash" ]
24 changes: 0 additions & 24 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,30 +334,6 @@ By default automatic reforking isn't enabled.

Make sure to read the [fork safety guide](FORK_SAFETY.md) before enabling reforking.

### `mold_selector`

Sets the mold selector implementation.

```ruby
mold_selector do |server|
candidate = server.children.workers.sample # return an random worker
server.logger.info("worker=#{worker.nr} pid=#{worker.pid} selected as new mold")
candidate
end
```

The block has access to `server.children`, a `Pitchfork::Children` instance.
This object can be used to introspect the state of the cluster and select the most
appropriate worker to be used as the new mold from which workers will be reforked.

The default implementation selects the worker with the least
amount of shared memory. This heuristic aims to select the most
warmed up worker.

This should be considered a very advanced API and it is discouraged
to use it unless you are confident you have a clear understanding
of pitchfork's architecture.

## Rack Features

### `default_middleware`
Expand Down
2 changes: 1 addition & 1 deletion lib/pitchfork.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def self.clean_fork(&block)

%w(
const socket_helper stream_input tee_input mem_info children message http_parser
refork_condition mold_selector configurator tmpio http_response worker http_server
refork_condition configurator tmpio http_response worker http_server
).each do |s|
require_relative "pitchfork/#{s}"
end
7 changes: 1 addition & 6 deletions lib/pitchfork/configurator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ class Configurator
end
},
:after_worker_ready => lambda { |server, worker|
server.logger.info("worker=#{worker.nr} ready")
server.logger.info("worker=#{worker.nr} gen=#{worker.generation} ready")
},
:early_hints => false,
:mold_selector => MoldSelector::LeastSharedMemory.new,
:refork_condition => nil,
:check_client_connection => false,
:rewindable_input => true,
Expand Down Expand Up @@ -139,10 +138,6 @@ def after_worker_exit(*args, &block)
set_hook(:after_worker_exit, block_given? ? block : args[0], 3)
end

def mold_selector(*args, &block)
set_hook(:mold_selector, block_given? ? block : args[0], 3)
end

def timeout(seconds)
set_int(:timeout, seconds, 3)
# POSIX says 31 days is the smallest allowed maximum timeout for select()
Expand Down
51 changes: 51 additions & 0 deletions lib/pitchfork/flock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require 'tempfile'

module Pitchfork
  # A small advisory lock backed by a temporary file and File#flock.
  #
  # The lock file is created once (by whoever instantiates the object) and
  # the instance remembers in +@owned+ whether *this* process currently holds
  # the lock, so that double lock / double unlock mistakes raise instead of
  # silently succeeding.
  class Flock
    # Raised when the lock is used inconsistently (locking an already owned
    # lock, or unlocking a lock that isn't owned).
    Error = Class.new(StandardError)

    # Creates the backing lock file and records the creator's PID in it.
    #
    # name - String used as the temporary file basename prefix and in
    #        error messages.
    def initialize(name)
      @name = name
      @file = Tempfile.create([name, '.lock'])
      @file.write("#{Process.pid}\n")
      @file.flush
      @owned = false
    end

    # Resets the ownership flag and reopens the lock file on a brand new
    # descriptor. Intended to be called in a process right after it was
    # forked, so that it doesn't keep using the descriptor inherited from
    # its parent.
    #
    # The file is reopened in append mode rather than "w": both create the
    # file if it is missing, but "w" would truncate it and wipe the PID
    # recorded by #initialize.
    #
    # Always returns nil.
    def at_fork
      @owned = false
      @file.close
      @file = File.open(@file.path, "a")
      nil
    end

    # Removes the lock file from the filesystem.
    #
    # Returns the result of File.unlink on success, or false if the file
    # was already gone.
    def unlink
      File.unlink(@file.path)
    rescue Errno::ENOENT
      false
    end

    # Attempts to take the exclusive lock without blocking.
    #
    # Returns true if the lock was acquired, false if it is currently held
    # through another descriptor.
    # Raises Pitchfork::Flock::Error if this instance already owns the lock.
    def try_lock
      raise Error, "Pitchfork::Flock(#{@name}) trying to lock an already owned lock" if @owned

      # File#flock returns 0 (truthy) on success and false when LOCK_NB is
      # set and the operation would have blocked.
      if @file.flock(File::LOCK_EX | File::LOCK_NB)
        @owned = true
      else
        false
      end
    end

    # Releases the lock.
    #
    # Returns true once released, false if File#flock reported a failure.
    # Raises Pitchfork::Flock::Error if this instance doesn't own the lock.
    def unlock
      raise Error, "Pitchfork::Flock(#{@name}) trying to unlock a non-owned lock" unless @owned

      if @file.flock(File::LOCK_UN)
        @owned = false
        true
      else
        false
      end
    end
  end
end
75 changes: 42 additions & 33 deletions lib/pitchfork/http_server.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
require 'pitchfork/flock'

module Pitchfork
# This is the process manager of Pitchfork. This manages worker
Expand All @@ -13,7 +14,7 @@ class HttpServer
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition

attr_reader :logger
include Pitchfork::SocketHelper
Expand Down Expand Up @@ -65,6 +66,8 @@ def initialize(app, options = {})
@respawn = false
@last_check = time_now
@default_middleware = true
@promotion_lock = Flock.new("pitchfork-promotion")

options = options.dup
@ready_pipe = options.delete(:ready_pipe)
@init_listeners = options[:listeners].dup || []
Expand Down Expand Up @@ -266,7 +269,7 @@ def monitor_loop(sleep = true)
end
if @respawn
maintain_worker_count
automatically_refork_workers if REFORKING_AVAILABLE
restart_outdated_workers if REFORKING_AVAILABLE
end

master_sleep(sleep_time) if sleep
Expand Down Expand Up @@ -318,6 +321,7 @@ def stop(graceful = true)
reap_all_workers
end
kill_each_child(:KILL)
@promotion_lock.unlink
end

def rewindable_input
Expand Down Expand Up @@ -429,17 +433,17 @@ def trigger_refork
end

unless @children.pending_promotion?
@children.refresh
if new_mold = @mold_selector.call(self)
if new_mold = @children.fresh_workers.first
@children.promote(new_mold)
else
logger.error("The mold select didn't return a candidate")
logger.error("No children at all???")
end
else
end
end

def after_fork_internal
@promotion_lock.at_fork
@control_socket[0].close_write # this is master-only, now
@ready_pipe.close if @ready_pipe
Pitchfork::Configurator::RACKUP.clear
Expand Down Expand Up @@ -484,6 +488,7 @@ def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
@promotion_lock.try_lock
mold.after_fork_in_child
build_app!
bind_listeners!
Expand Down Expand Up @@ -533,34 +538,21 @@ def maintain_worker_count
@children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) }
end

def automatically_refork_workers
def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
if @children.mold
# We don't shutdown any outdated worker if any worker is already being spawned
# or a worker is exiting. Workers are only reforked one by one to minimize the
# impact on capacity.
# In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
# a time.
return if @children.pending_workers?
return if @children.workers.any?(&:exiting?)

if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
outdated_worker.soft_kill(:QUIT)
return # That's all folks
end
end
return unless @children.mold

# If all workers are alive and well, we can consider reforking a new generation
if @refork_condition
@children.refresh
if @refork_condition.met?(@children, logger)
logger.info("Refork condition met, scheduling a promotion")
unless @sig_queue.include?(:USR2)
@sig_queue << :USR2
awaken_master
end
end
# We don't shutdown any outdated worker if any worker is already being spawned
# or a worker is exiting. Workers are only reforked one by one to minimize the
# impact on capacity.
# In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
# a time.
return if @children.pending_workers?
return if @children.workers.any?(&:exiting?)

if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
outdated_worker.soft_kill(:QUIT)
end
end

Expand Down Expand Up @@ -721,6 +713,12 @@ def worker_loop(worker)
client = false if client == :wait_readable
if client
case client
when Message::PromoteWorker
if @promotion_lock.try_lock
logger.info("Refork asked by master, promoting ourselves")
worker.tick = time_now.to_i
return worker.promoted!
end
when Message
worker.update(client)
else
Expand All @@ -729,11 +727,21 @@ def worker_loop(worker)
end
worker.tick = time_now.to_i
end
return if worker.mold? # We've been promoted we can exit the loop
end

# timeout so we can .tick and keep parent from SIGKILL-ing us
worker.tick = time_now.to_i
if @refork_condition && !worker.outdated?
if @refork_condition.met?(worker, logger)
if @promotion_lock.try_lock
logger.info("Refork condition met, promoting ourselves")
return worker.promote! # We've been promoted we can exit the loop
else
# TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
end
end
end

waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
Expand All @@ -743,7 +751,8 @@ def worker_loop(worker)
def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
mold.acknowlege_promotion(@control_socket[1])
mold.declare_promotion(@control_socket[1])
@promotion_lock.unlock
ready = readers.dup

begin
Expand Down
29 changes: 0 additions & 29 deletions lib/pitchfork/mold_selector.rb

This file was deleted.

6 changes: 3 additions & 3 deletions lib/pitchfork/refork_condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def initialize(request_counts)
@limits = request_counts
end

def met?(children, logger)
if limit = @limits[children.last_generation]
if worker = children.fresh_workers.find { |w| w.requests_count >= limit }
def met?(worker, logger)
if limit = @limits[worker.generation]
if worker.requests_count >= limit
logger.info("worker=#{worker.nr} pid=#{worker.pid} processed #{worker.requests_count} requests, triggering a refork")
return true
end
Expand Down
Loading

0 comments on commit 105b87a

Please sign in to comment.