Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up #765

Merged
Fryguy merged 7 commits into ManageIQ:master from the add_threaded_saver_setting branch on Dec 15, 2021

agrare
Member

@agrare agrare commented Dec 2, 2021

With separate collector and saver threads, collection can outpace saving, which causes inventory saves to be badly delayed.
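A rough back-of-the-envelope sketch of that backlog (not code from this PR). The 150 ms collection cadence matches the figure reported from the logs further down in this conversation; the 500 ms save time and the `queue_depth_after` helper are made-up assumptions for illustration only:

```ruby
# Hypothetical helper: estimates how many persisters are still queued after
# `seconds` when the collector enqueues one every `enqueue_every_ms` and the
# saver needs `save_takes_ms` per persister. Integer milliseconds keep the
# arithmetic exact.
def queue_depth_after(seconds:, enqueue_every_ms:, save_takes_ms:)
  elapsed_ms = seconds * 1000
  enqueued   = elapsed_ms / enqueue_every_ms
  saved      = [elapsed_ms / save_takes_ms, enqueued].min
  enqueued - saved
end

# Assumed numbers: an update every 150 ms, each save taking 500 ms.
backlog = queue_depth_after(seconds: 60, enqueue_every_ms: 150, save_takes_ms: 500)
puts backlog # => 280 persisters still waiting after one minute
```

With those assumed numbers the queue never drains, so every new update waits behind an ever-growing backlog.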

The WaitForUpdatesEx method takes a version and returns the updates that have happened since that version, so no updates are missed while a save is in progress. Saving in the same thread also means each save_inventory call processes more updates, which is more efficient.
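A minimal sketch of why a version-based feed does not lose updates while the caller is busy. `FakeUpdateFeed` is a hypothetical in-memory stand-in, not the vSphere API: it uses a plain integer as the version and does not block, neither of which is meant to model the real call.

```ruby
# Hypothetical stand-in for a versioned update feed.
class FakeUpdateFeed
  def initialize
    @changes = [] # the change at index i moved the feed to version i + 1
  end

  def publish(change)
    @changes << change
  end

  # Returns [current_version, changes recorded after `version`].
  def wait_for_updates(version)
    [@changes.size, @changes.drop(version)]
  end
end

feed = FakeUpdateFeed.new
feed.publish("vm-1 powered on")
version, first_batch = feed.wait_for_updates(0)

# Three changes land while the caller is busy in save_inventory.
feed.publish("vm-2 renamed")
feed.publish("vm-3 migrated")
feed.publish("vm-1 powered off")
version, second_batch = feed.wait_for_updates(version)

puts first_batch.size  # => 1
puts second_batch.size # => 3, nothing missed and one save covers all three
puts version           # => 4
```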

#745

@agrare agrare requested a review from Fryguy as a code owner December 2, 2021 20:20
@Fryguy
Member

Fryguy commented Dec 3, 2021

What would be the motivation for a user to change this option back to true?

@agrare
Member Author

agrare commented Dec 3, 2021

🤷 Just being cautious. If we drop the option, this can be simplified quite a bit.

@@ -435,4 +434,8 @@ def targeted_persister_klass
  def parser_klass
    @parser_klass ||= ems.class::Inventory::Parser
  end

  def saver_klass
    @saver_klass ||= self.class.module_parent::Saver
Member Author

Not sure why I cached these class names... I wonder how much time this actually saves.

Member Author

Was just trying to be "cute" and save some columns, but they're just hard-coded constants.

@agrare agrare added the bug label Dec 3, 2021
@Fryguy
Member

Fryguy commented Dec 3, 2021

Spoke with @agrare, and we decided to go a different route.

The long-term goal is that the saver doesn't live in this worker at all. Instead, this worker would post updates to Kafka, and a separate saver worker would write those changes to the database. That is part of the reason we introduced the saver thread in the first place: to emulate what would happen in that world and to make the later split easier, since the code is already split apart.

Serializing the saver with the collector does cause the collector to produce less frequent, and therefore chunkier, updates. But a saver bogged down by lots of tiny updates is exactly what would happen in that future world, and removing the saver thread here just ignores that future problem.

Instead, we're going to try to solve it more generally using a poll sleep. If the general problem is that the collector thread is creating updates that are too frequent and too tiny, then the fix is to slow it down. This can be accomplished with a simpler poll sleep and would help the current saver-thread problem as well as the future problem.

Based on the logs we saw, WaitForUpdates is returning roughly every 0.15s, which is overly fast. If we introduce a poll sleep of 1s, users probably won't even notice, and the queue depth would drop by roughly 10x. This may also have the side effect of putting a little less pressure on the VC itself.
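As a rough sanity check on that estimate, here is a small sketch of the arithmetic (not code from this PR). `batches_in_window` is a hypothetical helper, and it assumes every WaitForUpdates call returns after a fixed 150 ms and that the sleep is simply added on top; real timings will vary.

```ruby
# Hypothetical helper: number of WaitForUpdates round trips (and therefore
# save batches) that fit in a window, assuming each call returns after
# `wait_ms` and the collector then sleeps for `poll_sleep_ms`.
def batches_in_window(window_ms:, wait_ms:, poll_sleep_ms: 0)
  window_ms / (wait_ms + poll_sleep_ms)
end

tight_loop = batches_in_window(window_ms: 60_000, wait_ms: 150)
with_sleep = batches_in_window(window_ms: 60_000, wait_ms: 150, poll_sleep_ms: 1_000)
ratio      = (tight_loop.to_f / with_sleep).round(1)

puts tight_loop # => 400 batches per minute
puts with_sleep # => 52 batches per minute
puts ratio      # => 7.7
```

Under these assumptions the number of batches drops by a bit under 8x, the same order of magnitude as the rough 10x figure above.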

@Fryguy
Member

Fryguy commented Dec 6, 2021

Can you change the title to better reflect what we are doing now?

@agrare agrare changed the title Dec 6, 2021
from: Add a setting to enable/disable saver thread
to: Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up
@miq-bot
Member

miq-bot commented Dec 6, 2021

Checked commits agrare/manageiq-providers-vmware@83605cd~...1e79eb3 with ruby 2.6.3, rubocop 1.13.0, haml-lint 0.35.0, and yamllint
6 files checked, 0 offenses detected
Everything looks fine. 🍪

@Fryguy Fryguy merged commit a37bc28 into ManageIQ:master Dec 15, 2021
@agrare agrare deleted the add_threaded_saver_setting branch December 15, 2021 20:53
@lamm

lamm commented Jan 12, 2022

@agrare: Will this fix be available in the morphy release?

@agrare
Member Author

agrare commented Jan 12, 2022

👍 Good call @lamm, I labeled this morphy/yes? and we'll review it at the next community GitHub issue triage meeting.

@Fryguy
Member

Fryguy commented Jan 24, 2022

@agrare A conflict occurred during the backport of this pull request to morphy.

If this pull request is based on another pull request that has not been marked for backport, add the appropriate labels to the other pull request. Otherwise, please create a new pull request direct to the morphy branch in order to resolve this.

Conflict details:

diff --cc app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
index 5c299b24,d9ecda61..00000000
--- a/app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
+++ b/app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
@@@ -96,13 -96,19 +96,24 @@@ class ManageIQ::Providers::Vmware::Infr
    end
  
    def targeted_refresh(vim, property_filter, version)
 +    persister = targeted_persister_klass.new(ems)
 +    parser    = parser_klass.new(self, persister)
 +
      version, updated_objects = monitor_updates(vim, property_filter, version)
 -    if updated_objects.any?
 -      persister = targeted_persister_klass.new(ems)
 -      parser    = parser_klass.new(self, persister)
  
++<<<<<<< HEAD
 +    parse_updates(vim, parser, updated_objects)
 +    save_inventory(persister)
++=======
+       parse_updates(vim, parser, updated_objects)
+       save_inventory(persister)
+ 
+       # Prevent WaitForUpdatesEx from "spinning" in a tight loop if updates are
+       # constantly available.  This allows for more updates to be batched together
+       # making for more efficient saving and reducing the API call load on the VC.
+       sleep(refresh_settings.update_poll_interval)
+     end
++>>>>>>> a37bc28b... Merge pull request #765 from agrare/add_threaded_saver_setting
  
      version
    end
diff --cc app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
index 327c522c,b7be0024..00000000
--- a/app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
+++ b/app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
@@@ -1,80 -1,6 +1,83 @@@
  class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
    include Vmdb::Logging
  
++<<<<<<< HEAD
 +  def initialize(threaded: true)
 +    @join_limit  = 30
 +    @queue       = Queue.new
 +    @should_exit = Concurrent::AtomicBoolean.new
 +    @threaded    = threaded
 +    @thread      = nil
 +  end
 +
 +  def start_thread
 +    return unless threaded
 +
 +    @thread = Thread.new do
 +      saver_thread
 +      _log.info("Save inventory thread exiting")
 +    end
 +
 +    _log.info("Save inventory thread started")
 +  end
 +
 +  def stop_thread(wait: true)
 +    return unless threaded
 +
 +    _log.info("Save inventory thread stopping...")
 +
 +    should_exit.make_true
 +    queue.push(nil) # Force the blocking queue.pop call to return
 +    join_thread if wait
 +  end
 +
 +  # This method will re-start the saver thread if it has crashed or terminated
 +  # prematurely, but is only safe to be called from a single thread.  Given
 +  # wait_for_updates has to be single threaded this should be fine but if you
 +  # intend to queue up save_inventory from multiple calling threads a mutex
 +  # must be added around ensure_saver_thread
 +  def queue_save_inventory(persister)
 +    if threaded
 +      ensure_saver_thread
 +      queue.push(persister)
 +    else
 +      save_inventory(persister)
 +    end
 +  end
 +
 +  private
 +
 +  attr_reader :join_limit, :queue, :should_exit, :thread, :threaded
 +
 +  def saver_thread
 +    until should_exit.true?
 +      persister = queue.pop
 +      next if persister.nil?
 +
 +      save_inventory(persister)
 +    end
 +  rescue => err
 +    _log.warn(err)
 +    _log.log_backtrace(err)
 +  end
 +
 +  def join_thread
 +    return unless thread&.alive?
 +
 +    unless thread.join(join_limit)
 +      thread.kill
 +    end
 +  end
 +
 +  def ensure_saver_thread
 +    return if thread&.alive?
 +
 +    _log.warn("Save inventory thread exited, restarting")
 +    start_thread
 +  end
 +
++=======
++>>>>>>> a37bc28b... Merge pull request #765 from agrare/add_threaded_saver_setting
    def save_inventory(persister)
      save_inventory_start_time = Time.now.utc
      persister.persist!

agrare pushed a commit to agrare/manageiq-providers-vmware that referenced this pull request Jan 24, 2022
Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up

(cherry picked from commit a37bc28)
@agrare
Member Author

agrare commented Jan 24, 2022

Morphy backport PR #782

@Fryguy
Member

Fryguy commented Jan 24, 2022

Backport depends on #753 and #754

@Fryguy
Member

Fryguy commented Jan 24, 2022

Backported to morphy in commit ec93d9f.

commit ec93d9f455210949a8094414bcec48d518c15c96
Author: Jason Frey <fryguy9@gmail.com>
Date:   Wed Dec 15 15:52:33 2021 -0500

    Merge pull request #765 from agrare/add_threaded_saver_setting
    
    Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up
    
    (cherry picked from commit a37bc28bfdfe211caea99f4ae39ff3b7af0e254c)

Fryguy added a commit that referenced this pull request Jan 24, 2022
Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up

(cherry picked from commit a37bc28)
@Fryguy
Member

Fryguy commented Feb 2, 2022

@agrare A conflict occurred during the backport of this pull request to lasker.

If this pull request is based on another pull request that has not been marked for backport, add the appropriate labels to the other pull request. Otherwise, please create a new pull request direct to the lasker branch in order to resolve this.

Conflict details:

diff --cc app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
index 5c299b24,d9ecda61..00000000
--- a/app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
+++ b/app/models/manageiq/providers/vmware/infra_manager/inventory/collector.rb
@@@ -96,13 -96,19 +96,24 @@@ class ManageIQ::Providers::Vmware::Infr
    end
  
    def targeted_refresh(vim, property_filter, version)
 +    persister = targeted_persister_klass.new(ems)
 +    parser    = parser_klass.new(self, persister)
 +
++<<<<<<< HEAD
      version, updated_objects = monitor_updates(vim, property_filter, version)
 -    if updated_objects.any?
 -      persister = targeted_persister_klass.new(ems)
 -      parser    = parser_klass.new(self, persister)
  
 +    parse_updates(vim, parser, updated_objects)
 +    save_inventory(persister)
++=======
+       parse_updates(vim, parser, updated_objects)
+       save_inventory(persister)
+ 
+       # Prevent WaitForUpdatesEx from "spinning" in a tight loop if updates are
+       # constantly available.  This allows for more updates to be batched together
+       # making for more efficient saving and reducing the API call load on the VC.
+       sleep(refresh_settings.update_poll_interval)
+     end
++>>>>>>> a37bc28b (Merge pull request #765 from agrare/add_threaded_saver_setting)
  
      version
    end
diff --cc app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
index 327c522c,b7be0024..00000000
--- a/app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
+++ b/app/models/manageiq/providers/vmware/infra_manager/inventory/saver.rb
@@@ -1,80 -1,6 +1,83 @@@
  class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
    include Vmdb::Logging
  
++<<<<<<< HEAD
 +  def initialize(threaded: true)
 +    @join_limit  = 30
 +    @queue       = Queue.new
 +    @should_exit = Concurrent::AtomicBoolean.new
 +    @threaded    = threaded
 +    @thread      = nil
 +  end
 +
 +  def start_thread
 +    return unless threaded
 +
 +    @thread = Thread.new do
 +      saver_thread
 +      _log.info("Save inventory thread exiting")
 +    end
 +
 +    _log.info("Save inventory thread started")
 +  end
 +
 +  def stop_thread(wait: true)
 +    return unless threaded
 +
 +    _log.info("Save inventory thread stopping...")
 +
 +    should_exit.make_true
 +    queue.push(nil) # Force the blocking queue.pop call to return
 +    join_thread if wait
 +  end
 +
 +  # This method will re-start the saver thread if it has crashed or terminated
 +  # prematurely, but is only safe to be called from a single thread.  Given
 +  # wait_for_updates has to be single threaded this should be fine but if you
 +  # intend to queue up save_inventory from multiple calling threads a mutex
 +  # must be added around ensure_saver_thread
 +  def queue_save_inventory(persister)
 +    if threaded
 +      ensure_saver_thread
 +      queue.push(persister)
 +    else
 +      save_inventory(persister)
 +    end
 +  end
 +
 +  private
 +
 +  attr_reader :join_limit, :queue, :should_exit, :thread, :threaded
 +
 +  def saver_thread
 +    until should_exit.true?
 +      persister = queue.pop
 +      next if persister.nil?
 +
 +      save_inventory(persister)
 +    end
 +  rescue => err
 +    _log.warn(err)
 +    _log.log_backtrace(err)
 +  end
 +
 +  def join_thread
 +    return unless thread&.alive?
 +
 +    unless thread.join(join_limit)
 +      thread.kill
 +    end
 +  end
 +
 +  def ensure_saver_thread
 +    return if thread&.alive?
 +
 +    _log.warn("Save inventory thread exited, restarting")
 +    start_thread
 +  end
 +
++=======
++>>>>>>> a37bc28b (Merge pull request #765 from agrare/add_threaded_saver_setting)
    def save_inventory(persister)
      save_inventory_start_time = Time.now.utc
      persister.persist!
diff --cc spec/models/manageiq/providers/vmware/infra_manager/refresher_spec.rb
index c5511b26,945aaaf7..00000000
--- a/spec/models/manageiq/providers/vmware/infra_manager/refresher_spec.rb
+++ b/spec/models/manageiq/providers/vmware/infra_manager/refresher_spec.rb
@@@ -16,8 -16,31 +16,36 @@@ describe ManageIQ::Providers::Vmware::I
        ems.update_authentication(:default => {:userid => username, :password => password})
      end
    end
++<<<<<<< HEAD
 +  let(:saver)     { ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver.new(:threaded => false) }
 +  let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems, saver) }
++=======
+   let(:collector) { ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector.new(ems) }
+   let(:category) do
+     require "vsphere-automation-cis"
+     VSphereAutomation::CIS::CisTaggingCategoryModel.new(
+       :id          => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
+       :name        => "Category1",
+       :description => "Description",
+       :cardinality => "SINGLE",
+       :used_by     => []
+     )
+   end
+ 
+   let(:tag) do
+     require "vsphere-automation-cis"
+     VSphereAutomation::CIS::CisTaggingTagModel.new(
+       :id          => "urn:vmomi:InventoryServiceTag:43b0c084-4e91-4950-8cc4-c81cb46b701f:GLOBAL",
+       :category_id => "urn:vmomi:InventoryServiceCategory:aece75c1-0157-498c-b7d9-43e0532ddce8:GLOBAL",
+       :name        => "Tag1",
+       :description => "Tag Description",
+       :used_by     => []
+     )
+   end
+ 
+   let!(:env_tag_mapping)         { FactoryBot.create(:tag_mapping_with_category, :label_name => "Category1") }
+   let(:env_tag_mapping_category) { env_tag_mapping.tag.classification }
++>>>>>>> a37bc28b (Merge pull request #765 from agrare/add_threaded_saver_setting)
  
    context "#monitor_updates" do
      context "full refresh" do

@Fryguy
Member

Fryguy commented Feb 2, 2022

@agrare Does a lasker backport also depend on #753 and #754?

agrare pushed a commit to agrare/manageiq-providers-vmware that referenced this pull request Feb 2, 2022
Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up

(cherry picked from commit a37bc28)
@agrare
Member Author

agrare commented Feb 2, 2022

@Fryguy Yes, it does (I thought they already were, since they were in morphy). There is another conflict in a spec that I had to resolve manually, but it's a really simple one.

agrare pushed a commit to agrare/manageiq-providers-vmware that referenced this pull request Feb 2, 2022
Delay WaitForUpdates and save in one thread to prevent "update storm" causing saver thread to be backed up

(cherry picked from commit a37bc28)
@Fryguy
Member

Fryguy commented Feb 2, 2022

Backported to lasker in #787
