Skip to content

Commit

Permalink
Merge pull request #700 from agrare/stop_saver_thread_when_vim_thread…
Browse files Browse the repository at this point in the history
…_dies

Move save_inventory thread out of the inventory collector

(cherry picked from commit 9319c46)
  • Loading branch information
Fryguy committed Mar 5, 2021
1 parent f52b4db commit 42e38d3
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems)
def initialize(ems, saver)
@ems = ems
@exit_requested = false
@cache = ems.class::Inventory::Cache.new
@saver = ems.class::Inventory::Saver.new
@saver = saver
@vim_thread = nil
end

def refresh
saver.start_thread
self.exit_requested = true
vim_collector
saver.stop_thread
end

def start
saver.start_thread
self.vim_thread = vim_collector_thread
end

Expand All @@ -28,20 +25,16 @@ def running?

def stop(join_timeout = 2.minutes)
_log.info("#{log_header} Monitor updates thread exiting...")
self.exit_requested = true
return if join_timeout.nil?

# The WaitOptions for WaitForUpdatesEx call sets maxWaitSeconds to 60 seconds
self.exit_requested = true
vim_thread&.join(join_timeout)
saver.stop_thread
self.exit_requested = false
end

def restart(join_timeout = 2.minutes)
self.exit_requested = true
vim_thread&.join(join_timeout)

self.exit_requested = false
self.vim_thread = vim_collector_thread
stop(join_timeout)
start
end

attr_accessor :cache, :categories_by_id, :tags_by_id, :tag_ids_by_attached_object
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
include Vmdb::Logging

def initialize
def initialize(threaded: true)
@join_limit = 30
@queue = Queue.new
@should_exit = Concurrent::AtomicBoolean.new
@threaded = ENV["RAILS_ENV"] != "test"
@threaded = threaded
@thread = nil
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def before_exit(_message, _exit_code)
end

def do_before_work_loop
# No need to queue an initial full refresh if we are streaming
start_inventory_collector
end

def do_work
Expand All @@ -34,8 +34,12 @@ def deliver_queue_message(msg)

attr_accessor :ems, :collector

def saver
@saver ||= ems.class::Inventory::Saver.new
end

def start_inventory_collector
self.collector = ems.class::Inventory::Collector.new(ems)
self.collector = ems.class::Inventory::Collector.new(ems, saver)
collector.start
_log.info("Started inventory collector")
end
Expand All @@ -44,7 +48,7 @@ def ensure_inventory_collector
return if collector&.running?

_log.warn("Inventory collector thread not running, restarting...") unless collector.nil?
start_inventory_collector
restart_inventory_collector
end

def stop_inventory_collector
Expand Down
13 changes: 11 additions & 2 deletions app/models/manageiq/providers/vmware/infra_manager/refresher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@
module ManageIQ::Providers
module Vmware
class InfraManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
# This is a helper method to allow for developers to run a full refresh from
# a rails console with the typical `EmsRefresh.refresh(ems)` pattern.
#
# In production this is not used as the RefreshWorker processes full refreshes
# directly by restarting the collector thread, not by actually calling #refresh.
#
# If you need to force a full refresh in production mode you can still queue a full
# refresh with `EmsRefresh.queue_refresh(ems)` or `ems.queue_refresh`
def refresh
collector_klass = ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
raise NotImplementedError, "not implemented in production mode" if Rails.env.production?

ems_by_ems_id.each do |_ems_id, ems|
collector = collector_klass.new(ems)
saver = ems.class::Inventory::Saver.new(:threaded => false)
collector = ems.class::Inventory::Collector.new(ems, saver)
collector.refresh
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
describe ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser do
let(:ems) { FactoryBot.create(:ems_vmware) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:persister) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted.new(ems) }
let(:parser) { described_class.new(collector, persister) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }

context "#monitor_updates" do
context "full refresh" do
Expand Down

0 comments on commit 42e38d3

Please sign in to comment.