Skip to content

Commit

Permalink
Merge 39a38bb into 1c18d3b
Browse files Browse the repository at this point in the history
  • Loading branch information
knu committed Jul 12, 2023
2 parents 1c18d3b + 39a38bb commit 3428d05
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 87 deletions.
72 changes: 43 additions & 29 deletions app/controllers/agents_controller.rb
Expand Up @@ -4,7 +4,8 @@ class AgentsController < ApplicationController
include SortableTable

def index
set_table_sort sorts: %w[name created_at last_check_at last_event_at last_receive_at], default: { created_at: :desc }
set_table_sort sorts: %w[name created_at last_check_at last_event_at last_receive_at],
default: { created_at: :desc }

@agents = current_user.agents.preload(:scenarios, :controllers).reorder(table_sort).page(params[:page])

Expand All @@ -31,7 +32,7 @@ def toggle_visibility
def handle_details_post
@agent = current_user.agents.find(params[:id])
if @agent.respond_to?(:handle_details_post)
render :json => @agent.handle_details_post(params) || {}
render json: @agent.handle_details_post(params) || {}
else
@agent.error "#handle_details_post called on an instance of #{@agent.class} that does not define it."
head 500
Expand All @@ -53,16 +54,16 @@ def type_details
initialize_presenter

render json: {
can_be_scheduled: @agent.can_be_scheduled?,
default_schedule: @agent.default_schedule,
can_receive_events: @agent.can_receive_events?,
can_create_events: @agent.can_create_events?,
can_control_other_agents: @agent.can_control_other_agents?,
can_dry_run: @agent.can_dry_run?,
options: @agent.default_options,
description_html: @agent.html_description,
oauthable: render_to_string(partial: 'oauth_dropdown', locals: { agent: @agent }),
form_options: render_to_string(partial: 'options', locals: { agent: @agent })
can_be_scheduled: @agent.can_be_scheduled?,
default_schedule: @agent.default_schedule,
can_receive_events: @agent.can_receive_events?,
can_create_events: @agent.can_create_events?,
can_control_other_agents: @agent.can_control_other_agents?,
can_dry_run: @agent.can_dry_run?,
options: @agent.default_options,
description_html: @agent.html_description,
oauthable: render_to_string(partial: 'oauth_dropdown', locals: { agent: @agent }),
form_options: render_to_string(partial: 'options', locals: { agent: @agent })
}
end

Expand All @@ -71,8 +72,8 @@ def event_descriptions
agents.map(&:html_event_description).uniq.map { |desc|
"<p><strong>#{type}</strong><br />" + desc + "</p>"
}
}.flatten.join()
render :json => { :description_html => html }
}.flatten.join
render json: { description_html: html }
end

def reemit_events
Expand Down Expand Up @@ -101,7 +102,9 @@ def propagate
respond_to do |format|
if AgentPropagateJob.can_enqueue?
details = Agent.receive! # Eventually this should probably be scoped to the current_user.
format.html { redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)" }
format.html {
redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)"
}
format.json { head :ok }
else
format.html { redirect_back "Event propagation is already scheduled to run." }
Expand All @@ -111,8 +114,10 @@ def propagate
end

def destroy_memory
@agent = current_user.agents.find(params[:id])
@agent.update!(memory: {})
Agent.transaction do
@agent = current_user.agents.lock.find(params[:id])
@agent.update!(memory: {})
end

respond_to do |format|
format.html { redirect_back "Memory erased for '#{@agent.name}'" }
Expand All @@ -132,11 +137,12 @@ def show
def new
agents = current_user.agents

if id = params[:id]
@agent = agents.build_clone(agents.find(id))
else
@agent = agents.build
end
@agent =
if id = params[:id]
agents.build_clone(agents.find(id))
else
agents.build
end

@agent.scenario_ids = [params[:scenario_id]] if params[:scenario_id] && current_user.scenarios.find_by(id: params[:scenario_id])

Expand Down Expand Up @@ -169,10 +175,13 @@ def create
end

def update
@agent = current_user.agents.find(params[:id])
saved = Agent.transaction do
@agent = current_user.agents.lock.find(params[:id])
@agent.update(agent_params)
end

respond_to do |format|
if @agent.update(agent_params)
if saved
format.html { redirect_back "'#{@agent.name}' was successfully updated.", return: agents_path }
format.json { render json: @agent, status: :ok, location: agent_path(@agent) }
else
Expand All @@ -184,9 +193,11 @@ def update
end

def leave_scenario
@agent = current_user.agents.find(params[:id])
@scenario = current_user.scenarios.find(params[:scenario_id])
@agent.scenarios.destroy(@scenario)
Agent.transaction do
@agent = current_user.agents.lock.find(params[:id])
@scenario = current_user.scenarios.find(params[:scenario_id])
@agent.scenarios.destroy(@scenario)
end

respond_to do |format|
format.html { redirect_back "'#{@agent.name}' removed from '#{@scenario.name}'" }
Expand All @@ -195,8 +206,10 @@ def leave_scenario
end

def destroy
@agent = current_user.agents.find(params[:id])
@agent.destroy
Agent.transaction do
@agent = current_user.agents.lock.find(params[:id])
@agent.destroy
end

