diff --git a/app/controllers/agents_controller.rb b/app/controllers/agents_controller.rb index 686a64c306..991ec29ff3 100644 --- a/app/controllers/agents_controller.rb +++ b/app/controllers/agents_controller.rb @@ -4,7 +4,8 @@ class AgentsController < ApplicationController include SortableTable def index - set_table_sort sorts: %w[name created_at last_check_at last_event_at last_receive_at], default: { created_at: :desc } + set_table_sort sorts: %w[name created_at last_check_at last_event_at last_receive_at], + default: { created_at: :desc } @agents = current_user.agents.preload(:scenarios, :controllers).reorder(table_sort).page(params[:page]) @@ -31,7 +32,7 @@ def toggle_visibility def handle_details_post @agent = current_user.agents.find(params[:id]) if @agent.respond_to?(:handle_details_post) - render :json => @agent.handle_details_post(params) || {} + render json: @agent.handle_details_post(params) || {} else @agent.error "#handle_details_post called on an instance of #{@agent.class} that does not define it." head 500 @@ -53,16 +54,16 @@ def type_details initialize_presenter render json: { - can_be_scheduled: @agent.can_be_scheduled?, - default_schedule: @agent.default_schedule, - can_receive_events: @agent.can_receive_events?, - can_create_events: @agent.can_create_events?, - can_control_other_agents: @agent.can_control_other_agents?, - can_dry_run: @agent.can_dry_run?, - options: @agent.default_options, - description_html: @agent.html_description, - oauthable: render_to_string(partial: 'oauth_dropdown', locals: { agent: @agent }), - form_options: render_to_string(partial: 'options', locals: { agent: @agent }) + can_be_scheduled: @agent.can_be_scheduled?, + default_schedule: @agent.default_schedule, + can_receive_events: @agent.can_receive_events?, + can_create_events: @agent.can_create_events?, + can_control_other_agents: @agent.can_control_other_agents?, + can_dry_run: @agent.can_dry_run?, + options: @agent.default_options, + description_html: @agent.html_description, + oauthable: render_to_string(partial: 'oauth_dropdown', locals: { agent: @agent }), + form_options: render_to_string(partial: 'options', locals: { agent: @agent }) } end @@ -71,8 +72,8 @@ def event_descriptions agents.map(&:html_event_description).uniq.map { |desc| "

#{type}
" + desc + "

" } - }.flatten.join() - render :json => { :description_html => html } + }.flatten.join + render json: { description_html: html } end def reemit_events @@ -101,7 +102,9 @@ def propagate respond_to do |format| if AgentPropagateJob.can_enqueue? details = Agent.receive! # Eventually this should probably be scoped to the current_user. - format.html { redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)" } + format.html { + redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)" + } format.json { head :ok } else format.html { redirect_back "Event propagation is already scheduled to run." } @@ -111,8 +114,10 @@ def propagate end def destroy_memory - @agent = current_user.agents.find(params[:id]) - @agent.update!(memory: {}) + Agent.transaction do + @agent = current_user.agents.lock.find(params[:id]) + @agent.update!(memory: {}) + end respond_to do |format| format.html { redirect_back "Memory erased for '#{@agent.name}'" } @@ -132,11 +137,12 @@ def show def new agents = current_user.agents - if id = params[:id] - @agent = agents.build_clone(agents.find(id)) - else - @agent = agents.build - end + @agent = + if id = params[:id] + agents.build_clone(agents.find(id)) + else + agents.build + end @agent.scenario_ids = [params[:scenario_id]] if params[:scenario_id] && current_user.scenarios.find_by(id: params[:scenario_id]) @@ -169,10 +175,13 @@ def create end def update - @agent = current_user.agents.find(params[:id]) + saved = Agent.transaction do + @agent = current_user.agents.lock.find(params[:id]) + @agent.update(agent_params) + end respond_to do |format| - if @agent.update(agent_params) + if saved format.html { redirect_back "'#{@agent.name}' was successfully updated.", return: agents_path } format.json { render json: @agent, status: :ok, location: agent_path(@agent) } else @@ -184,9 +193,11 @@ def update end def leave_scenario - @agent = current_user.agents.find(params[:id]) - @scenario = current_user.scenarios.find(params[:scenario_id]) - @agent.scenarios.destroy(@scenario) + Agent.transaction do + @agent = current_user.agents.lock.find(params[:id]) + @scenario = current_user.scenarios.find(params[:scenario_id]) + @agent.scenarios.destroy(@scenario) + end respond_to do |format| format.html { redirect_back "'#{@agent.name}' removed from '#{@scenario.name}'" } @@ -195,8 +206,10 @@ def leave_scenario end def destroy - @agent = current_user.agents.find(params[:id]) - @agent.destroy + Agent.transaction do + @agent = current_user.agents.lock.find(params[:id]) + @agent.destroy + end respond_to do |format| format.html { redirect_back "'#{@agent.name}' deleted" } @@ -250,6 +263,7 @@ def initialize_presenter end private + def show_only_enabled_agents? !!cookies[:huginn_view_only_enabled_agents] end diff --git a/app/jobs/agent_check_job.rb b/app/jobs/agent_check_job.rb index 0603bdd84e..ae9337d740 100644 --- a/app/jobs/agent_check_job.rb +++ b/app/jobs/agent_check_job.rb @@ -1,15 +1,22 @@ class AgentCheckJob < ActiveJob::Base # Given an Agent id, load the Agent, call #check on it, and then save it with an updated `last_check_at` timestamp. def perform(agent_id) - agent = Agent.find(agent_id) - begin - return if agent.unavailable? - agent.check - agent.last_check_at = Time.now - agent.save! - rescue => e - agent.error "Exception during check. #{e.message}: #{e.backtrace.join("\n")}" - raise + error = nil + + ActiveRecord::Base.transaction do + ActiveRecord::Base.transaction(requires_new: true) do + agent = Agent.lock.find_by(id: agent_id) + return if !agent || agent.unavailable? + + agent.check + agent.last_check_at = Time.now + agent.save! + rescue StandardError => e + agent&.error "Exception during check. #{e.message}: #{e.backtrace.join("\n")}" + error = e + end end + + raise error if error end -end \ No newline at end of file +end diff --git a/app/jobs/agent_receive_job.rb b/app/jobs/agent_receive_job.rb index 385c3b8c76..20088a5c65 100644 --- a/app/jobs/agent_receive_job.rb +++ b/app/jobs/agent_receive_job.rb @@ -2,15 +2,22 @@ class AgentReceiveJob < ActiveJob::Base # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then # save it with an updated `last_receive_at` timestamp. def perform(agent_id, event_ids) - agent = Agent.find(agent_id) - begin - return if agent.unavailable? - agent.receive(Event.where(:id => event_ids).order(:id)) - agent.last_receive_at = Time.now - agent.save! - rescue => e - agent.error "Exception during receive. #{e.message}: #{e.backtrace.join("\n")}" - raise + error = nil + + ActiveRecord::Base.transaction do + ActiveRecord::Base.transaction(requires_new: true) do + agent = Agent.lock.find_by(id: agent_id) + return if !agent || agent.unavailable? + + agent.receive(Event.where(id: event_ids).order(:id)) + agent.last_receive_at = Time.now + agent.save! + rescue StandardError => e + agent&.error "Exception during receive. #{e.message}: #{e.backtrace.join("\n")}" + error = e + end end + + raise error if error end -end \ No newline at end of file +end diff --git a/app/models/agent.rb b/app/models/agent.rb index d39062fe3a..d923b8c66a 100644 --- a/app/models/agent.rb +++ b/app/models/agent.rb @@ -150,7 +150,7 @@ def credential(name) end end - def reload + def reload(...) @credential_cache = {} super end @@ -424,7 +424,7 @@ def receive!(options = {}) agents_to_events[receiver_agent_id.to_i] << event_id end - Agent.where(id: agents_to_events.keys).each do |agent| + Agent.lock.where(id: agents_to_events.keys).each do |agent| event_ids = agents_to_events[agent.id].uniq agent.update_attribute :last_checked_event_id, event_ids.max diff --git a/app/models/service.rb b/app/models/service.rb index 48bfb8b83a..b65f0fe9bc 100644 --- a/app/models/service.rb +++ b/app/models/service.rb @@ -12,10 +12,12 @@ class Service < ActiveRecord::Base scope :by_name, lambda { |dir = 'desc'| order("services.name #{dir}") } def disable_agents(conditions = {}) - agents.where.not(conditions[:where_not] || {}).each do |agent| - agent.service_id = nil - agent.disabled = true - agent.save!(validate: false) + Agent.transaction do + agents.lock.where.not(conditions[:where_not] || {}).each do |agent| + agent.service_id = nil + agent.disabled = true + agent.save!(validate: false) + end end end diff --git a/lib/huginn_scheduler.rb b/lib/huginn_scheduler.rb index 33e533e548..dc24a2dbce 100644 --- a/lib/huginn_scheduler.rb +++ b/lib/huginn_scheduler.rb @@ -49,6 +49,7 @@ def schedule_scheduler_agent(agent) else if job return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i + puts "Rescheduling SchedulerAgent##{agent.id}" job.unschedule else @@ -82,7 +83,9 @@ def schedule_scheduler_agent(agent) # orphaned jobs if any. def schedule_scheduler_agents scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent| - schedule_scheduler_agent(scheduler_agent) + scheduler_agent.with_lock do + schedule_scheduler_agent(scheduler_agent) + end }.compact (scheduler_agent_jobs - scheduled_jobs).each { |job| @@ -131,7 +134,7 @@ def setup # Schedule repeating events. SCHEDULE_TO_CRON.keys.each do |schedule| - cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do + cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do run_schedule "every_#{schedule}" end end @@ -170,6 +173,7 @@ def run_schedule(time) def propagate! with_mutex do return unless AgentPropagateJob.can_enqueue? + puts "Queuing event propagation" AgentPropagateJob.perform_later end @@ -200,11 +204,9 @@ def hour_to_schedule_name(hour) end end - def with_mutex + def with_mutex(&block) mutex.synchronize do - ActiveRecord::Base.connection_pool.with_connection do - yield - end + ActiveRecord::Base.connection_pool.with_connection(&block) end end end diff --git a/spec/models/agent_spec.rb b/spec/models/agent_spec.rb index de8756769b..3747be2855 100644 --- a/spec/models/agent_spec.rb +++ b/spec/models/agent_spec.rb @@ -108,10 +108,10 @@ count = 0 allow_any_instance_of(UserCredential).to receive(:credential_value) { count += 1 }.and_return("foo") expect { expect(agent.credential("aws_secret")).to eq("foo") }.to change { count }.by(1) - expect { expect(agent.credential("aws_secret")).to eq("foo") }.not_to change { count } + expect { expect(agent.credential("aws_secret")).to eq("foo") }.not_to(change { count }) agent.reload expect { expect(agent.credential("aws_secret")).to eq("foo") }.to change { count }.by(1) - expect { expect(agent.credential("aws_secret")).to eq("foo") }.not_to change { count } + expect { expect(agent.credential("aws_secret")).to eq("foo") }.not_to(change { count }) end end @@ -235,7 +235,7 @@ def receive(events) expect(@checker).to receive(:can_create_events?) { false } expect { @checker.check - }.not_to change { Event.count } + }.not_to(change { Event.count }) expect(@checker.logs.first.message).to match(/cannot create events/i) end end @@ -248,11 +248,10 @@ def receive(events) end it "records last_check_at and calls check on the given Agent" do - expect(@checker).to receive(:check).once { - @checker.options[:new] = true - } - - allow(Agent).to receive(:find).with(@checker.id) { @checker } + expect_any_instance_of(Agents::SomethingSource).to(receive(:check).once { |agent| + expect(agent.id).to eq @checker.id + agent.options[:new] = true + }) expect(@checker.last_check_at).to be_nil Agents::SomethingSource.async_check(@checker.id) @@ -261,10 +260,11 @@ def receive(events) end it "should log exceptions" do - expect(@checker).to receive(:check).once { + expect_any_instance_of(Agents::SomethingSource).to(receive(:check).once { |agent| + expect(agent.id).to eq @checker.id raise "foo" - } - expect(Agent).to receive(:find).with(@checker.id) { @checker } + }) + expect { Agents::SomethingSource.async_check(@checker.id) }.to raise_error(RuntimeError) @@ -274,8 +274,7 @@ def receive(events) end it "should not run disabled Agents" do - expect(Agent).to receive(:find).with(agents(:bob_weather_agent).id) { agents(:bob_weather_agent) } - expect(agents(:bob_weather_agent)).not_to receive(:check) + expect_any_instance_of(agents(:bob_weather_agent).class).not_to receive(:check) agents(:bob_weather_agent).update_attribute :disabled, true Agent.async_check(agents(:bob_weather_agent).id) end @@ -346,11 +345,11 @@ def receive(events) Agent.async_check(agents(:bob_weather_agent).id) expect { Agent.receive! - }.to change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id } + }.to(change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }) expect { Agent.receive! - }.not_to change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id } + }.not_to(change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }) expect(count).to eq 1 end @@ -393,7 +392,7 @@ def receive(events) expect { Agent.receive! # event gets propagated - }.to change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id } + }.to(change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }) # This agent creates a few events before we link to it, but after our last check. agent2.check @@ -405,14 +404,14 @@ def receive(events) expect { Agent.receive! # but we don't receive those events because they're too old - }.not_to change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id } + }.not_to(change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }) # Now a new event is created by agent2 agent2.check expect { Agent.receive! # and we receive it - }.to change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id } + }.to(change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }) expect(count).to eq 2 end @@ -426,11 +425,12 @@ def receive(events) end describe ".async_receive" do + let(:agent) { agents(:bob_rain_notifier_agent) } + it "should not run disabled Agents" do - expect(Agent).to receive(:find).with(agents(:bob_rain_notifier_agent).id) { agents(:bob_rain_notifier_agent) } - expect(agents(:bob_rain_notifier_agent)).not_to receive(:receive) - agents(:bob_rain_notifier_agent).update_attribute :disabled, true - Agent.async_receive(agents(:bob_rain_notifier_agent).id, [1, 2, 3]) + agent.update_attribute :disabled, true + expect_any_instance_of(agent.class).not_to receive(:receive) + Agent.async_receive(agent.id, [1, 2, 3]) end end @@ -699,7 +699,7 @@ def receive(events) @agent.options[:foo] = "bar1" @agent.keep_events_for = 3.days @agent.save! - }.to change { @event.reload.expires_at } + }.to(change { @event.reload.expires_at }) expect(@event.expires_at.to_i).to be_within(2).of(3.days.from_now.to_i) end end @@ -712,7 +712,7 @@ def receive(events) @agent.options[:foo] = "bar2" @agent.keep_events_for = 3.days @agent.save! - }.to change { @event.reload.expires_at } + }.to(change { @event.reload.expires_at }) expect(@event.expires_at.to_i).to be_within(60 * 61).of(1.days.from_now.to_i) # The larger time is to deal with daylight savings end @@ -954,7 +954,7 @@ def @agent.receive_webhook(params) Agent.async_check(agent1.id) Agent.receive! }.to change { agent1.events.count }.by(1) - }.not_to change { agent2.events.count } + }.not_to(change { agent2.events.count }) agent2.disabled = false agent2.drop_pending_events = true @@ -962,7 +962,7 @@ def @agent.receive_webhook(params) expect { Agent.receive! - }.not_to change { agent2.events.count } + }.not_to(change { agent2.events.count }) end end end