Skip to content

Commit

Permalink
Merge dbf88a2 into 149b666
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Feb 2, 2022
2 parents 149b666 + dbf88a2 commit ba8d77d
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems, saver)
def initialize(ems)
@ems = ems
@exit_requested = false
@cache = ems.class::Inventory::Cache.new
@saver = saver
@cache = cache_klass.new
@saver = saver_klass.new
@vim_thread = nil
end

Expand Down Expand Up @@ -103,6 +103,11 @@ def targeted_refresh(vim, property_filter, version)

parse_updates(vim, parser, updated_objects)
save_inventory(persister)

# Prevent WaitForUpdatesEx from "spinning" in a tight loop if updates are
# constantly available. This allows for more updates to be batched together
# making for more efficient saving and reducing the API call load on the VC.
sleep(refresh_settings.update_poll_interval)
end

version
Expand Down Expand Up @@ -370,7 +375,7 @@ def parse_storage_profiles(vim, parser)
end

def save_inventory(persister)
saver.queue_save_inventory(persister)
saver.save_inventory(persister)
end

def log_header
Expand Down Expand Up @@ -398,18 +403,30 @@ def full_refresh_needed?
end

def full_refresh_interval
(Settings.ems_refresh["vmwarews"].try(:refresh_interval) || Settings.ems_refresh.refresh_interval).to_i_with_method
(refresh_settings.refresh_interval || Settings.ems_refresh.refresh_interval).to_i_with_method
end

def refresh_settings
Settings.ems_refresh.vmwarews
end

def cache_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Cache
end

def full_persister_klass
@full_persister_klass ||= ems.class::Inventory::Persister::Full
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Full
end

def targeted_persister_klass
@targeted_persister_klass ||= ems.class::Inventory::Persister::Targeted
ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted
end

def parser_klass
@parser_klass ||= ems.class::Inventory::Parser
ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser
end

def saver_klass
ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
end
end
Original file line number Diff line number Diff line change
@@ -1,85 +1,7 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
include Vmdb::Logging

def initialize(threaded: true)
@join_limit = 30
@queue = Queue.new
@should_exit = Concurrent::AtomicBoolean.new
@threaded = threaded
@thread = nil
end

def start_thread
return unless threaded

@thread = Thread.new do
saver_thread
_log.info("Save inventory thread exiting")
end

_log.info("Save inventory thread started")
end

def stop_thread(wait: true)
return unless threaded

_log.info("Save inventory thread stopping...")

should_exit.make_true
queue.push(nil) # Force the blocking queue.pop call to return
join_thread if wait
end

# This method will re-start the saver thread if it has crashed or terminated
# prematurely, but is only safe to be called from a single thread. Given
# wait_for_updates has to be single threaded this should be fine but if you
# intend to queue up save_inventory from multiple calling threads a mutex
# must be added around ensure_saver_thread
def queue_save_inventory(persister)
_log.debug { "queueing save_inventory [#{persister.tracking_uuid}]" }

if threaded
ensure_saver_thread
queue.push(persister)
else
save_inventory(persister)
end
end

private

attr_reader :join_limit, :queue, :should_exit, :thread, :threaded

def saver_thread
until should_exit.true?
persister = queue.pop
next if persister.nil?

save_inventory(persister)
end
rescue => err
_log.warn(err)
_log.log_backtrace(err)
end

def join_thread
return unless thread&.alive?

unless thread.join(join_limit)
thread.kill
end
end

def ensure_saver_thread
return if thread&.alive?

_log.warn("Save inventory thread exited, restarting")
start_thread
end

def save_inventory(persister)
_log.debug { "running save_inventory [#{persister.tracking_uuid}]" }

save_inventory_start_time = Time.now.utc
persister.persist!
update_ems_refresh_stats(persister.manager)
Expand All @@ -93,6 +15,8 @@ def save_inventory(persister)
update_ems_refresh_stats(persister.manager, :error => err.to_s)
end

private

def update_ems_refresh_stats(ems, error: nil)
ems.update(:last_refresh_error => error, :last_refresh_date => Time.now.utc)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ def deliver_queue_message(msg)

attr_accessor :ems, :collector

def saver
@saver ||= ems.class::Inventory::Saver.new
end

def start_inventory_collector
self.collector = ems.class::Inventory::Collector.new(ems, saver)
self.collector = ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems)
collector.start
_log.info("Started inventory collector")
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
require 'VMwareWebService/MiqVim'
require 'http-access2' # Required in case it is not already loaded

module ManageIQ::Providers
module Vmware
class InfraManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
Expand All @@ -16,8 +13,7 @@ def refresh
raise NotImplementedError, "not implemented in production mode" if Rails.env.production?

ems_by_ems_id.each do |_ems_id, ems|
saver = ems.class::Inventory::Saver.new(:threaded => false)
collector = ems.class::Inventory::Collector.new(ems, saver)
collector = ems.class::Inventory::Collector.new(ems)
collector.refresh
end
end
Expand Down
3 changes: 3 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
:ems_refresh:
:vmware_cloud:
:get_public_images: false
:vmwarews:
:refresh_interval: 24.hours
:update_poll_interval: 1.second
:http_proxy:
:vmware_cloud:
:host:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
describe ManageIQ::Providers::Vmware::InfraManager::Inventory::Parser do
let(:ems) { FactoryBot.create(:ems_vmware) }
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:persister) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Persister::Targeted.new(ems) }
let(:parser) { described_class.new(collector, persister) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,31 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:saver) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
let(:category) do
require "vsphere-automation-cis"
VSphereAutomation::CIS::CisTaggingCategoryModel.new(
:id => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
:name => "Category1",
:description => "Description",
:cardinality => "SINGLE",
:used_by => []
)
end

let(:tag) do
require "vsphere-automation-cis"
VSphereAutomation::CIS::CisTaggingTagModel.new(
:id => "urn:vmomi:InventoryServiceTag:43b0c084-4e91-4950-8cc4-c81cb46b701f:GLOBAL",
:category_id => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
:name => "Tag1",
:description => "Tag Description",
:used_by => []
)
end

let!(:env_tag_mapping) { FactoryBot.create(:tag_mapping_with_category, :label_name => "Category1") }
let(:env_tag_mapping_category) { env_tag_mapping.tag.classification }

context "#monitor_updates" do
context "full refresh" do
Expand Down

0 comments on commit ba8d77d

Please sign in to comment.