Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve cluster stabilisation under adverse conditions #254

Merged
merged 1 commit into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ shards:

action-controller:
git: https://github.com/spider-gazelle/action-controller.git
version: 5.5.3
version: 5.5.4

active-model:
git: https://github.com/spider-gazelle/active-model.git
version: 4.2.3

ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.3.1
version: 1.4.0

auto_initialize:
git: https://github.com/kostya/auto_initialize.git
Expand Down Expand Up @@ -191,7 +191,7 @@ shards:

placeos-driver:
git: https://github.com/placeos/driver.git
version: 6.6.3
version: 6.7.2

placeos-log-backend:
git: https://github.com/place-labs/log-backend.git
Expand Down
4 changes: 2 additions & 2 deletions spec/processes/edge_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ module PlaceOS::Core::ProcessManager
{client, edge_manager}
end

def self.with_edge
def self.with_edge(&)
with_driver do |mod, driver_path, _driver|
if (existing_edge_id = mod.edge_id)
if existing_edge_id = mod.edge_id
mod.running = false
mod.save!
edge = Model::Edge.find!(existing_edge_id)
Expand Down
2 changes: 1 addition & 1 deletion spec/processes/local_spec.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "../helper"

module PlaceOS::Core::ProcessManager
def self.with_driver
def self.with_driver(&)
_working_directory, repository, driver, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
Cloning.clone_and_install(repository)
result = Compiler.build_driver(driver.file_name, repository.folder_name, driver.commit, id: driver.id)
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-core/healthcheck.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module PlaceOS::Core::Healthcheck
).then(&.all?).get
end

private def self.check_resource?(resource)
private def self.check_resource?(resource, &)
Log.trace { "healthchecking #{resource}" }
!!yield
rescue exception
Expand Down
16 changes: 13 additions & 3 deletions src/placeos-core/module_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ module PlaceOS::Core
rendezvous_hash = RendezvousHash.new(nodes: nodes.map(&->HoundDog::Discovery.to_hash_value(HoundDog::Service::Node)))

success_count, fail_count = 0_i64, 0_i64
waiting = Array(Promise::DeferredPromise(Nil)).new(STABILIZE_BATCH_SIZE)
timeout_period = 5.seconds
waiting = Hash(String?, Promise::DeferredPromise(Nil)).new

Model::Module.all.in_groups_of(STABILIZE_BATCH_SIZE, reuse: true) do |modules|
modules.each.reject(Nil).each do |mod|
waiting << Promise.defer(same_thread: true) do
waiting[mod.id] = Promise.defer(same_thread: true, timeout: timeout_period) do
begin
load_module(mod, rendezvous_hash)
success_count += 1
Expand All @@ -334,7 +336,15 @@ module PlaceOS::Core
end
nil
end
Promise.all(waiting).get

waiting.each do |mod_id, promise|
begin
promise.get
rescue error
fail_count += 1
Log.error(exception: error) { "load timeout during stabilization: #{mod_id}" }
end
end
waiting.clear
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-core/process_manager/common.cr
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ module PlaceOS::Core::ProcessManager::Common
# Mapping from driver path to protocol manager
@driver_protocol_managers : Hash(String, Driver::Protocol::Management) = {} of String => Driver::Protocol::Management

protected def with_module_managers(&block)
protected def with_module_managers(&)
protocol_manager_lock.synchronize do
yield @module_protocol_managers
end
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-core/process_manager/edge.cr
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ module PlaceOS::Core
response.try &.status
end

protected def boolean_response(sequence_id, request)
protected def boolean_response(sequence_id, request, &)
success = begin
result = yield
result.is_a?(Bool) ? result : true
Expand Down
4 changes: 3 additions & 1 deletion src/placeos-core/process_manager/local.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module PlaceOS::Core
return true
end

if (existing_driver_manager = protocol_manager_by_driver?(driver_key))
if existing_driver_manager = protocol_manager_by_driver?(driver_key)
Log.debug { "using existing protocol manager" }
set_module_protocol_manager(module_id, existing_driver_manager)
else
Expand Down Expand Up @@ -144,6 +144,7 @@ module PlaceOS::Core
request.code ||= 500
end

request.cmd = :result
request
end

Expand All @@ -170,6 +171,7 @@ module PlaceOS::Core
end
end

request.cmd = :result
request
end

Expand Down
2 changes: 1 addition & 1 deletion src/placeos-core/resource_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module PlaceOS::Core
@cloning = cloning || Cloning.new(testing: testing)
end

def start
def start(&)
start_lock.synchronize {
return if started?

Expand Down
6 changes: 3 additions & 3 deletions src/placeos-edge/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module PlaceOS::Edge
# Initialize the WebSocket API
#
# Optionally accepts a block called after connection has been established.
def connect(initial_socket : HTTP::WebSocket? = nil)
def connect(initial_socket : HTTP::WebSocket? = nil, &)
Log.info { "connecting to #{host}" }

@transport = Transport.new(
Expand Down Expand Up @@ -406,7 +406,7 @@ module PlaceOS::Edge
Log.context.set(module_id: module_id, driver_key: driver_key)

if !protocol_manager_by_module?(module_id)
if (existing_driver_manager = protocol_manager_by_driver?(driver_key))
if existing_driver_manager = protocol_manager_by_driver?(driver_key)
# Use the existing driver protocol manager
set_module_protocol_manager(module_id, existing_driver_manager)
else
Expand Down Expand Up @@ -521,7 +521,7 @@ module PlaceOS::Edge

# Bundles up the result of a command into a `Success` response
#
protected def boolean_command(sequence_id, request)
protected def boolean_command(sequence_id, request, &)
success = begin
result = yield
result.is_a?(Bool) ? result : true
Expand Down