respond_to do |format|
format.html { redirect_back "'#{@agent.name}' deleted" }
Expand Down Expand Up @@ -250,6 +263,7 @@ def initialize_presenter
end

private

def show_only_enabled_agents?
!!cookies[:huginn_view_only_enabled_agents]
end
Expand Down
27 changes: 17 additions & 10 deletions app/jobs/agent_check_job.rb
@@ -1,15 +1,22 @@
class AgentCheckJob < ActiveJob::Base
# Given an Agent id, load the Agent, call #check on it, and then save it with an updated `last_check_at` timestamp.
def perform(agent_id)
agent = Agent.find(agent_id)
begin
return if agent.unavailable?
agent.check
agent.last_check_at = Time.now
agent.save!
rescue => e
agent.error "Exception during check. #{e.message}: #{e.backtrace.join("\n")}"
raise
error = nil

ActiveRecord::Base.transaction do
ActiveRecord::Base.transaction(requires_new: true) do
agent = Agent.lock.find_by(id: agent_id)
return if !agent || agent.unavailable?

agent.check
agent.last_check_at = Time.now
agent.save!
rescue StandardError => e
agent&.error "Exception during check. #{e.message}: #{e.backtrace.join("\n")}"
error = e
end
end

raise error if error
end
end
end
27 changes: 17 additions & 10 deletions app/jobs/agent_receive_job.rb
Expand Up @@ -2,15 +2,22 @@ class AgentReceiveJob < ActiveJob::Base
# Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then
# save it with an updated `last_receive_at` timestamp.
def perform(agent_id, event_ids)
agent = Agent.find(agent_id)
begin
return if agent.unavailable?
agent.receive(Event.where(:id => event_ids).order(:id))
agent.last_receive_at = Time.now
agent.save!
rescue => e
agent.error "Exception during receive. #{e.message}: #{e.backtrace.join("\n")}"
raise
error = nil

ActiveRecord::Base.transaction do
ActiveRecord::Base.transaction(requires_new: true) do
agent = Agent.lock.find_by(id: agent_id)
return if !agent || agent.unavailable?

agent.receive(Event.where(id: event_ids).order(:id))
agent.last_receive_at = Time.now
agent.save!
rescue StandardError => e
agent&.error "Exception during receive. #{e.message}: #{e.backtrace.join("\n")}"
error = e
end
end

raise error if error
end
end
end
4 changes: 2 additions & 2 deletions app/models/agent.rb
Expand Up @@ -150,7 +150,7 @@ def credential(name)
end
end

def reload
def reload(...)
@credential_cache = {}
super
end
Expand Down Expand Up @@ -424,7 +424,7 @@ def receive!(options = {})
agents_to_events[receiver_agent_id.to_i] << event_id
end

Agent.where(id: agents_to_events.keys).each do |agent|
Agent.lock.where(id: agents_to_events.keys).each do |agent|
event_ids = agents_to_events[agent.id].uniq
agent.update_attribute :last_checked_event_id, event_ids.max

Expand Down
10 changes: 6 additions & 4 deletions app/models/service.rb
Expand Up @@ -12,10 +12,12 @@ class Service < ActiveRecord::Base
scope :by_name, lambda { |dir = 'desc'| order("services.name #{dir}") }

def disable_agents(conditions = {})
agents.where.not(conditions[:where_not] || {}).each do |agent|
agent.service_id = nil
agent.disabled = true
agent.save!(validate: false)
Agent.transaction do
agents.lock.where.not(conditions[:where_not] || {}).each do |agent|
agent.service_id = nil
agent.disabled = true
agent.save!(validate: false)
end
end
end

Expand Down
14 changes: 8 additions & 6 deletions lib/huginn_scheduler.rb
Expand Up @@ -49,6 +49,7 @@ def schedule_scheduler_agent(agent)
else
if job
return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i

puts "Rescheduling SchedulerAgent##{agent.id}"
job.unschedule
else
Expand Down Expand Up @@ -82,7 +83,9 @@ def schedule_scheduler_agent(agent)
# orphaned jobs if any.
def schedule_scheduler_agents
scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent|
schedule_scheduler_agent(scheduler_agent)
scheduler_agent.with_lock do
schedule_scheduler_agent(scheduler_agent)
end
}.compact

(scheduler_agent_jobs - scheduled_jobs).each { |job|
Expand Down Expand Up @@ -131,7 +134,7 @@ def setup

# Schedule repeating events.
SCHEDULE_TO_CRON.keys.each do |schedule|
cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do
cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do
run_schedule "every_#{schedule}"
end
end
Expand Down Expand Up @@ -170,6 +173,7 @@ def run_schedule(time)
def propagate!
with_mutex do
return unless AgentPropagateJob.can_enqueue?

puts "Queuing event propagation"
AgentPropagateJob.perform_later
end
Expand Down Expand Up @@ -200,11 +204,9 @@ def hour_to_schedule_name(hour)
end
end

def with_mutex
def with_mutex(&block)
mutex.synchronize do
ActiveRecord::Base.connection_pool.with_connection do
yield
end
ActiveRecord::Base.connection_pool.with_connection(&block)
end
end
end

0 comments on commit 3428d05

Please sign in to comment.