Skip to content

Commit

Permalink
Add a new option to DelayAgent: emit_interval
Browse files Browse the repository at this point in the history
This specifies the interval in seconds between emitting events.

The `events_order` option is fixed so that it works without the `max_emitted_events` option.
  • Loading branch information
knu committed Jul 16, 2023
1 parent a22a707 commit a45c85c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 45 deletions.
44 changes: 27 additions & 17 deletions app/models/agents/delay_agent.rb
Expand Up @@ -14,6 +14,8 @@ class DelayAgent < Agent
`expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
that you anticipate passing without this Agent receiving an incoming Event.
`emit_interval` specifies the interval in seconds between emitting events. This is zero (no interval) by default.
`max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory.
# Ordering Events
Expand All @@ -23,10 +25,11 @@ class DelayAgent < Agent

def default_options
{
'expected_receive_period_in_days' => '10',
'max_events' => '100',
'expected_receive_period_in_days' => 10,
'max_events' => 100,
'keep' => 'newest',
'max_emitted_events' => '',
'emit_interval' => 0,
'events_order' => [],
}
end
Expand All @@ -35,6 +38,7 @@ def default_options
form_configurable :max_events, type: :number, html_options: { min: 1 }
form_configurable :keep, type: :array, values: %w[newest oldest]
form_configurable :max_emitted_events, type: :number, html_options: { min: 0 }
form_configurable :emit_interval, type: :number, html_options: { min: 0, step: 0.001 }
form_configurable :events_order, type: :json

def validate_options
Expand All @@ -56,6 +60,10 @@ def validate_options
errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0")
end
end

unless interpolated['emit_interval'] in nil | 0.. | /\A\d(?:\.\d+)?\z/
errors.add(:base, "The 'emit_interval' option should be a non-negative number if set")
end
end

def working?
Expand All @@ -77,22 +85,24 @@ def receive(incoming_events)
end

def check
if memory['event_ids'].present?
events = received_events.where(id: memory['event_ids']).reorder('events.id asc')
if interpolated['max_emitted_events'].present?
limit = interpolated['max_emitted_events'].to_i
events =
if options[SortableEvents::EVENTS_ORDER_KEY].present?
sort_events(events).first(limit)
else
events.limit(limit)
end
end
return if memory['event_ids'].blank?

events.each do |event|
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
events = received_events.where(id: memory['event_ids']).reorder(:id).to_a

if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
events = sort_events(events)
end

if interpolated['max_emitted_events'].present?
events[interpolated['max_emitted_events'].to_i..] = []
end

interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)

events.each_with_index do |event, i|
sleep interval unless i.zero?
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
end
end
Expand Down
102 changes: 74 additions & 28 deletions spec/models/agents/delay_agent_spec.rb
@@ -1,25 +1,24 @@
require 'rails_helper'

describe Agents::DelayAgent do
let(:agent) do
_agent = Agents::DelayAgent.new(name: 'My DelayAgent')
_agent.options = _agent.default_options.merge('max_events' => 2)
_agent.user = users(:bob)
_agent.sources << agents(:bob_website_agent)
_agent.save!
_agent
let(:agent) {
Agents::DelayAgent.create!(
name: 'My DelayAgent',
user: users(:bob),
options: default_options.merge('max_events' => 2),
sources: [agents(:bob_website_agent)]
)
}

let(:default_options) { Agents::DelayAgent.new.default_options }

def create_event(value)
Event.create!(payload: { value: }, agent: agents(:bob_website_agent))
end

def create_event
_event = Event.new(payload: { random: rand })
_event.agent = agents(:bob_website_agent)
_event.save!
_event
end

let(:first_event) { create_event }
let(:second_event) { create_event }
let(:third_event) { create_event }
let(:first_event) { create_event("one") }
let(:second_event) { create_event("two") }
let(:third_event) { create_event("three") }

describe "#working?" do
it "checks if events have been received within expected receive period" do
Expand Down Expand Up @@ -48,6 +47,21 @@ def create_event
expect(agent).to be_valid
end

it "should validate emit_interval" do
agent.options.delete('emit_interval')
expect(agent).to be_valid
agent.options['emit_interval'] = "0"
expect(agent).to be_valid
agent.options['emit_interval'] = "0.5"
expect(agent).to be_valid
agent.options['emit_interval'] = 0.5
expect(agent).to be_valid
agent.options['emit_interval'] = ''
expect(agent).not_to be_valid
agent.options['emit_interval'] = nil
expect(agent).to be_valid
end

it "should validate presence of expected_receive_period_in_days" do
agent.options['expected_receive_period_in_days'] = ""
expect(agent).not_to be_valid
Expand Down Expand Up @@ -98,30 +112,62 @@ def create_event
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect(agent).to receive(:sleep).with(0).once

expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder('events.id desc')
events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq third_event.payload
expect(events.second.payload).to eq second_event.payload

expect(agent.memory['event_ids']).to eq []
end

it "re-emits max_emitted_events and clears just them from the memory" do
agent.options['max_emitted_events'] = 1
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]
context "with events_order and emit_interval" do
before do
agent.update!(options: agent.options.merge(
'events_order' => ['{{ value }}'],
'emit_interval' => 1,
))
end

expect {
agent.check
}.to change { agent.events.count }.by(1)
it "re-emits Events in that order and clears the memory with that interval" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect(agent).to receive(:sleep).with(1).once

expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq second_event.payload
expect(events.second.payload).to eq third_event.payload

expect(agent.memory['event_ids']).to eq []
end
end

context "with max_emitted_events" do
before do
agent.update!(options: agent.options.merge('max_emitted_events' => 1))
end

it "re-emits max_emitted_events and clears just them from the memory" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

events = agent.events.reorder('events.id desc')
expect(agent.memory['event_ids']).to eq [third_event.id]
expect(events.first.payload).to eq second_event.payload
expect {
agent.check
}.to change { agent.events.count }.by(1)

events = agent.events.reorder(id: :desc)
expect(agent.memory['event_ids']).to eq [third_event.id]
expect(events.first.payload).to eq second_event.payload
end
end
end
end

0 comments on commit a45c85c

Please sign in to comment.