Skip to content

Commit

Permalink
Improve DelayAgent
Browse files Browse the repository at this point in the history
- Reduce repeated calls to interpolated() and hash reference
- Improve tests and cover the last fix
  • Loading branch information
knu committed Apr 28, 2024
1 parent b54974c commit e65f25c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
17 changes: 10 additions & 7 deletions app/models/agents/delay_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ def working?

def receive(incoming_events)
incoming_events.each do |event|
memory['event_ids'] ||= []
memory['event_ids'] << event.id
if memory['event_ids'].length > interpolated['max_events'].to_i
event_ids = memory['event_ids'] || []
event_ids << event.id
if event_ids.length > interpolated['max_events'].to_i
if options['keep'] == 'newest'
memory['event_ids'].shift
event_ids.shift
else
memory['event_ids'].pop
event_ids.pop
end
end
memory['event_ids'] = event_ids
end
end

Expand All @@ -93,8 +94,10 @@ def check
events = sort_events(events)
end

if interpolated['max_emitted_events'].present? and interpolated['max_emitted_events'].to_i < events.length
events[interpolated['max_emitted_events'].to_i..] = []
max_emitted_events = interpolated['max_emitted_events'].presence&.to_i

if max_emitted_events&.< events.length
events[max_emitted_events..] = []
end

interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)
Expand Down
32 changes: 20 additions & 12 deletions spec/models/agents/delay_agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,10 @@ def create_event(value)
expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq third_event.payload
expect(events.second.payload).to eq second_event.payload

expect(agent.events.take(2).map(&:payload)).to eq [
third_event,
second_event,
].map(&:payload)
expect(agent.memory['event_ids']).to eq []
end

Expand All @@ -143,9 +142,10 @@ def create_event(value)
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq second_event.payload
expect(events.second.payload).to eq third_event.payload
expect(agent.events.take(2).map(&:payload)).to eq [
second_event,
third_event,
].map(&:payload)

expect(agent.memory['event_ids']).to eq []
end
Expand All @@ -156,17 +156,25 @@ def create_event(value)
agent.update!(options: agent.options.merge('max_emitted_events' => 1))
end

it "re-emits max_emitted_events and clears just them from the memory" do
it "re-emits max_emitted_events per run" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect {
agent.check
}.to change { agent.events.count }.by(1)

events = agent.events.reorder(id: :desc)
expect(agent.events.take.payload).to eq second_event.payload
expect(agent.memory['event_ids']).to eq [third_event.id]
expect(events.first.payload).to eq second_event.payload

expect {
agent.check
}.to change { agent.events.count }.by(1)
expect(agent.events.take.payload).to eq third_event.payload
expect(agent.memory['event_ids']).to eq []

expect {
agent.check
}.not_to(change { agent.events.count })
end
end
end
Expand Down

0 comments on commit e65f25c

Please sign in to comment.