Skip to content

Commit

Permalink
Merge branch 'master' of github.com:cantino/huginn
Browse files Browse the repository at this point in the history
  • Loading branch information
cantino committed Mar 24, 2013
2 parents 24d7af6 + 35c9b64 commit 9f0e949
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 89 deletions.
3 changes: 1 addition & 2 deletions app/controllers/agents_controller.rb
Expand Up @@ -9,8 +9,7 @@ def index
end

def run
@agent = current_user.agents.find(params[:id])
@agent.async_check
Agent.async_check(current_user.agents.find(params[:id]).id)
redirect_to agents_path, notice: "Agent run queued"
end

Expand Down
138 changes: 75 additions & 63 deletions app/models/agent.rb
Expand Up @@ -100,20 +100,6 @@ def last_event_at
@memoized_last_event_at ||= events.select(:created_at).first.try(:created_at)
end

def async_check
check
self.last_check_at = Time.now
save!
end
handle_asynchronously :async_check #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }

def async_receive(event_ids)
receive(Event.where(:id => event_ids))
self.last_receive_at = Time.now
save!
end
handle_asynchronously :async_receive #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }

def default_schedule
self.class.default_schedule
end
Expand All @@ -135,67 +121,93 @@ def can_receive_events?
end

# Class Methods
class << self
def cannot_be_scheduled!
@cannot_be_scheduled = true
end

def self.cannot_be_scheduled!
@cannot_be_scheduled = true
end

def self.cannot_be_scheduled?
!!@cannot_be_scheduled
end

def self.default_schedule(schedule = nil)
@default_schedule = schedule unless schedule.nil?
@default_schedule
end

def self.cannot_receive_events!
@cannot_receive_events = true
end
def cannot_be_scheduled?
!!@cannot_be_scheduled
end

def self.cannot_receive_events?
!!@cannot_receive_events
end
def default_schedule(schedule = nil)
@default_schedule = schedule unless schedule.nil?
@default_schedule
end

def self.receive!
sql = Agent.
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
joins("JOIN links ON (links.receiver_id = agents.id)").
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
joins("JOIN events ON (events.agent_id = sources.id)").
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
def cannot_receive_events!
@cannot_receive_events = true
end

agents_to_events = {}
Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
agents_to_events[receiver_agent_id] ||= []
agents_to_events[receiver_agent_id] << event_id
def cannot_receive_events?
!!@cannot_receive_events
end

event_ids = agents_to_events.values.flatten.uniq.compact
def receive!
sql = Agent.
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
joins("JOIN links ON (links.receiver_id = agents.id)").
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
joins("JOIN events ON (events.agent_id = sources.id)").
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql

agents_to_events = {}
Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
agents_to_events[receiver_agent_id] ||= []
agents_to_events[receiver_agent_id] << event_id
end

event_ids = agents_to_events.values.flatten.uniq.compact

Agent.where(:id => agents_to_events.keys).each do |agent|
agent.update_attribute :last_checked_event_id, event_ids.max
Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
end

{
:agent_count => agents_to_events.keys.length,
:event_count => event_ids.length
}
end

Agent.where(:id => agents_to_events.keys).each do |agent|
agent.update_attribute :last_checked_event_id, event_ids.max
agent.async_receive(agents_to_events[agent.id].uniq)
# Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then
# save it with an updated _last_receive_at_ timestamp.
#
# This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts Agent
# and Event ids instead of a literal ActiveRecord models because it is preferable to serialize delayed_jobs with ids.
def async_receive(agent_id, event_ids)
agent = Agent.find(agent_id)
agent.receive(Event.where(:id => event_ids))
agent.last_receive_at = Time.now
agent.save!
end
handle_asynchronously :async_receive

{
:agent_count => agents_to_events.keys.length,
:event_count => event_ids.length
}
end
def run_schedule(schedule)
types = where(:schedule => schedule).group(:type).pluck(:type)
types.each do |type|
type.constantize.bulk_check(schedule)
end
end

def self.run_schedule(schedule)
types = where(:schedule => schedule).group(:type).pluck(:type)
types.each do |type|
type.constantize.bulk_check(schedule)
# You can override this to define a custom bulk_check for your type of Agent.
def bulk_check(schedule)
raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
where(:schedule => schedule).pluck("agents.id").each do |agent_id|
async_check(agent_id)
end
end
end

# You can override this to define a custom bulk_check for your type of Agent.
def self.bulk_check(schedule)
raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
where(:schedule => schedule).find_each do |agent|
agent.async_check
# Given an Agent id, load the Agent, call #check on it, and then save it with an updated _last_check_at_ timestamp.
#
# This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts an Agent
# id instead of a literal Agent because it is preferable to serialize delayed_jobs with ids.
def async_check(agent_id)
agent = Agent.find(agent_id)
agent.check
agent.last_check_at = Time.now
agent.save!
end
handle_asynchronously :async_check
end
end
18 changes: 12 additions & 6 deletions app/models/agents/weather_agent.rb
Expand Up @@ -44,26 +44,32 @@ def working?
end

def wunderground
Wunderground.new(options[:api_key])
Wunderground.new(options[:api_key]) if key_setup?
end

def key_setup?
options[:api_key] && options[:api_key] != "your-key"
end

