Skip to content
This repository
Browse code

switch to class methods and ids to shorten the payload size of serial…

…ized delayed_jobs
  • Loading branch information...
commit eead335705fdcbb3310adb47ecf798574ab093f6 1 parent 15b03fc
Andrew Cantino authored March 23, 2013
3  app/controllers/agents_controller.rb
@@ -9,8 +9,7 @@ def index
9 9
   end
10 10
 
11 11
   def run
12  
-    @agent = current_user.agents.find(params[:id])
13  
-    @agent.async_check
  12
+    Agent.async_check(current_user.agents.find(params[:id]).id)
14 13
     redirect_to agents_path, notice: "Agent run queued"
15 14
   end
16 15
 
138  app/models/agent.rb
@@ -100,20 +100,6 @@ def last_event_at
100 100
     @memoized_last_event_at ||= events.select(:created_at).first.try(:created_at)
101 101
   end
102 102
 
103  
-  def async_check
104  
-    check
105  
-    self.last_check_at = Time.now
106  
-    save!
107  
-  end
108  
-  handle_asynchronously :async_check #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }
109  
-
110  
-  def async_receive(event_ids)
111  
-    receive(Event.where(:id => event_ids))
112  
-    self.last_receive_at = Time.now
113  
-    save!
114  
-  end
115  
-  handle_asynchronously :async_receive #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }
116  
-
117 103
   def default_schedule
118 104
     self.class.default_schedule
119 105
   end
@@ -135,67 +121,93 @@ def can_receive_events?
135 121
   end
136 122
 
137 123
   # Class Methods
  124
+  class << self
  125
+    def cannot_be_scheduled!
  126
+      @cannot_be_scheduled = true
  127
+    end
138 128
 
139  
-  def self.cannot_be_scheduled!
140  
-    @cannot_be_scheduled = true
141  
-  end
142  
-
143  
-  def self.cannot_be_scheduled?
144  
-    !!@cannot_be_scheduled
145  
-  end
146  
-
147  
-  def self.default_schedule(schedule = nil)
148  
-    @default_schedule = schedule unless schedule.nil?
149  
-    @default_schedule
150  
-  end
151  
-
152  
-  def self.cannot_receive_events!
153  
-    @cannot_receive_events = true
154  
-  end
  129
+    def cannot_be_scheduled?
  130
+      !!@cannot_be_scheduled
  131
+    end
155 132
 
156  
-  def self.cannot_receive_events?
157  
-    !!@cannot_receive_events
158  
-  end
  133
+    def default_schedule(schedule = nil)
  134
+      @default_schedule = schedule unless schedule.nil?
  135
+      @default_schedule
  136
+    end
159 137
 
160  
-  def self.receive!
161  
-    sql = Agent.
162  
-            select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
163  
-            joins("JOIN links ON (links.receiver_id = agents.id)").
164  
-            joins("JOIN agents AS sources ON (links.source_id = sources.id)").
165  
-            joins("JOIN events ON (events.agent_id = sources.id)").
166  
-            where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
  138
+    def cannot_receive_events!
  139
+      @cannot_receive_events = true
  140
+    end
167 141
 
168  
-    agents_to_events = {}
169  
-    Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
170  
-      agents_to_events[receiver_agent_id] ||= []
171  
-      agents_to_events[receiver_agent_id] << event_id
  142
+    def cannot_receive_events?
  143
+      !!@cannot_receive_events
172 144
     end
173 145
 
174  
-    event_ids = agents_to_events.values.flatten.uniq.compact
  146
+    def receive!
  147
+      sql = Agent.
  148
+              select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
  149
+              joins("JOIN links ON (links.receiver_id = agents.id)").
  150
+              joins("JOIN agents AS sources ON (links.source_id = sources.id)").
  151
+              joins("JOIN events ON (events.agent_id = sources.id)").
  152
+              where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
  153
+
  154
+      agents_to_events = {}
  155
+      Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
  156
+        agents_to_events[receiver_agent_id] ||= []
  157
+        agents_to_events[receiver_agent_id] << event_id
  158
+      end
  159
+
  160
+      event_ids = agents_to_events.values.flatten.uniq.compact
  161
+
  162
+      Agent.where(:id => agents_to_events.keys).each do |agent|
  163
