Skip to content

Commit

Permalink
Merge pull request #167 from qedi-r/prop_immediately
Browse files Browse the repository at this point in the history
Make agents propagate events immediately
  • Loading branch information
cantino committed Mar 4, 2014
2 parents d97d8c8 + 163f09e commit 882fe52
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 29 deletions.
2 changes: 2 additions & 0 deletions app/assets/javascripts/application.js.coffee.erb
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ showSchedule = ->

hideLinks = ->
$(".link-region .select2-container").hide()
$(".link-region .propagate-immediately").hide()
$(".link-region .cannot-receive-events").show()

showLinks = ->
$(".link-region .select2-container").show()
$(".link-region .propagate-immediately").show()
$(".link-region .cannot-receive-events").hide()
showEventDescriptions()

Expand Down
18 changes: 13 additions & 5 deletions app/models/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Agent < ActiveRecord::Base

EVENT_RETENTION_SCHEDULES = [["Forever", 0], ["1 day", 1], *([2, 3, 4, 5, 7, 14, 21, 30, 45, 90, 180, 365].map {|n| ["#{n} days", n] })]

attr_accessible :options, :memory, :name, :type, :schedule, :source_ids, :keep_events_for
attr_accessible :options, :memory, :name, :type, :schedule, :source_ids, :keep_events_for, :propagate_immediately

json_serialize :options, :memory

Expand Down Expand Up @@ -96,7 +96,10 @@ def recent_error_logs?

def create_event(attrs)
if can_create_events?
events.create!({ :user => user, :expires_at => new_event_expiration_date }.merge(attrs))
events.create!({
:user => user,
:expires_at => new_event_expiration_date
}.merge(attrs))
else
error "This Agent cannot create events!"
end
Expand Down Expand Up @@ -246,14 +249,19 @@ def cannot_receive_events?
# Find all Agents that have received Events since the last execution of this method. Update those Agents with
# their new `last_checked_event_id` and queue each of the Agents to be called with #receive using `async_receive`.
# This is called by bin/schedule.rb periodically.
def receive!
def receive!(options={})
Agent.transaction do
sql = Agent.
scope = Agent.
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
joins("JOIN links ON (links.receiver_id = agents.id)").
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
joins("JOIN events ON (events.agent_id = sources.id AND events.id > links.event_id_at_creation)").
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id")
if options[:only_receivers].present?
scope = scope.where("agents.id in (?)", options[:only_receivers])
end

sql = scope.to_sql()

agents_to_events = {}
Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
Expand Down
9 changes: 9 additions & 0 deletions app/models/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Event < ActiveRecord::Base
where("events.created_at > ?", timespan)
}

after_create :possibly_propagate

# Emit this event again, as a new Event.
def reemit!
agent.create_event :payload => payload, :lat => lat, :lng => lng
Expand All @@ -31,4 +33,11 @@ def self.cleanup_expired!
Event.where("expires_at IS NOT NULL AND expires_at < ?", Time.now).delete_all
Agent.where(:id => affected_agents).update_all "events_count = (select count(*) from events where agent_id = agents.id)"
end

protected
def possibly_propagate
#immediately schedule agents that want immediate updates
propagate_ids = agent.receivers.where(:propagate_immediately => true).pluck(:id)
Agent.receive!(:only_receivers => propagate_ids) unless propagate_ids.empty?
end
end
5 changes: 5 additions & 0 deletions app/views/agents/_form.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
@agent.source_ids),
{}, { :multiple => true, :size => 5, :class => 'span4 select2' }) %>
<span class='cannot-receive-events text-info'>This type of Agent cannot receive events.</span>
<span class="propagate-immediately"><br>
<%= f.label :propagate_immediately, :class => 'control-label' do %>Propagate immediately
<%= f.check_box :propagate_immediately %>
<% end %>
</span>
</div>
</div>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class AddPropagateImmediatelyToAgent < ActiveRecord::Migration
def up
add_column :agents, :propagate_immediately, :boolean, :default => false, :null => false
end

def down
remove_column :agents, :propagate_immediately
end
end
41 changes: 17 additions & 24 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,47 @@
#
# It's strongly recommended to check this file into your version control system.