def default_options
{
:api_key => "",
:api_key => "your-key",
:zipcode => "94103"
}

end

def validate_options
errors.add(:base, "zipcode is required") unless options[:zipcode].present?
errors.add(:base, "api_key is required") unless options[:api_key].present?
errors.add(:base, "api_key is required") unless key_setup?
end

def check
wunderground.forecast_for(options[:zipcode])["forecast"]["simpleforecast"]["forecastday"].each do |day|
if is_tomorrow?(day)
create_event :payload => day.merge(:zipcode => options[:zipcode])
if key_setup?
wunderground.forecast_for(options[:zipcode])["forecast"]["simpleforecast"]["forecastday"].each do |day|
if is_tomorrow?(day)
create_event :payload => day.merge(:zipcode => options[:zipcode])
end
end
end
end
Expand Down
32 changes: 18 additions & 14 deletions spec/models/agent_spec.rb
Expand Up @@ -8,9 +8,11 @@
end

it "runs agents with the given schedule" do
mock.any_instance_of(Agents::WeatherAgent).async_check.twice
mock.any_instance_of(Agents::WebsiteAgent).async_check.once
weather_agent_ids = [agents(:bob_weather_agent), agents(:jane_weather_agent)].map(&:id)
stub(Agents::WeatherAgent).async_check(anything) {|agent_id| weather_agent_ids.delete(agent_id) }
stub(Agents::WebsiteAgent).async_check(agents(:bob_website_agent).id)
Agent.run_schedule("midnight")
weather_agent_ids.should be_empty
end

it "groups agents by type" do
Expand All @@ -20,7 +22,7 @@
end

it "only runs agents with the given schedule" do
do_not_allow.any_instance_of(Agents::WebsiteAgent).async_check
do_not_allow(Agents::WebsiteAgent).async_check
Agent.run_schedule("blah")
end
end
Expand Down Expand Up @@ -116,19 +118,21 @@ class Agents::CannotBeScheduled < Agent
end
end

describe "#async_check" do
it "records last_check_at and calls check" do
describe ".async_check" do
it "records last_check_at and calls check on the given Agent" do
@checker = Agents::SomethingSource.new(:name => "something")
@checker.user = users(:bob)
@checker.save!

@checker.options[:new] = true
mock(@checker).check.once
mock(@checker).check.once {
@checker.options[:new] = true
}

@checker.last_check_at.should be_nil
@checker.async_check
@checker.last_check_at.should be_within(2).of(Time.now)
mock(Agent).find(@checker.id) { @checker }

@checker.last_check_at.should be_nil
Agents::SomethingSource.async_check(@checker.id)
@checker.reload.last_check_at.should be_within(2).of(Time.now)
@checker.reload.options[:new].should be_true # Show that we save options
end
end
Expand All @@ -141,13 +145,13 @@ class Agents::CannotBeScheduled < Agent

it "should use available events" do
mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
agents(:bob_weather_agent).async_check
Agent.async_check(agents(:bob_weather_agent).id)
Agent.receive!
end

it "should track when events have been seen and not see them again" do
mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
agents(:bob_weather_agent).async_check
Agent.async_check(agents(:bob_weather_agent).id)
Agent.receive!
Agent.receive!
end
Expand All @@ -161,8 +165,8 @@ class Agents::CannotBeScheduled < Agent
mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice { |events|
events.map(&:user).map(&:username).uniq.length.should == 1
}
agents(:bob_weather_agent).async_check
agents(:jane_weather_agent).async_check
Agent.async_check(agents(:bob_weather_agent).id)
Agent.async_check(agents(:jane_weather_agent).id)
Agent.receive!
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/models/agents/digest_email_agent_spec.rb
Expand Up @@ -23,20 +23,20 @@ def get_message_part(mail, content_type)
event2.payload = "Something else you should know about"
event2.save!

@checker.async_receive([event1.id, event2.id])
Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id])
@checker.reload.memory[:queue].should == ["Something you should know about", "Something else you should know about"]
end
end

describe "#check" do
it "should send an email" do
@checker.async_check
Agents::DigestEmailAgent.async_check(@checker.id)
ActionMailer::Base.deliveries.should == []

@checker.memory[:queue] = ["Something you should know about", { :title => "Foo", :url => "http://google.com", :bar => 2 }, { "message" => "hi", :woah => "there" }]
@checker.save!

@checker.async_check
Agents::DigestEmailAgent.async_check(@checker.id)
ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"]
ActionMailer::Base.deliveries.last.subject.should == "something interesting"
get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Something you should know about\n\nFoo (bar: 2 and url: http://google.com)\n\nhi (woah: there)"
Expand Down
2 changes: 1 addition & 1 deletion spec/models/agents/trigger_agent_spec.rb
Expand Up @@ -50,7 +50,7 @@
@event.save!

@checker.should_not be_working # no events have ever been received
@checker.async_receive([@event.id])
Agents::TriggerAgent.async_receive(@checker.id, [@event.id])
@checker.reload.should be_working # no events have ever been received
three_days_from_now = 3.days.from_now
stub(Time).now { three_days_from_now }
Expand Down

0 comments on commit 9f0e949

Please sign in to comment.