+        agent.update_attribute :last_checked_event_id, event_ids.max
  164
+        Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
  165
+      end
  166
+
  167
+      {
  168
+          :agent_count => agents_to_events.keys.length,
  169
+          :event_count => event_ids.length
  170
+      }
  171
+    end
175 172
 
176  
-    Agent.where(:id => agents_to_events.keys).each do |agent|
177  
-      agent.update_attribute :last_checked_event_id, event_ids.max
178  
-      agent.async_receive(agents_to_events[agent.id].uniq)
  173
+    # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then
  174
+    # save it with an updated _last_receive_at_ timestamp.
  175
+    #
  176
+    # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job.  It accepts Agent
  177
+    # and Event ids instead of a literal ActiveRecord models because it is preferable to serialize delayed_jobs with ids.
  178
+    def async_receive(agent_id, event_ids)
  179
+      agent = Agent.find(agent_id)
  180
+      agent.receive(Event.where(:id => event_ids))
  181
+      agent.last_receive_at = Time.now
  182
+      agent.save!
179 183
     end
  184
+    handle_asynchronously :async_receive
180 185
 
181  
-    {
182  
-        :agent_count => agents_to_events.keys.length,
183  
-        :event_count => event_ids.length
184  
-    }
185  
-  end
  186
+    def run_schedule(schedule)
  187
+      types = where(:schedule => schedule).group(:type).pluck(:type)
  188
+      types.each do |type|
  189
+        type.constantize.bulk_check(schedule)
  190
+      end
  191
+    end
186 192
 
187  
-  def self.run_schedule(schedule)
188  
-    types = where(:schedule => schedule).group(:type).pluck(:type)
189  
-    types.each do |type|
190  
-      type.constantize.bulk_check(schedule)
  193
+    # You can override this to define a custom bulk_check for your type of Agent.
  194
+    def bulk_check(schedule)
  195
+      raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
  196
+      where(:schedule => schedule).pluck("agents.id").each do |agent_id|
  197
+        async_check(agent_id)
  198
+      end
191 199
     end
192  
-  end
193 200
 
194  
-  # You can override this to define a custom bulk_check for your type of Agent.
195  
-  def self.bulk_check(schedule)
196  
-    raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
197  
-    where(:schedule => schedule).find_each do |agent|
198  
-      agent.async_check
  201
+    # Given an Agent id, load the Agent, call #check on it, and then save it with an updated _last_check_at_ timestamp.
  202
+    #
  203
+    # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job.  It accepts an Agent
  204
+    # id instead of a literal Agent because it is preferable to serialize delayed_jobs with ids.
  205
+    def async_check(agent_id)
  206
+      agent = Agent.find(agent_id)
  207
+      agent.check
  208
+      agent.last_check_at = Time.now
  209
+      agent.save!
199 210
     end
  211
+    handle_asynchronously :async_check
200 212
   end
201 213
 end
32  spec/models/agent_spec.rb
@@ -8,9 +8,11 @@
8 8
     end
9 9
 
10 10
     it "runs agents with the given schedule" do
11  
-      mock.any_instance_of(Agents::WeatherAgent).async_check.twice
12  
-      mock.any_instance_of(Agents::WebsiteAgent).async_check.once
  11
+      weather_agent_ids = [agents(:bob_weather_agent), agents(:jane_weather_agent)].map(&:id)
  12
+      stub(Agents::WeatherAgent).async_check(anything) {|agent_id| weather_agent_ids.delete(agent_id) }
  13
+      stub(Agents::WebsiteAgent).async_check(agents(:bob_website_agent).id)
13 14
       Agent.run_schedule("midnight")
  15
+      weather_agent_ids.should be_empty
14 16
     end
15 17
 
16 18
     it "groups agents by type" do
@@ -20,7 +22,7 @@
20 22
     end
21 23
 
22 24
     it "only runs agents with the given schedule" do
23  
-      do_not_allow.any_instance_of(Agents::WebsiteAgent).async_check
  25
+      do_not_allow(Agents::WebsiteAgent).async_check
24 26
       Agent.run_schedule("blah")
25 27
     end
26 28
   end
@@ -116,19 +118,21 @@ class Agents::CannotBeScheduled < Agent
116 118
       end
