fix(process_check): ensure consistent state after recovery (#259)
stakach committed Mar 15, 2023
1 parent 9ad0802 commit 12932e2
Showing 4 changed files with 58 additions and 60 deletions.
9 changes: 4 additions & 5 deletions shard.lock
@@ -11,7 +11,7 @@ shards:
 
   active-model:
     git: https://github.com/spider-gazelle/active-model.git
-    version: 4.2.3
+    version: 4.3.0
 
   ameba:
     git: https://github.com/crystal-ameba/ameba.git
@@ -139,7 +139,7 @@ shards:
 
   lucky_router:
     git: https://github.com/luckyframework/lucky_router.git
-    version: 0.5.1
+    version: 0.5.2
 
   murmur3:
     git: https://github.com/aca-labs/murmur3.git
@@ -203,7 +203,7 @@ shards:
 
   placeos-driver:
     git: https://github.com/placeos/driver.git
-    version: 6.7.3
+    version: 6.7.4
 
   placeos-log-backend:
     git: https://github.com/place-labs/log-backend.git
@@ -231,7 +231,7 @@ shards:
 
   redis:
     git: https://github.com/stefanwille/crystal-redis.git
-    version: 2.8.3
+    version: 2.9.0
 
   redis-cluster:
     git: https://github.com/caspiano/redis-cluster.cr.git
@@ -288,4 +288,3 @@ shards:
 
   ulid:
     git: https://github.com/place-labs/ulid.git
     version: 0.1.2
-
Empty file modified: spec/migration/run.sh (mode changed 100755 → 100644)
102 changes: 49 additions & 53 deletions src/placeos-core/process_check.cr
@@ -36,81 +36,77 @@ module PlaceOS::Core
   protected def process_check : Nil
     Log.debug { "checking for dead driver processes" }
 
     # NOTE:
     # Generally, having an unbounded channel is a bad idea, as it can cause deadlocks.
     # The ideal would be having a channel the size of the number of managers we're inspecting.
     checks = Channel({State, {Driver::Protocol::Management, Array(String)}}).new
+    module_manager_map = local_processes.get_module_managers
 
-    total_protocol_managers = local_processes.with_module_managers do |module_manager_map|
-      # Group module keys by protcol manager
-      grouped_managers = module_manager_map.each_with_object({} of Driver::Protocol::Management => Array(String)) do |(module_id, protocol_manager), grouped|
-        (grouped[protocol_manager] ||= [] of String) << module_id
-      end
+    # Group module keys by protcol manager
+    grouped_managers = module_manager_map.each_with_object({} of Driver::Protocol::Management => Array(String)) do |(module_id, protocol_manager), grouped|
+      (grouped[protocol_manager] ||= [] of String) << module_id
+    end
 
-      # Asynchronously check if any processes are timing out on comms, and if so, restart them
-      grouped_managers.each do |protocol_manager, module_ids|
-        spawn(same_thread: true) do
-          state = begin
-            # If there's an empty response, the modules that were meant to be running are not.
-            # This is taken as a sign that the process is dead.
-            # Alternatively, if the response times out, the process is dead.
-            Tasker.timeout(PROCESS_COMMS_TIMEOUT) do
-              protocol_manager.info
-            end
-
-            State::Running
-          rescue error : Tasker::Timeout
-            Log.warn(exception: error) { "unresponsive process manager for #{module_ids.join(", ")}" }
-            State::Unresponsive
-          end
-
-          checks.send({state, {protocol_manager, module_ids}})
-        end
-      end
-
-      grouped_managers.size
-    end
+    # Asynchronously check if any processes are timing out on comms, and if so, restart them
+    grouped_managers.each do |protocol_manager, module_ids|
+      spawn(same_thread: true) do
+        state = begin
+          # If there's an empty response, the modules that were meant to be running are not.
+          # This is taken as a sign that the process is dead.
+          # Alternatively, if the response times out, the process is dead.
+          Tasker.timeout(PROCESS_COMMS_TIMEOUT) do
+            protocol_manager.info
+          end
+
+          State::Running
+        rescue error : Tasker::Timeout
+          Log.warn(exception: error) { "unresponsive process manager for #{module_ids.join(", ")}" }
+          State::Unresponsive
+        end
+
+        checks.send({state, {protocol_manager, module_ids}})
+      end
+    end
 
-    local_processes.with_module_managers do |module_manager_map|
-      # Synchronously handle restarting unresponsive/dead drivers
-      total_protocol_managers.times do
-        state, driver_state = checks.receive
-        protocol_manager, module_ids = driver_state
-
-        next if state.running?
-
-        # Ensure the process is killed
-        if state.unresponsive?
-          Process.signal(Signal::KILL, protocol_manager.pid) rescue nil
-        end
-
-        # Kill the process manager's IO, unblocking any fibers waiting on a response
-        protocol_manager.@io.try(&.close) rescue nil
-
-        Log.warn { {message: "restarting unresponsive driver", state: state.to_s, driver_path: protocol_manager.@driver_path} }
-
-        # Determine if any new modules have been loaded onto the driver that needs to be restarted
-        fresh_module_ids = module_manager_map.compact_map do |module_id, pm|
-          module_id if pm == protocol_manager
-        end
-
-        # Union of old and new module_ids
-        module_ids |= fresh_module_ids
-
-        # Remove the dead manager from the map
-        module_manager_map.reject!(module_ids)
-
-        # Restart all the modules previously assigned to the dead manager
-        #
-        # NOTE:
-        # Make this independent of a database query by using the dead manager's stored module_ids and payloads.
-        # This will allow this module to be included in `PlaceOS::Edge::Client`.
-        # To do so, one will need to create the module manager (currently done by the `load_module` below (which is in `PlaceOS::Core::ModuleManager`))
-        Model::Module.find_all(module_ids).each do |mod|
-          Log.debug { "reloading #{mod.id} after restarting unresponsive driver" }
-          load_module(mod)
-        end
-      end
-    end
+    total_protocol_managers = grouped_managers.size
+
+    # Synchronously handle restarting unresponsive/dead drivers
+    total_protocol_managers.times do
+      state, driver_state = checks.receive
+      protocol_manager, module_ids = driver_state
+
+      next if state.running?
+
+      # Ensure the process is killed
+      if state.unresponsive?
+        Process.signal(Signal::KILL, protocol_manager.pid) rescue nil
+      end
+
+      # Kill the process manager's IO, unblocking any fibers waiting on a response
+      protocol_manager.@io.try(&.close) rescue nil
+
+      Log.warn { {message: "restarting unresponsive driver", state: state.to_s, driver_path: protocol_manager.@driver_path} }
+
+      # Determine if any new modules have been loaded onto the driver that needs to be restarted
+      local_processes.with_module_managers do |module_managers|
+        fresh_module_ids = module_managers.compact_map do |module_id, pm|
+          module_id if pm == protocol_manager
+        end
+
+        # Union of old and new module_ids
+        module_ids |= fresh_module_ids
+
+        # Remove the dead manager from the map
+        module_managers.reject!(module_ids)
+      end
+
+      # Restart all the modules previously assigned to the dead manager
+      #
+      # NOTE:
+      # Make this independent of a database query by using the dead manager's stored module_ids and payloads.
+      # This will allow this module to be included in `PlaceOS::Edge::Client`.
+      # To do so, one will need to create the module manager (currently done by the `load_module` below (which is in `PlaceOS::Core::ModuleManager`))
+      Model::Module.find_all(module_ids).each do |mod|
+        Log.debug { "reloading #{mod.id} after restarting unresponsive driver" }
+        load_module(mod)
+      end
+    end
 
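The restructuring above follows a snapshot-then-lock pattern: the grouping and the asynchronous health checks read from a copy of the module map (get_module_managers), while the pruning of dead managers happens inside with_module_managers, which after this commit yields the live map under the lock, so the reject! actually persists. The sketch below illustrates that pattern in isolation, under stated assumptions: Registry, check_alive? and restart are hypothetical stand-ins, not PlaceOS APIs; only the locking and channel structure mirrors the diff.

# A self-contained sketch of the snapshot-then-lock pattern, not PlaceOS code:
# `Registry`, `check_alive?` and `restart` are illustrative stand-ins.
class Registry
  def initialize
    @lock = Mutex.new
    @managers = {} of String => Int32 # module_id => pid (stand-in for a protocol manager)
  end

  # Yields the live map while the lock is held: use for mutations.
  def with_managers(&)
    @lock.synchronize { yield @managers }
  end

  # Returns a copy: use for lock-free, read-only work such as health checks.
  def get_managers
    @lock.synchronize { @managers.dup }
  end
end

def check_alive?(pid : Int32) : Bool
  pid.even? # stand-in for `protocol_manager.info` wrapped in `Tasker.timeout`
end

def restart(module_id : String) : Nil
  puts "restarting #{module_id}"
end

registry = Registry.new
registry.with_managers { |m| m["mod-1"] = 101; m["mod-2"] = 102 }

snapshot = registry.get_managers                    # read-only snapshot
checks = Channel({String, Bool}).new(snapshot.size) # bounded: one slot per manager

snapshot.each do |module_id, pid|
  spawn { checks.send({module_id, check_alive?(pid)}) }
end

snapshot.size.times do
  module_id, alive = checks.receive
  next if alive

  # Mutate the shared map only while the lock is held, so the removal persists
  registry.with_managers { |m| m.delete(module_id) }
  restart(module_id)
end

Note that the channel here is bounded to one slot per manager, which is the sizing the NOTE comment in the diff describes as the ideal.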
7 changes: 5 additions & 2 deletions src/placeos-core/process_manager/common.cr
@@ -217,8 +217,11 @@ module PlaceOS::Core::ProcessManager::Common
   @driver_protocol_managers : Hash(String, Driver::Protocol::Management) = {} of String => Driver::Protocol::Management
 
   protected def with_module_managers(&)
-    managers = protocol_manager_lock.synchronize { @module_protocol_managers.dup }
-    yield managers
+    protocol_manager_lock.synchronize { yield @module_protocol_managers }
+  end
+
+  protected def get_module_managers
+    protocol_manager_lock.synchronize { @module_protocol_managers.dup }
   end
 
   protected def protocol_manager_by_module?(module_id) : Driver::Protocol::Management?

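Before this change, with_module_managers yielded a dup of the map, so a reject! performed inside the block mutated the copy and was silently dropped — the inconsistent state the commit title refers to. Below is a minimal, self-contained sketch of that difference, assuming a hypothetical SharedMap in place of the real manager map.

# Hypothetical `SharedMap`, reduced to the two yielding styles shown in the diff.
class SharedMap
  def initialize
    @lock = Mutex.new
    @map = {"mod-1" => 1, "mod-2" => 2}
  end

  # Old behaviour: the block receives a copy, so its mutations are discarded.
  def with_copy(&)
    copy = @lock.synchronize { @map.dup }
    yield copy
  end

  # New behaviour: the block receives the live map while the lock is held.
  def with_live(&)
    @lock.synchronize { yield @map }
  end

  def size
    @lock.synchronize { @map.size }
  end
end

shared = SharedMap.new

shared.with_copy { |m| m.delete("mod-1") }
puts shared.size # => 2 — the delete only touched the copy

shared.with_live { |m| m.delete("mod-1") }
puts shared.size # => 1 — the delete is visible to every other caller

With the new split, callers that only need to read take the cheap copy via get_module_managers, while callers whose mutations must stick use with_module_managers and keep the block short, since the lock is held for its duration.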
0 comments on commit 12932e2
