Move promotion logic inside workers #38

Merged
merged 1 commit into from
Mar 28, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
# Unreleased

- Remove `mold_selector`. The promotion logic has been moved inside workers (#38).
- Add the `after_promotion` callback.
- Removed the `before_fork` callback.
- Fork workers and molds with a clean stack to allow more generations. (#30)
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM ruby:3.1
FROM ruby:3.2
RUN apt-get update -y && apt-get install -y ragel socat netcat smem apache2-utils
WORKDIR /app
CMD [ "bash" ]
24 changes: 0 additions & 24 deletions docs/CONFIGURATION.md
@@ -334,30 +334,6 @@ By default automatic reforking isn't enabled.

Make sure to read the [fork safety guide](FORK_SAFETY.md) before enabling reforking.

### `mold_selector`

Sets the mold selector implementation.

```ruby
mold_selector do |server|
candidate = server.children.workers.sample # return an random worker
server.logger.info("worker=#{worker.nr} pid=#{worker.pid} selected as new mold")
candidate
end
```

The has access to `server.children` a `Pitchfork::Children` instance.
This object can be used to introspect the state of the cluster and select the most
appropriate worker to be used as the new mold from which workers will be reforked.

The default implementation selects the worker with the least
amount of shared memory. This heuristic aim to select the most
warmed up worker.

This should be considered a very advanced API and it is discouraged
to use it unless you are confident you have a clear understanding
of pitchfork's architecture.

## Rack Features

### `default_middleware`
2 changes: 1 addition & 1 deletion lib/pitchfork.rb
@@ -171,7 +171,7 @@ def self.clean_fork(&block)

%w(
const socket_helper stream_input tee_input mem_info children message http_parser
refork_condition mold_selector configurator tmpio http_response worker http_server
refork_condition configurator tmpio http_response worker http_server
).each do |s|
require_relative "pitchfork/#{s}"
end
7 changes: 1 addition & 6 deletions lib/pitchfork/configurator.rb
@@ -51,10 +51,9 @@ class Configurator
end
},
:after_worker_ready => lambda { |server, worker|
server.logger.info("worker=#{worker.nr} ready")
server.logger.info("worker=#{worker.nr} gen=#{worker.generation} ready")
},
:early_hints => false,
:mold_selector => MoldSelector::LeastSharedMemory.new,
:refork_condition => nil,
:check_client_connection => false,
:rewindable_input => true,
@@ -139,10 +138,6 @@ def after_worker_exit(*args, &block)
set_hook(:after_worker_exit, block_given? ? block : args[0], 3)
end

def mold_selector(*args, &block)
set_hook(:mold_selector, block_given? ? block : args[0], 3)
end

def timeout(seconds)
set_int(:timeout, seconds, 3)
# POSIX says 31 days is the smallest allowed maximum timeout for select()
51 changes: 51 additions & 0 deletions lib/pitchfork/flock.rb
@@ -0,0 +1,51 @@
require 'tempfile'

module Pitchfork
class Flock
Error = Class.new(StandardError)

def initialize(name)
@name = name
@file = Tempfile.create([name, '.lock'])
Review comment (Contributor):

For my own knowledge:

Original problem: Using a POSIX mutex was problematic because if the process holding the mutex died, it wouldn't unlock, and would deadlock the promotion flow thereafter.

Solution: A file lock solves this problem because the lock against the file is automatically released by the OS when the process terminates?

Reply (Contributor, Author):

Yeah, I explored several approaches to IPC mutexes.

First I tried a pthread_mutex_t inside a shared memory page. As you point out, it isn't cleaned up on exit, which is not OK in a context where we might SIGKILL processes ourselves.

Then I tried SysV semaphores. They do have automatic cleanup, and we already use them via semian, but the API is very awkward to use, especially if you are trying to implement a mutex.

In the end I figured flock was the least bad option. It's already available in Ruby core, so there is no need for more C. The only downsides are:

  • We need to write a file, which I would have liked to avoid.
  • We need to carefully re-open that file after fork.

But overall it seems to work well.
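
The two properties discussed in this thread can be checked with a small standalone script. This is only a sketch, not code from the PR: the file name prefix `flock-demo` and all variable names are made up for illustration, and it assumes a Unix-like system where `fork` and `flock` are available. The first part shows that a lock held by a SIGKILLed process is released by the OS. The second part shows why the file has to be re-opened after fork: an inherited handle shares the parent's lock, while a freshly opened handle contends for it.

```ruby
require 'tempfile'

# Hypothetical demo, not part of Pitchfork.
nonblock_ex = File::LOCK_EX | File::LOCK_NB
lock = Tempfile.create(['flock-demo', '.lock'])
path = lock.path

# Part 1: a lock held by a killed process is released automatically.
reader, writer = IO.pipe
holder = fork do
  reader.close
  handle = File.open(path, 'w')   # the child's own open file description
  handle.flock(File::LOCK_EX)
  writer.write('1')               # tell the parent the lock is held
  writer.close
  sleep                           # hold the lock until killed
end
writer.close
reader.read(1)                    # wait until the child holds the lock
reader.close

busy_while_holder_alive = lock.flock(nonblock_ex) # false: the child owns it
Process.kill(:KILL, holder)
Process.wait(holder)
acquired_after_kill = lock.flock(nonblock_ex)     # 0: the OS released it

# Part 2: after fork, an inherited handle shares the parent's lock,
# so a child must re-open the file to get an independent handle.
reader, writer = IO.pipe
child = fork do
  reader.close
  inherited = lock.flock(nonblock_ex)                  # same description as parent
  reopened = File.open(path, 'w').flock(nonblock_ex)   # independent description
  writer.write(Marshal.dump([inherited, reopened]))
  writer.close
  exit!(0)
end
writer.close
inherited_result, reopened_result = Marshal.load(reader.read)
Process.wait(child)

File.unlink(path)
```

In the second part, the inherited handle reports success even though the parent still holds the lock, which is the failure mode `at_fork` avoids by closing and re-opening the file.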

@file.write("#{Process.pid}\n")
@file.flush
@owned = false
end

def at_fork
@owned = false
@file.close
@file = File.open(@file.path, "w")
nil
end

def unlink
File.unlink(@file.path)
rescue Errno::ENOENT
false
end

def try_lock
raise Error, "Pitchfork::Flock(#{@name}) trying to lock an already owned lock" if @owned

if @file.flock(File::LOCK_EX | File::LOCK_NB)
@owned = true
else
false
end
end

def unlock
raise Error, "Pitchfork::Flock(#{@name}) trying to unlock a non-owned lock" unless @owned

begin
if @file.flock(File::LOCK_UN)
@owned = false
true
else
false
end
end
end
end
end
75 changes: 42 additions & 33 deletions lib/pitchfork/http_server.rb
@@ -1,5 +1,6 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
require 'pitchfork/flock'

module Pitchfork
# This is the process manager of Pitchfork. This manages worker
@@ -13,7 +14,7 @@ class HttpServer
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition

attr_reader :logger
include Pitchfork::SocketHelper
@@ -65,6 +66,8 @@ def initialize(app, options = {})
@respawn = false
@last_check = time_now
@default_middleware = true
@promotion_lock = Flock.new("pitchfork-promotion")

options = options.dup
@ready_pipe = options.delete(:ready_pipe)
@init_listeners = options[:listeners].dup || []
@@ -266,7 +269,7 @@ def monitor_loop(sleep = true)
end
if @respawn
maintain_worker_count
automatically_refork_workers if REFORKING_AVAILABLE
restart_outdated_workers if REFORKING_AVAILABLE
end

master_sleep(sleep_time) if sleep
@@ -318,6 +321,7 @@ def stop(graceful = true)
reap_all_workers
end
kill_each_child(:KILL)
@promotion_lock.unlink
end

def rewindable_input
@@ -429,17 +433,17 @@ def trigger_refork
end

unless @children.pending_promotion?
@children.refresh
if new_mold = @mold_selector.call(self)
if new_mold = @children.fresh_workers.first
@children.promote(new_mold)
else
logger.error("The mold select didn't return a candidate")
logger.error("No children at all???")
end
else
end
end

def after_fork_internal
@promotion_lock.at_fork
@control_socket[0].close_write # this is master-only, now
@ready_pipe.close if @ready_pipe
Pitchfork::Configurator::RACKUP.clear
@@ -484,6 +488,7 @@ def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
@promotion_lock.try_lock
mold.after_fork_in_child
build_app!
bind_listeners!
@@ -533,34 +538,21 @@ def maintain_worker_count
@children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) }
end

def automatically_refork_workers
def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
if @children.mold
# We don't shutdown any outdated worker if any worker is already being spawned
# or a worker is exiting. Workers are only reforked one by one to minimize the
# impact on capacity.
# In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
# a time.
return if @children.pending_workers?
return if @children.workers.any?(&:exiting?)

if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
outdated_worker.soft_kill(:QUIT)
return # That's all folks
end
end
return unless @children.mold

# If all workers are alive and well, we can consider reforking a new generation
if @refork_condition
@children.refresh
if @refork_condition.met?(@children, logger)
logger.info("Refork condition met, scheduling a promotion")
unless @sig_queue.include?(:USR2)
@sig_queue << :USR2
awaken_master
end
end
# We don't shutdown any outdated worker if any worker is already being spawned
# or a worker is exiting. Workers are only reforked one by one to minimize the
# impact on capacity.
# In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
# a time.
return if @children.pending_workers?
return if @children.workers.any?(&:exiting?)

if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
outdated_worker.soft_kill(:QUIT)
end
end

@@ -721,6 +713,12 @@ def worker_loop(worker)
client = false if client == :wait_readable
if client
case client
when Message::PromoteWorker
if @promotion_lock.try_lock
logger.info("Refork asked by master, promoting ourselves")
worker.tick = time_now.to_i
return worker.promoted!
end
when Message
worker.update(client)
else
@@ -729,11 +727,21 @@ def worker_loop(worker)
end
worker.tick = time_now.to_i
end
return if worker.mold? # We've been promoted we can exit the loop
end

# timeout so we can .tick and keep parent from SIGKILL-ing us
worker.tick = time_now.to_i
if @refork_condition && !worker.outdated?
if @refork_condition.met?(worker, logger)
if @promotion_lock.try_lock
logger.info("Refork condition met, promoting ourselves")
return worker.promote! # We've been promoted we can exit the loop
else
# TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
end
end
end

waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
@@ -743,7 +751,8 @@ def worker_loop(worker)
def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
mold.acknowlege_promotion(@control_socket[1])
mold.declare_promotion(@control_socket[1])
@promotion_lock.unlock
ready = readers.dup

begin
29 changes: 0 additions & 29 deletions lib/pitchfork/mold_selector.rb

This file was deleted.

6 changes: 3 additions & 3 deletions lib/pitchfork/refork_condition.rb
@@ -7,9 +7,9 @@ def initialize(request_counts)
@limits = request_counts
end

def met?(children, logger)
if limit = @limits[children.last_generation]
if worker = children.fresh_workers.find { |w| w.requests_count >= limit }
def met?(worker, logger)
if limit = @limits[worker.generation]
if worker.requests_count >= limit
logger.info("worker=#{worker.nr} pid=#{worker.pid} processed #{worker.requests_count} requests, triggering a refork")
return true
end