More time sensitive Agents #157

Closed
qedi-r opened this issue Feb 2, 2014 · 6 comments

Comments

@qedi-r

qedi-r commented Feb 2, 2014

I'm currently writing some agents to work with my home automation infrastructure and have a working implementation of a Philips Hue agent that I will probably submit as a pull request soon. In this kind of environment, though, the 30-second to 1-minute delay as agents pass events from one to another is much too long. For now, I've modified Huginn's event code to immediately kick off linked agents as follows:

diff --git a/app/models/event.rb b/app/models/event.rb
index efa56cc..3c678ac 100644
--- a/app/models/event.rb
+++ b/app/models/event.rb
@@ -19,6 +19,20 @@ class Event < ActiveRecord::Base
     where("events.created_at > ?", timespan)
   }

+  def create
+     #create the database event
+     super
+     #and immediately schedule agents that will otherwise wait
+     #for the next tick
+     if agent.links_as_source
+        agent.links_as_source.each do |a| 
+           if a.receiver.cannot_be_scheduled?
+              a.receiver.receive([self]) #as an Array because receive expects multiple
+           end
+        end
+     end
+  end
+
   # Emit this event again, as a new Event.
   def reemit!
     agent.create_event :payload => payload, :lat => lat, :lng => lng

But I would probably also need to add some sort of locking so that the same event isn't delivered twice, once by this code and once by the scheduler.

Being new to the codebase, what I don't know is whether this approach has problems I haven't spotted.

@cantino
Member

cantino commented Feb 2, 2014

Hey @qedi-r, I'm glad you're building on Huginn!

I've been thinking about how to make Huginn closer to real-time for a while. I think it would make sense to allow Agents to cause immediate propagation when desired. This would enable many interesting new uses.

I'm happy to advise you on how to build this. We should definitely write good RSpecs to demonstrate how it works.

Maybe you should update create_event in agent.rb to pass a new :propagate_immediately attribute:

def create_event(attrs)
  if can_create_events?
    events.create!({ :user => user, :expires_at => new_event_expiration_date, :propagate_immediately => propagate_immediately }.merge(attrs))
  else
    error "This Agent cannot create events!"
  end
end

and add propagate_immediately as a boolean attribute to Agents with a database migration. I'd suggest making it :null => false, :default => false. You could then add it to the Agent editing UI and show page, so anyone can set their agents to propagate events immediately or not.

Once that's done, it should be easy to modify Event similar to how you suggest:

attr_accessor :propagate_immediately # This is used to determine if a created event should propagate immediately or not.

attr_accessible :lat, :lng, :payload, :user_id, :user, :expires_at, :propagate_immediately

after_create :possibly_propagate

protected

def possibly_propagate
  if propagate_immediately
    Agent.receive!(:only_receivers => agent.receivers.pluck(:id))
    self.propagate_immediately = false # may not be needed
  end
end

Finally, change Agent.receive! to accept that optional argument:

def receive!(options = {})
  Agent.transaction do
    scope = Agent.
            select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
            joins("JOIN links ON (links.receiver_id = agents.id)").
            joins("JOIN agents AS sources ON (links.source_id = sources.id)").
            joins("JOIN events ON (events.agent_id = sources.id)").
            where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id")

    # where returns a new relation, so reassign it; filter on agents.id because
    # the receiver_agent_id select alias cannot be used in a WHERE clause.
    scope = scope.where("agents.id IN (?)", options[:only_receivers]) if options[:only_receivers]

    sql = scope.to_sql

    agents_to_events = {}
    Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
      agents_to_events[receiver_agent_id] ||= []
      agents_to_events[receiver_agent_id] << event_id
    end

    ...

The primary change is the scope = scope.where("agents.id IN (?)", options[:only_receivers]) if ... line.

I think this will work transactionally, causing full propagation only for downstream agents of the one creating the event. I haven't tested this, though.

The reason we need full propagation for those Agents is so that we don't accidentally bump their last_checked_event_id values to our new Event's value and skip intermediate events that they haven't received yet. Does my logic seem sound?
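To make the last_checked_event_id concern concrete, here is a minimal plain-Ruby sketch (no Rails). The Receiver class and its receive_only and receive_pending methods are hypothetical stand-ins for illustration, not Huginn's actual API. It assumes a receiver tracks a single cursor and only ever looks at events with a higher id.

```ruby
# Hypothetical stand-ins for illustration only; not Huginn's real models.
Event = Struct.new(:id, :payload)

class Receiver
  attr_reader :last_checked_event_id, :seen

  def initialize
    @last_checked_event_id = nil
    @seen = []
  end

  # Naive immediate delivery: handle only the new event, then bump the cursor.
  def receive_only(event)
    @seen << event.id
    @last_checked_event_id = event.id
  end

  # Full propagation: handle every event newer than the cursor, in id order.
  def receive_pending(all_events)
    pending = all_events.select do |e|
      @last_checked_event_id.nil? || e.id > @last_checked_event_id
    end
    pending.sort_by(&:id).each { |e| @seen << e.id }
    @last_checked_event_id = pending.map(&:id).max unless pending.empty?
  end
end

events = [Event.new(1, "a"), Event.new(2, "b"), Event.new(3, "c")]

naive = Receiver.new
naive.receive_only(events.last) # immediate delivery of event 3 only
naive.receive_pending(events)   # the next tick finds nothing newer than 3
puts naive.seen.inspect         # prints [3]; events 1 and 2 were skipped

full = Receiver.new
full.receive_pending(events)
puts full.seen.inspect          # prints [1, 2, 3]
```

In this model, delivering only the newest event moves the cursor past events 1 and 2, so the regular tick never picks them up, which is why the immediate path should run the same full query restricted to the affected receivers.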

@qedi-r
Author

qedi-r commented Feb 3, 2014

If I understand what you've written in event.rb in your comment, it sounds like you'd prefer propagate_immediately to be set on the agent that generates the event, but I think it should be set for agents that receive the event.

That way, if Agent A has two receivers, X, which has propagate_immediately set, and Y, which has its own schedule, X would run immediately after A, while Y would wait for its usual schedule.

If A has to set propagate_immediately, we might end up breaking users' schedules.
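As a rough illustration of the receiver-side idea, the following plain-Ruby sketch splits an event's receivers into those that run right away and those left for the scheduler. The ReceivingAgent struct and the partition_receivers helper are hypothetical names used only for this example.

```ruby
# Hypothetical model: each receiving agent carries its own flag.
ReceivingAgent = Struct.new(:name, :propagate_immediately)

# Split the receivers of a newly created event into those to run now and
# those that keep waiting for the regular scheduler tick.
def partition_receivers(receivers)
  receivers.partition(&:propagate_immediately)
end

x = ReceivingAgent.new("X", true)
y = ReceivingAgent.new("Y", false)

run_now, wait_for_tick = partition_receivers([x, y])
puts run_now.map(&:name).inspect       # prints ["X"]
puts wait_for_tick.map(&:name).inspect # prints ["Y"]
```

With the flag on the receiver, A does not need to know anything about how its downstream agents want to be scheduled.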

@cantino
Member

cantino commented Feb 4, 2014

Well, propagating immediately shouldn't break anyone's schedules, because propagation already happens every minute anyway.

I agree that it conceptually makes a bit more sense to be set on the receiver. That could be accomplished in Event like this:

after_create :possibly_propagate

protected

def possibly_propagate
  # Filter on the receivers' propagate_immediately flag.
  propagate_immediately_agent_ids = agent.receivers.where(:propagate_immediately => true).pluck(:id)
  if propagate_immediately_agent_ids.length > 0
    Agent.receive!(:only_receivers => propagate_immediately_agent_ids)
  end
end

I'm trying to figure out if there are any race conditions that can occur by having receive! be called very frequently.

@slef

slef commented Feb 13, 2014

I see one problem with this if there are loops in the system (which might not be detectable in the graph). If Agent A produces event E, which is sent to agents B and C, and B triggers A again, you would get stuck in an A-B-A-B loop and starve C.
Maybe a better solution would be to run all "instant" events through a Queue and process them in order. What do you think?
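Here is a small plain-Ruby simulation of that difference, under the assumption that handling a delivery always emits to every receiver. The LINKS graph and both helper methods are hypothetical; max_steps only bounds the otherwise endless loop.

```ruby
# Hypothetical graph: A emits to B and C, and B emits back to A (a loop).
LINKS = { "A" => ["B", "C"], "B" => ["A"], "C" => [] }.freeze

# Synchronous, depth-first delivery: each receiver's own output is handled
# before its siblings, modelled here with a stack.
def run_depth_first(links, start, max_steps)
  stack = links.fetch(start).reverse
  order = []
  while order.size < max_steps && !stack.empty?
    agent = stack.pop
    order << agent
    stack.concat(links.fetch(agent).reverse)
  end
  order
end

# Queued delivery: every pending delivery is handled in arrival order.
def run_fifo(links, start, max_steps)
  queue = links.fetch(start).dup
  order = []
  while order.size < max_steps && !queue.empty?
    agent = queue.shift
    order << agent
    queue.concat(links.fetch(agent))
  end
  order
end

puts run_depth_first(LINKS, "A", 6).inspect # prints ["B", "A", "B", "A", "B", "A"]
puts run_fifo(LINKS, "A", 6).inspect        # prints ["B", "C", "A", "B", "C", "A"]
```

The loop still runs forever in both cases, but with the queue C gets its turn on every round instead of never being reached.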

@cantino
Member

cantino commented Feb 13, 2014

Good point. Agent.receive! is handled asynchronously by delayed_job, so I think we'll have the queue semantics.

@cantino
Member

cantino commented Feb 17, 2014

Is this working for you okay?