117 119
     end
118 120
 
119  
-    describe "#async_check" do
120  
-      it "records last_check_at and calls check" do
  121
+    describe ".async_check" do
  122
+      it "records last_check_at and calls check on the given Agent" do
121 123
         @checker = Agents::SomethingSource.new(:name => "something")
122 124
         @checker.user = users(:bob)
123 125
         @checker.save!
124 126
 
125  
-        @checker.options[:new] = true
126  
-        mock(@checker).check.once
  127
+        mock(@checker).check.once {
  128
+          @checker.options[:new] = true
  129
+        }
127 130
 
128  
-        @checker.last_check_at.should be_nil
129  
-        @checker.async_check
130  
-        @checker.last_check_at.should be_within(2).of(Time.now)
  131
+        mock(Agent).find(@checker.id) { @checker }
131 132
 
  133
+        @checker.last_check_at.should be_nil
  134
+        Agents::SomethingSource.async_check(@checker.id)
  135
+        @checker.reload.last_check_at.should be_within(2).of(Time.now)
132 136
         @checker.reload.options[:new].should be_true # Show that we save options
133 137
       end
134 138
     end
@@ -141,13 +145,13 @@ class Agents::CannotBeScheduled < Agent
141 145
 
142 146
       it "should use available events" do
143 147
         mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
144  
-        agents(:bob_weather_agent).async_check
  148
+        Agent.async_check(agents(:bob_weather_agent).id)
145 149
         Agent.receive!
146 150
       end
147 151
 
148 152
       it "should track when events have been seen and not see them again" do
149 153
         mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
150  
-        agents(:bob_weather_agent).async_check
  154
+        Agent.async_check(agents(:bob_weather_agent).id)
151 155
         Agent.receive!
152 156
         Agent.receive!
153 157
       end
@@ -161,8 +165,8 @@ class Agents::CannotBeScheduled < Agent
161 165
         mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice { |events|
162 166
           events.map(&:user).map(&:username).uniq.length.should == 1
163 167
         }
164  
-        agents(:bob_weather_agent).async_check
165  
-        agents(:jane_weather_agent).async_check
  168
+        Agent.async_check(agents(:bob_weather_agent).id)
  169
+        Agent.async_check(agents(:jane_weather_agent).id)
166 170
         Agent.receive!
167 171
       end
168 172
     end
6  spec/models/agents/digest_email_agent_spec.rb
@@ -23,20 +23,20 @@ def get_message_part(mail, content_type)
23 23
       event2.payload = "Something else you should know about"
24 24
       event2.save!
25 25
 
26  
-      @checker.async_receive([event1.id, event2.id])
  26
+      Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id])
27 27
       @checker.reload.memory[:queue].should == ["Something you should know about", "Something else you should know about"]
28 28
     end
29 29
   end
30 30
 
31 31
   describe "#check" do
32 32
     it "should send an email" do
33  
-      @checker.async_check
  33
+      Agents::DigestEmailAgent.async_check(@checker.id)
34 34
       ActionMailer::Base.deliveries.should == []
35 35
 
36 36
       @checker.memory[:queue] = ["Something you should know about", { :title => "Foo", :url => "http://google.com", :bar => 2 }, { "message" => "hi", :woah => "there" }]
37 37
       @checker.save!
38 38
 
39  
-      @checker.async_check
  39
+      Agents::DigestEmailAgent.async_check(@checker.id)
40 40
       ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"]
41 41
       ActionMailer::Base.deliveries.last.subject.should == "something interesting"
42 42
       get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Something you should know about\n\nFoo (bar: 2 and url: http://google.com)\n\nhi (woah: there)"
2  spec/models/agents/trigger_agent_spec.rb
@@ -50,7 +50,7 @@
50 50
       @event.save!
51 51
 
52 52
       @checker.should_not be_working # no events have ever been received
53  
-      @checker.async_receive([@event.id])
  53
+      Agents::TriggerAgent.async_receive(@checker.id, [@event.id])
54 54
       @checker.reload.should be_working # no events have ever been received
55 55
       three_days_from_now = 3.days.from_now
56 56
       stub(Time).now { three_days_from_now }

0 notes on commit eead335

Please sign in to comment.
Something went wrong with that request. Please try again.