diff --git a/app/controllers/agents_controller.rb b/app/controllers/agents_controller.rb index 0aa1c44995..e51414b192 100644 --- a/app/controllers/agents_controller.rb +++ b/app/controllers/agents_controller.rb @@ -9,8 +9,7 @@ def index end def run - @agent = current_user.agents.find(params[:id]) - @agent.async_check + Agent.async_check(current_user.agents.find(params[:id]).id) redirect_to agents_path, notice: "Agent run queued" end diff --git a/app/models/agent.rb b/app/models/agent.rb index abbd46c6f3..c5e7570ab1 100644 --- a/app/models/agent.rb +++ b/app/models/agent.rb @@ -100,20 +100,6 @@ def last_event_at @memoized_last_event_at ||= events.select(:created_at).first.try(:created_at) end - def async_check - check - self.last_check_at = Time.now - save! - end - handle_asynchronously :async_check #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now } - - def async_receive(event_ids) - receive(Event.where(:id => event_ids)) - self.last_receive_at = Time.now - save! - end - handle_asynchronously :async_receive #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now } - def default_schedule self.class.default_schedule end @@ -135,67 +121,93 @@ def can_receive_events? end # Class Methods + class << self + def cannot_be_scheduled! + @cannot_be_scheduled = true + end - def self.cannot_be_scheduled! - @cannot_be_scheduled = true - end - - def self.cannot_be_scheduled? - !!@cannot_be_scheduled - end - - def self.default_schedule(schedule = nil) - @default_schedule = schedule unless schedule.nil? - @default_schedule - end - - def self.cannot_receive_events! - @cannot_receive_events = true - end + def cannot_be_scheduled? + !!@cannot_be_scheduled + end - def self.cannot_receive_events? - !!@cannot_receive_events - end + def default_schedule(schedule = nil) + @default_schedule = schedule unless schedule.nil? + @default_schedule + end - def self.receive! - sql = Agent. - select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id"). - joins("JOIN links ON (links.receiver_id = agents.id)"). - joins("JOIN agents AS sources ON (links.source_id = sources.id)"). - joins("JOIN events ON (events.agent_id = sources.id)"). - where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql + def cannot_receive_events! + @cannot_receive_events = true + end - agents_to_events = {} - Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id| - agents_to_events[receiver_agent_id] ||= [] - agents_to_events[receiver_agent_id] << event_id + def cannot_receive_events? + !!@cannot_receive_events end - event_ids = agents_to_events.values.flatten.uniq.compact + def receive! + sql = Agent. + select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id"). + joins("JOIN links ON (links.receiver_id = agents.id)"). + joins("JOIN agents AS sources ON (links.source_id = sources.id)"). + joins("JOIN events ON (events.agent_id = sources.id)"). + where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql + + agents_to_events = {} + Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id| + agents_to_events[receiver_agent_id] ||= [] + agents_to_events[receiver_agent_id] << event_id + end + + event_ids = agents_to_events.values.flatten.uniq.compact + + Agent.where(:id => agents_to_events.keys).each do |agent| + agent.update_attribute :last_checked_event_id, event_ids.max + Agent.async_receive(agent.id, agents_to_events[agent.id].uniq) + end + + { + :agent_count => agents_to_events.keys.length, + :event_count => event_ids.length + } + end - Agent.where(:id => agents_to_events.keys).each do |agent| - agent.update_attribute :last_checked_event_id, event_ids.max - agent.async_receive(agents_to_events[agent.id].uniq) + # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then + # save it with an updated _last_receive_at_ timestamp. + # + # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts Agent + # and Event ids instead of a literal ActiveRecord models because it is preferable to serialize delayed_jobs with ids. + def async_receive(agent_id, event_ids) + agent = Agent.find(agent_id) + agent.receive(Event.where(:id => event_ids)) + agent.last_receive_at = Time.now + agent.save! end + handle_asynchronously :async_receive - { - :agent_count => agents_to_events.keys.length, - :event_count => event_ids.length - } - end + def run_schedule(schedule) + types = where(:schedule => schedule).group(:type).pluck(:type) + types.each do |type| + type.constantize.bulk_check(schedule) + end + end - def self.run_schedule(schedule) - types = where(:schedule => schedule).group(:type).pluck(:type) - types.each do |type| - type.constantize.bulk_check(schedule) + # You can override this to define a custom bulk_check for your type of Agent. + def bulk_check(schedule) + raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent + where(:schedule => schedule).pluck("agents.id").each do |agent_id| + async_check(agent_id) + end end - end - # You can override this to define a custom bulk_check for your type of Agent. - def self.bulk_check(schedule) - raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent - where(:schedule => schedule).find_each do |agent| - agent.async_check + # Given an Agent id, load the Agent, call #check on it, and then save it with an updated _last_check_at_ timestamp. + # + # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts an Agent + # id instead of a literal Agent because it is preferable to serialize delayed_jobs with ids. + def async_check(agent_id) + agent = Agent.find(agent_id) + agent.check + agent.last_check_at = Time.now + agent.save! end + handle_asynchronously :async_check end end diff --git a/app/models/agents/weather_agent.rb b/app/models/agents/weather_agent.rb index 86d2df2b64..eb3649d749 100644 --- a/app/models/agents/weather_agent.rb +++ b/app/models/agents/weather_agent.rb @@ -44,12 +44,16 @@ def working? end def wunderground - Wunderground.new(options[:api_key]) + Wunderground.new(options[:api_key]) if key_setup? + end + + def key_setup? + options[:api_key] && options[:api_key] != "your-key" end def default_options { - :api_key => "", + :api_key => "your-key", :zipcode => "94103" } @@ -57,13 +61,15 @@ def default_options def validate_options errors.add(:base, "zipcode is required") unless options[:zipcode].present? - errors.add(:base, "api_key is required") unless options[:api_key].present? + errors.add(:base, "api_key is required") unless key_setup? end def check - wunderground.forecast_for(options[:zipcode])["forecast"]["simpleforecast"]["forecastday"].each do |day| - if is_tomorrow?(day) - create_event :payload => day.merge(:zipcode => options[:zipcode]) + if key_setup? + wunderground.forecast_for(options[:zipcode])["forecast"]["simpleforecast"]["forecastday"].each do |day| + if is_tomorrow?(day) + create_event :payload => day.merge(:zipcode => options[:zipcode]) + end end end end diff --git a/spec/models/agent_spec.rb b/spec/models/agent_spec.rb index 36906d3431..ebd8a5b6f5 100644 --- a/spec/models/agent_spec.rb +++ b/spec/models/agent_spec.rb @@ -8,9 +8,11 @@ end it "runs agents with the given schedule" do - mock.any_instance_of(Agents::WeatherAgent).async_check.twice - mock.any_instance_of(Agents::WebsiteAgent).async_check.once + weather_agent_ids = [agents(:bob_weather_agent), agents(:jane_weather_agent)].map(&:id) + stub(Agents::WeatherAgent).async_check(anything) {|agent_id| weather_agent_ids.delete(agent_id) } + stub(Agents::WebsiteAgent).async_check(agents(:bob_website_agent).id) Agent.run_schedule("midnight") + weather_agent_ids.should be_empty end it "groups agents by type" do @@ -20,7 +22,7 @@ end it "only runs agents with the given schedule" do - do_not_allow.any_instance_of(Agents::WebsiteAgent).async_check + do_not_allow(Agents::WebsiteAgent).async_check Agent.run_schedule("blah") end end @@ -116,19 +118,21 @@ class Agents::CannotBeScheduled < Agent end end - describe "#async_check" do - it "records last_check_at and calls check" do + describe ".async_check" do + it "records last_check_at and calls check on the given Agent" do @checker = Agents::SomethingSource.new(:name => "something") @checker.user = users(:bob) @checker.save! - @checker.options[:new] = true - mock(@checker).check.once + mock(@checker).check.once { + @checker.options[:new] = true + } - @checker.last_check_at.should be_nil - @checker.async_check - @checker.last_check_at.should be_within(2).of(Time.now) + mock(Agent).find(@checker.id) { @checker } + @checker.last_check_at.should be_nil + Agents::SomethingSource.async_check(@checker.id) + @checker.reload.last_check_at.should be_within(2).of(Time.now) @checker.reload.options[:new].should be_true # Show that we save options end end @@ -141,13 +145,13 @@ class Agents::CannotBeScheduled < Agent it "should use available events" do mock.any_instance_of(Agents::TriggerAgent).receive(anything).once - agents(:bob_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) Agent.receive! end it "should track when events have been seen and not see them again" do mock.any_instance_of(Agents::TriggerAgent).receive(anything).once - agents(:bob_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) Agent.receive! Agent.receive! end @@ -161,8 +165,8 @@ class Agents::CannotBeScheduled < Agent mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice { |events| events.map(&:user).map(&:username).uniq.length.should == 1 } - agents(:bob_weather_agent).async_check - agents(:jane_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) + Agent.async_check(agents(:jane_weather_agent).id) Agent.receive! end end diff --git a/spec/models/agents/digest_email_agent_spec.rb b/spec/models/agents/digest_email_agent_spec.rb index 6be2693bc8..94a0a57bf7 100644 --- a/spec/models/agents/digest_email_agent_spec.rb +++ b/spec/models/agents/digest_email_agent_spec.rb @@ -23,20 +23,20 @@ def get_message_part(mail, content_type) event2.payload = "Something else you should know about" event2.save! - @checker.async_receive([event1.id, event2.id]) + Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id]) @checker.reload.memory[:queue].should == ["Something you should know about", "Something else you should know about"] end end describe "#check" do it "should send an email" do - @checker.async_check + Agents::DigestEmailAgent.async_check(@checker.id) ActionMailer::Base.deliveries.should == [] @checker.memory[:queue] = ["Something you should know about", { :title => "Foo", :url => "http://google.com", :bar => 2 }, { "message" => "hi", :woah => "there" }] @checker.save! - @checker.async_check + Agents::DigestEmailAgent.async_check(@checker.id) ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"] ActionMailer::Base.deliveries.last.subject.should == "something interesting" get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Something you should know about\n\nFoo (bar: 2 and url: http://google.com)\n\nhi (woah: there)" diff --git a/spec/models/agents/trigger_agent_spec.rb b/spec/models/agents/trigger_agent_spec.rb index aeb6185957..661e955361 100644 --- a/spec/models/agents/trigger_agent_spec.rb +++ b/spec/models/agents/trigger_agent_spec.rb @@ -50,7 +50,7 @@ @event.save! @checker.should_not be_working # no events have ever been received - @checker.async_receive([@event.id]) + Agents::TriggerAgent.async_receive(@checker.id, [@event.id]) @checker.reload.should be_working # no events have ever been received three_days_from_now = 3.days.from_now stub(Time).now { three_days_from_now }