Skip to content

Commit

Permalink
Merge pull request #765 from agrare/add_threaded_saver_setting
Browse files Browse the repository at this point in the history
Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up

(cherry picked from commit a37bc28)
  • Loading branch information
Fryguy authored and agrare committed Feb 2, 2022
1 parent 149b666 commit dbf88a2
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems, saver)
def initialize(ems)
@ems = ems
@exit_requested = false
@cache = ems.class::Inventory::Cache.new
@saver = saver
@cache = cache_klass.new
@saver = saver_klass.new
@vim_thread = nil
end

Expand Down Expand Up @@ -103,6 +103,11 @@ def targeted_refresh(vim, property_filter, version)

parse_updates(vim, parser, updated_objects)
save_inventory(persister)

# Prevent WaitForUpdatesEx from "spinning" in a tight loop if updates are
# constantly available. This allows for more updates to be batched together
# making for more efficient saving and reducing the API call load on the VC.
sleep(refresh_settings.update_poll_interval)
end

version
Expand Down Expand Up @@ -370,7 +375,7 @@ def parse_storage_profiles(vim, parser)
end

def save_inventory(persister)
saver.queue_save_inventory(persister)
saver.save_inventory(persister)
end

def log_header
Expand Down Expand Up @@ -398,18 +403,30 @@ def full_refresh_needed?
end

def full_refresh_interval
(Settings.ems_refresh["vmwarews"].try(:refresh_interval) || Settings.ems_refresh.refresh_interval).to_i_with_method
(refresh_settings.refresh_interval || Settings.ems_refresh.refresh_interval).to_i_with_method
end

def refresh_settings
Settings.ems_refresh.vmwarews
end

def cache_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Cache
end

def full_persister_klass
@full_persister_klass ||= ems.class::Inventory::Persister::Full
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Full
end

def targeted_persister_klass
@targeted_persister_klass ||= ems.class::Inventory::Persister::Targeted
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted
end

def parser_klass
@parser_klass ||= ems.class::Inventory::Parser
ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser
end

def saver_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
end
end
Original file line number Diff line number Diff line change
@@ -1,85 +1,7 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
include Vmdb::Logging

def initialize(threaded: true)
@join_limit = 30
@queue = Queue.new
@should_exit = Concurrent::AtomicBoolean.new
@threaded = threaded
@thread = nil
end

def start_thread
return unless threaded

@thread = Thread.new do
saver_thread
_log.info("Save inventory thread exiting")
end

_log.info("Save inventory thread started")
end

def stop_thread(wait: true)
return unless threaded

_log.info("Save inventory thread stopping...")

should_exit.make_true
queue.push(nil) # Force the blocking queue.pop call to return
join_thread if wait
end

# This method will re-start the saver thread if it has crashed or terminated
# prematurely, but is only safe to be called from a single thread. Given
# wait_for_updates has to be single threaded this should be fine but if you
# intend to queue up save_inventory from multiple calling threads a mutex
# must be added around ensure_saver_thread
def queue_save_inventory(persister)
_log.debug { "queueing save_inventory [#{persister.tracking_uuid}]" }

if threaded
ensure_saver_thread
queue.push(persister)
else
save_inventory(persister)
end
end

private

attr_reader :join_limit, :queue, :should_exit, :thread, :threaded

def saver_thread
until should_exit.true?
persister = queue.pop
next if persister.nil?

save_inventory(persister)
end
rescue => err
_log.warn(err)
_log.log_backtrace(err)
end

def join_thread
return unless thread&.alive?

unless thread.join(join_limit)
thread.kill
end
end

def ensure_saver_thread
return if thread&.alive?

_log.warn("Save inventory thread exited, restarting")
start_thread
end

def save_inventory(persister)
_log.debug { "running save_inventory [#{persister.tracking_uuid}]" }

save_inventory_start_time = Time.now.utc
persister.persist!
update_ems_refresh_stats(persister.manager)
Expand All @@ -93,6 +15,8 @@ def save_inventory(persister)
update_ems_refresh_stats(persister.manager, :error => err.to_s)
end

private

def update_ems_refresh_stats(ems, error: nil)
ems.update(:last_refresh_error => error, :last_refresh_date => Time.now.utc)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ def deliver_queue_message(msg)

attr_accessor :ems, :collector

def saver
@saver ||= ems.class::Inventory::Saver.new
end

def start_inventory_collector
self.collector = ems.class::Inventory::Collector.new(ems, saver)
self.collector = ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems)
collector.start
_log.info("Started inventory collector")
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
require 'VMwareWebService/MiqVim'
require 'http-access2' # Required in case it is not already loaded

module ManageIQ::Providers
module Vmware
class InfraManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
Expand All @@ -16,8 +13,7 @@ def refresh
raise NotImplementedError, "not implemented in production mode" if Rails.env.production?

ems_by_ems_id.each do |_ems_id, ems|
saver = ems.class::Inventory::Saver.new(:threaded => false)
collector = ems.class::Inventory::Collector.new(ems, saver)
collector = ems.class::Inventory::Collector.new(ems)
collector.refresh
end
end
Expand Down
3 changes: 3 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
:ems_refresh:
:vmware_cloud:
:get_public_images: false
:vmwarews:
:refresh_interval: 24.hours
:update_poll_interval: 1.second
:http_proxy:
:vmware_cloud:
:host:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
describe ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser do
let(:ems) { FactoryBot.create(:ems_vmware) }
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:persister) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted.new(ems) }
let(:parser) { described_class.new(collector, persister) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,31 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:category) do
require "vsphere-automation-cis"
VSphereAutomation::CIS::CisTaggingCategoryModel.new(
:id => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
:name => "Category1",
:description => "Description",
:cardinality => "SINGLE",
:used_by => []
)
end

let(:tag) do
require "vsphere-automation-cis"
VSphereAutomation::CIS::CisTaggingTagModel.new(
:id => "urn:vmomi:InventoryServiceTag:43b0c084-4e91-4950-8cc4-c81cb46b701f:GLOBAL",
:category_id => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
:name => "Tag1",
:description => "Tag Description",
:used_by => []
)
end

let!(:env_tag_mapping) { FactoryBot.create(:tag_mapping_with_category, :label_name => "Category1") }
let(:env_tag_mapping_category) { env_tag_mapping.tag.classification }

context "#monitor_updates" do
context "full refresh" do
Expand Down

0 comments on commit dbf88a2

Please sign in to comment.