From 12932e2a8cb211c3e0ab2222065eba07756c376e Mon Sep 17 00:00:00 2001
From: Stephen von Takach
Date: Wed, 15 Mar 2023 10:50:53 +1100
Subject: [PATCH] fix(process_check): ensure consistent state after recovery
 (#259)

---
 shard.lock                                 |   9 +-
 spec/migration/run.sh                      |   0
 src/placeos-core/process_check.cr          | 102 ++++++++++-----------
 src/placeos-core/process_manager/common.cr |   7 +-
 4 files changed, 58 insertions(+), 60 deletions(-)
 mode change 100755 => 100644 spec/migration/run.sh

diff --git a/shard.lock b/shard.lock
index c3560986..31167b6e 100644
--- a/shard.lock
+++ b/shard.lock
@@ -11,7 +11,7 @@ shards:
 
   active-model:
     git: https://github.com/spider-gazelle/active-model.git
-    version: 4.2.3
+    version: 4.3.0
 
   ameba:
     git: https://github.com/crystal-ameba/ameba.git
@@ -139,7 +139,7 @@ shards:
 
   lucky_router:
     git: https://github.com/luckyframework/lucky_router.git
-    version: 0.5.1
+    version: 0.5.2
 
   murmur3:
     git: https://github.com/aca-labs/murmur3.git
@@ -203,7 +203,7 @@ shards:
 
   placeos-driver:
     git: https://github.com/placeos/driver.git
-    version: 6.7.3
+    version: 6.7.4
 
   placeos-log-backend:
     git: https://github.com/place-labs/log-backend.git
@@ -231,7 +231,7 @@ shards:
 
   redis:
     git: https://github.com/stefanwille/crystal-redis.git
-    version: 2.8.3
+    version: 2.9.0
 
   redis-cluster:
     git: https://github.com/caspiano/redis-cluster.cr.git
@@ -288,4 +288,3 @@ shards:
   ulid:
     git: https://github.com/place-labs/ulid.git
     version: 0.1.2
-
diff --git a/spec/migration/run.sh b/spec/migration/run.sh
old mode 100755
new mode 100644
diff --git a/src/placeos-core/process_check.cr b/src/placeos-core/process_check.cr
index 5b5030c3..8e7401ce 100644
--- a/src/placeos-core/process_check.cr
+++ b/src/placeos-core/process_check.cr
@@ -36,62 +36,58 @@ module PlaceOS::Core
     protected def process_check : Nil
      Log.debug { "checking for dead driver processes" }

-      # NOTE:
-      # Generally, having an unbounded channel is a bad idea, as it can cause deadlocks.
-      # The ideal would be having a channel the size of the number of managers we're inspecting.
      checks = Channel({State, {Driver::Protocol::Management, Array(String)}}).new
+      module_manager_map = local_processes.get_module_managers

-      total_protocol_managers = local_processes.with_module_managers do |module_manager_map|
-        # Group module keys by protcol manager
-        grouped_managers = module_manager_map.each_with_object({} of Driver::Protocol::Management => Array(String)) do |(module_id, protocol_manager), grouped|
-          (grouped[protocol_manager] ||= [] of String) << module_id
-        end
+      # Group module keys by protocol manager
+      grouped_managers = module_manager_map.each_with_object({} of Driver::Protocol::Management => Array(String)) do |(module_id, protocol_manager), grouped|
+        (grouped[protocol_manager] ||= [] of String) << module_id
+      end

+      # Asynchronously check if any processes are timing out on comms, and if so, restart them
+      grouped_managers.each do |protocol_manager, module_ids|
        # Asynchronously check if any processes are timing out on comms, and if so, restart them
-        grouped_managers.each do |protocol_manager, module_ids|
-          # Asynchronously check if any processes are timing out on comms, and if so, restart them
-          spawn(same_thread: true) do
-            state = begin
-              # If there's an empty response, the modules that were meant to be running are not.
-              # This is taken as a sign that the process is dead.
-              # Alternatively, if the response times out, the process is dead.
-              Tasker.timeout(PROCESS_COMMS_TIMEOUT) do
-                protocol_manager.info
-              end
-
-              State::Running
-            rescue error : Tasker::Timeout
-              Log.warn(exception: error) { "unresponsive process manager for #{module_ids.join(", ")}" }
-              State::Unresponsive
+        spawn(same_thread: true) do
+          state = begin
+            # If there's an empty response, the modules that were meant to be running are not.
+            # This is taken as a sign that the process is dead.
+            # Alternatively, if the response times out, the process is dead.
+            Tasker.timeout(PROCESS_COMMS_TIMEOUT) do
+              protocol_manager.info
            end

-            checks.send({state, {protocol_manager, module_ids}})
+            State::Running
+          rescue error : Tasker::Timeout
+            Log.warn(exception: error) { "unresponsive process manager for #{module_ids.join(", ")}" }
+            State::Unresponsive
          end
-        end

-        grouped_managers.size
+          checks.send({state, {protocol_manager, module_ids}})
+        end
      end

-      local_processes.with_module_managers do |module_manager_map|
-        # Synchronously handle restarting unresponsive/dead drivers
-        total_protocol_managers.times do
-          state, driver_state = checks.receive
-          protocol_manager, module_ids = driver_state
+      total_protocol_managers = grouped_managers.size

-          next if state.running?
+      # Synchronously handle restarting unresponsive/dead drivers
+      total_protocol_managers.times do
+        state, driver_state = checks.receive
+        protocol_manager, module_ids = driver_state

-          # Ensure the process is killed
-          if state.unresponsive?
-            Process.signal(Signal::KILL, protocol_manager.pid) rescue nil
-          end
+        next if state.running?

-          # Kill the process manager's IO, unblocking any fibers waiting on a response
-          protocol_manager.@io.try(&.close) rescue nil
+        # Ensure the process is killed
+        if state.unresponsive?
+          Process.signal(Signal::KILL, protocol_manager.pid) rescue nil
+        end
+
+        # Kill the process manager's IO, unblocking any fibers waiting on a response
+        protocol_manager.@io.try(&.close) rescue nil

-          Log.warn { {message: "restarting unresponsive driver", state: state.to_s, driver_path: protocol_manager.@driver_path} }
+        Log.warn { {message: "restarting unresponsive driver", state: state.to_s, driver_path: protocol_manager.@driver_path} }

-          # Determine if any new modules have been loaded onto the driver that needs to be restarted
-          fresh_module_ids = module_manager_map.compact_map do |module_id, pm|
+        # Determine if any new modules have been loaded onto the driver that needs to be restarted
+        local_processes.with_module_managers do |module_managers|
+          fresh_module_ids = module_managers.compact_map do |module_id, pm|
            module_id if pm == protocol_manager
          end

@@ -99,18 +95,18 @@ module PlaceOS::Core
          module_ids |= fresh_module_ids

          # Remove the dead manager from the map
-          module_manager_map.reject!(module_ids)
-
-          # Restart all the modules previously assigned to the dead manager
-          #
-          # NOTE:
-          # Make this independent of a database query by using the dead manager's stored module_ids and payloads.
-          # This will allow this module to be included in `PlaceOS::Edge::Client`.
-          # To do so, one will need to create the module manager (currently done by the `load_module` below (which is in `PlaceOS::Core::ModuleManager`))
-          Model::Module.find_all(module_ids).each do |mod|
-            Log.debug { "reloading #{mod.id} after restarting unresponsive driver" }
-            load_module(mod)
-          end
+          module_managers.reject!(module_ids)
+        end
+
+        # Restart all the modules previously assigned to the dead manager
+        #
+        # NOTE:
+        # Make this independent of a database query by using the dead manager's stored module_ids and payloads.
+        # This will allow this module to be included in `PlaceOS::Edge::Client`.
+        # To do so, one will need to create the module manager (currently done by the `load_module` below (which is in `PlaceOS::Core::ModuleManager`))
+        Model::Module.find_all(module_ids).each do |mod|
+          Log.debug { "reloading #{mod.id} after restarting unresponsive driver" }
+          load_module(mod)
        end
      end
    end
diff --git a/src/placeos-core/process_manager/common.cr b/src/placeos-core/process_manager/common.cr
index c5f2ea9f..b027aca2 100644
--- a/src/placeos-core/process_manager/common.cr
+++ b/src/placeos-core/process_manager/common.cr
@@ -217,8 +217,11 @@ module PlaceOS::Core::ProcessManager::Common
  @driver_protocol_managers : Hash(String, Driver::Protocol::Management) = {} of String => Driver::Protocol::Management

  protected def with_module_managers(&)
-    managers = protocol_manager_lock.synchronize { @module_protocol_managers.dup }
-    yield managers
+    protocol_manager_lock.synchronize { yield @module_protocol_managers }
+  end
+
+  protected def get_module_managers
+    protocol_manager_lock.synchronize { @module_protocol_managers.dup }
  end

  protected def protocol_manager_by_module?(module_id) : Driver::Protocol::Management?
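
A note on the recovery flow in process_check.cr: the new code snapshots the module managers, groups module ids by protocol manager, probes each manager's `info` in its own fiber bounded by `Tasker.timeout`, and then receives exactly `total_protocol_managers` results from the `checks` channel before it kills or reloads anything. The sketch below is a minimal, hypothetical illustration of that fan-out/fan-in shape in plain Crystal; it substitutes the standard library's `select`/`timeout` for Tasker, and the names `Health`, `probe`, `worker_ids` and `checks` are invented for the example rather than taken from PlaceOS.

# Sketch only: one fiber per worker, results funnelled back over a single channel.
enum Health
  Running
  Unresponsive
end

# Hypothetical stand-in for a driver probe; worker 2 never answers.
def probe(id : Int32, done : Channel(Nil))
  sleep 2.seconds if id == 2
  done.send(nil)
end

worker_ids = [1, 2, 3]
checks = Channel({Int32, Health}).new

worker_ids.each do |id|
  spawn do
    done = Channel(Nil).new
    spawn { probe(id, done) }

    state = Health::Unresponsive
    select
    when done.receive
      state = Health::Running
    when timeout(500.milliseconds)
      # A slow or missing reply is treated as a dead process,
      # mirroring the Tasker::Timeout rescue in the patch.
      state = Health::Unresponsive
    end

    checks.send({id, state})
  end
end

# Fan-in: exactly one receive per spawned check, so a dead worker cannot be
# missed and a healthy one cannot be counted twice.
worker_ids.size.times do
  id, state = checks.receive
  puts "worker #{id}: #{state}" unless state.running?
end

The collector performs one receive per spawned check, counted up front, which is the same bookkeeping the patch does with `total_protocol_managers` before draining the channel.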
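
A note on the common.cr change: `with_module_managers` used to hand its block a copy made under `protocol_manager_lock`, so mutations such as `reject!` only ever touched a stale duplicate; after this patch it yields the live map while the lock is held, and the copy-on-read behaviour moves into the new `get_module_managers`, which process_check now uses for its read-only snapshot. Below is a small, self-contained sketch of that snapshot-versus-locked-access pattern; `TrackedMap`, `snapshot` and `with_map` are hypothetical names for illustration, not the PlaceOS API.

# Sketch only: a map guarded by a Mutex, exposing both access styles used above.
class TrackedMap(K, V)
  def initialize
    @lock = Mutex.new
    @map = {} of K => V
  end

  # Copy taken under the lock (what `get_module_managers` now returns, and what
  # the old `with_module_managers` used to yield): safe to read anywhere, but
  # edits to the copy never reach the shared map.
  def snapshot : Hash(K, V)
    @lock.synchronize { @map.dup }
  end

  # Yields the live map while the lock is held (the new `with_module_managers`):
  # mutations land on shared state, atomically with respect to other callers.
  def with_map(&)
    @lock.synchronize { yield @map }
  end
end

managers = TrackedMap(String, Int32).new
managers.with_map { |m| m["mod-123"] = 42 }   # hypothetical module id => pid
stale = managers.snapshot
stale.delete("mod-123")                       # only affects the copy
managers.with_map { |m| m.delete("mod-123") } # removes it from the shared map

Whether a caller needs the duplicated snapshot (safe to hold, possibly stale) or the locked live map (consistent, but the block should stay short) is the split the patch encodes in `get_module_managers` versus `with_module_managers`.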