ActiveRecord::Schema.define(:version => 20140213053001) do
ActiveRecord::Schema.define(:version => 20140216201250) do

create_table "agent_logs", :force => true do |t|
t.integer "agent_id", :null => false
t.text "message", :limit => 16777215, :null => false
t.integer "level", :default => 3, :null => false
t.integer "agent_id", :null => false
t.text "message", :null => false
t.integer "level", :default => 3, :null => false
t.integer "inbound_event_id"
t.integer "outbound_event_id"
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
end

create_table "agents", :force => true do |t|
t.integer "user_id"
t.text "options", :limit => 16777215
t.text "options"
t.string "type"
t.string "name"
t.string "schedule"
t.integer "events_count"
t.datetime "last_check_at"
t.datetime "last_receive_at"
t.integer "last_checked_event_id"
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.text "memory", :limit => 2147483647
t.datetime "last_webhook_at"
t.integer "keep_events_for", :default => 0, :null => false
t.datetime "last_event_at"
t.datetime "last_error_log_at"
t.integer "keep_events_for", :default => 0, :null => false
t.boolean "propagate_immediately", :default => false, :null => false
end

add_index "agents", ["schedule"], :name => "index_agents_on_schedule"
add_index "agents", ["type"], :name => "index_agents_on_type"
add_index "agents", ["user_id", "created_at"], :name => "index_agents_on_user_id_and_created_at"

create_table "contacts", :force => true do |t|
t.text "message"
t.string "name"
t.string "email"
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
end

create_table "delayed_jobs", :force => true do |t|
t.integer "priority", :default => 0
t.integer "attempts", :default => 0
t.text "handler", :limit => 16777215
t.text "last_error", :limit => 16777215
t.text "last_error"
t.datetime "run_at"
t.datetime "locked_at"
t.datetime "failed_at"
Expand All @@ -73,11 +66,11 @@
create_table "events", :force => true do |t|
t.integer "user_id"
t.integer "agent_id"
t.decimal "lat", :precision => 15, :scale => 10
t.decimal "lng", :precision => 15, :scale => 10
t.text "payload", :limit => 2147483647
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.decimal "lat", :precision => 15, :scale => 10
t.decimal "lng", :precision => 15, :scale => 10
t.text "payload", :limit => 16777215
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.datetime "expires_at"
end

Expand Down
55 changes: 55 additions & 0 deletions spec/models/agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,61 @@ def receive(events)
end
end

describe "creating agents with propagate_immediately = true" do
it "should schedule subagent events immediately" do
Event.delete_all
sender = Agents::SomethingSource.new(:name => "Sending Agent")
sender.user = users(:bob)
sender.save!

receiver = Agents::CannotBeScheduled.new(
:name => "Receiving Agent",
)
receiver.propagate_immediately = true
receiver.user = users(:bob)
receiver.sources << sender
receiver.save!

sender.create_event :payload => {"message" => "new payload"}
sender.events.count.should == 1
receiver.events.count.should == 1
#should be true without calling Agent.receive!
end

it "should only schedule receiving agents that are set to propagate_immediately" do
Event.delete_all
sender = Agents::SomethingSource.new(:name => "Sending Agent")
sender.user = users(:bob)
sender.save!

im_receiver = Agents::CannotBeScheduled.new(
:name => "Immediate Receiving Agent",
)
im_receiver.propagate_immediately = true
im_receiver.user = users(:bob)
im_receiver.sources << sender

im_receiver.save!
slow_receiver = Agents::CannotBeScheduled.new(
:name => "Slow Receiving Agent",
)
slow_receiver.user = users(:bob)
slow_receiver.sources << sender
slow_receiver.save!

sender.create_event :payload => {"message" => "new payload"}
sender.events.count.should == 1
im_receiver.events.count.should == 1
#we should get the quick one
#but not the slow one
slow_receiver.events.count.should == 0
Agent.receive!
#now we should have one in both
im_receiver.events.count.should == 1
slow_receiver.events.count.should == 1
end
end

describe "validations" do
it "calls validate_options" do
agent = Agents::SomethingSource.new(:name => "something")
Expand Down

0 comments on commit 882fe52

Please sign in to comment.