Skip to content

Commit

Permalink
Merge a81227e into 542e2f0
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare authored Jan 24, 2022
2 parents 542e2f0 + a81227e commit a1c7e44
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems, saver)
def initialize(ems)
@ems = ems
@exit_requested = false
@cache = ems.class::Inventory::Cache.new
@saver = saver
@cache = cache_klass.new
@saver = saver_klass.new
@vim_thread = nil
end

Expand Down Expand Up @@ -96,13 +96,19 @@ def full_refresh(vim, property_filter)
end

def targeted_refresh(vim, property_filter, version)
persister = targeted_persister_klass.new(ems)
parser = parser_klass.new(self, persister)

version, updated_objects = monitor_updates(vim, property_filter, version)
if updated_objects.any?
persister = targeted_persister_klass.new(ems)
parser = parser_klass.new(self, persister)

parse_updates(vim, parser, updated_objects)
save_inventory(persister)
parse_updates(vim, parser, updated_objects)
save_inventory(persister)

# Prevent WaitForUpdatesEx from "spinning" in a tight loop if updates are
# constantly available. This allows for more updates to be batched together
# making for more efficient saving and reducing the API call load on the VC.
sleep(refresh_settings.update_poll_interval)
end

version
end
Expand Down Expand Up @@ -369,7 +375,7 @@ def parse_storage_profiles(vim, parser)
end

def save_inventory(persister)
saver.queue_save_inventory(persister)
saver.save_inventory(persister)
end

def log_header
Expand Down Expand Up @@ -397,18 +403,30 @@ def full_refresh_needed?
end

def full_refresh_interval
(Settings.ems_refresh["vmwarews"].try(:refresh_interval) || Settings.ems_refresh.refresh_interval).to_i_with_method
(refresh_settings.refresh_interval || Settings.ems_refresh.refresh_interval).to_i_with_method
end

def refresh_settings
Settings.ems_refresh.vmwarews
end

def cache_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Cache
end

def full_persister_klass
@full_persister_klass ||= ems.class::Inventory::Persister::Full
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Full
end

def targeted_persister_klass
@targeted_persister_klass ||= ems.class::Inventory::Persister::Targeted
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted
end

def parser_klass
@parser_klass ||= ems.class::Inventory::Parser
ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser
end

def saver_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister < ManageIQ
require_nested :Full
require_nested :Targeted

attr_reader :tracking_uuid

def initialize_inventory_collections
# Build a UUID which can be used to track the collection and saving of this persister instance
@tracking_uuid = SecureRandom.uuid

add_collection(infra, :customization_specs)
add_collection(infra, :disks, :parent_inventory_collections => %i[vms_and_templates])
add_collection(infra, :distributed_virtual_switches)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,80 +1,6 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
include Vmdb::Logging

def initialize(threaded: true)
@join_limit = 30
@queue = Queue.new
@should_exit = Concurrent::AtomicBoolean.new
@threaded = threaded
@thread = nil
end

def start_thread
return unless threaded

@thread = Thread.new do
saver_thread
_log.info("Save inventory thread exiting")
end

_log.info("Save inventory thread started")
end

def stop_thread(wait: true)
return unless threaded

_log.info("Save inventory thread stopping...")

should_exit.make_true
queue.push(nil) # Force the blocking queue.pop call to return
join_thread if wait
end

# This method will re-start the saver thread if it has crashed or terminated
# prematurely, but is only safe to be called from a single thread. Given
# wait_for_updates has to be single threaded this should be fine but if you
# intend to queue up save_inventory from multiple calling threads a mutex
# must be added around ensure_saver_thread
def queue_save_inventory(persister)
if threaded
ensure_saver_thread
queue.push(persister)
else
save_inventory(persister)
end
end

private

attr_reader :join_limit, :queue, :should_exit, :thread, :threaded

def saver_thread
until should_exit.true?
persister = queue.pop
next if persister.nil?

save_inventory(persister)
end
rescue => err
_log.warn(err)
_log.log_backtrace(err)
end

def join_thread
return unless thread&.alive?

unless thread.join(join_limit)
thread.kill
end
end

def ensure_saver_thread
return if thread&.alive?

_log.warn("Save inventory thread exited, restarting")
start_thread
end

def save_inventory(persister)
save_inventory_start_time = Time.now.utc
persister.persist!
Expand All @@ -89,6 +15,8 @@ def save_inventory(persister)
update_ems_refresh_stats(persister.manager, :error => err.to_s)
end

private

def update_ems_refresh_stats(ems, error: nil)
ems.update(:last_refresh_error => error, :last_refresh_date => Time.now.utc)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ def deliver_queue_message(msg)

attr_accessor :ems, :collector

def saver
@saver ||= ems.class::Inventory::Saver.new
end

def start_inventory_collector
self.collector = ems.class::Inventory::Collector.new(ems, saver)
self.collector = ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems)
collector.start
_log.info("Started inventory collector")
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
require 'VMwareWebService/MiqVim'
require 'http-access2' # Required in case it is not already loaded

module ManageIQ::Providers
module Vmware
class InfraManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
Expand All @@ -16,8 +13,7 @@ def refresh
raise NotImplementedError, "not implemented in production mode" if Rails.env.production?

ems_by_ems_id.each do |_ems_id, ems|
saver = ems.class::Inventory::Saver.new(:threaded => false)
collector = ems.class::Inventory::Collector.new(ems, saver)
collector = ems.class::Inventory::Collector.new(ems)
collector.refresh
end
end
Expand Down
3 changes: 3 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
:ems_refresh:
:vmware_cloud:
:get_public_images: false
:vmwarews:
:refresh_interval: 24.hours
:update_poll_interval: 1.second
:http_proxy:
:vmware_cloud:
:host:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
describe ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser do
let(:ems) { FactoryBot.create(:ems_vmware) }
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:persister) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted.new(ems) }
let(:parser) { described_class.new(collector, persister) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:category) do
require "vsphere-automation-cis"
VSphereAutomation::CIS::CisTaggingCategoryModel.new(
Expand Down

0 comments on commit a1c7e44

Please sign in to comment.