Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new option to DelayAgent: emit_interval #3301

Merged
merged 1 commit into from Jul 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 27 additions & 17 deletions app/models/agents/delay_agent.rb
Expand Up @@ -14,6 +14,8 @@ class DelayAgent < Agent
`expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
that you anticipate passing without this Agent receiving an incoming Event.

`emit_interval` specifies the interval in seconds between emitting events. This is zero (no interval) by default.

`max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory.

# Ordering Events
Expand All @@ -23,10 +25,11 @@ class DelayAgent < Agent

def default_options
{
'expected_receive_period_in_days' => '10',
'max_events' => '100',
'expected_receive_period_in_days' => 10,
'max_events' => 100,
'keep' => 'newest',
'max_emitted_events' => '',
'emit_interval' => 0,
'events_order' => [],
}
end
Expand All @@ -35,6 +38,7 @@ def default_options
form_configurable :max_events, type: :number, html_options: { min: 1 }
form_configurable :keep, type: :array, values: %w[newest oldest]
form_configurable :max_emitted_events, type: :number, html_options: { min: 0 }
form_configurable :emit_interval, type: :number, html_options: { min: 0, step: 0.001 }
form_configurable :events_order, type: :json

def validate_options
Expand All @@ -56,6 +60,10 @@ def validate_options
errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0")
end
end

unless interpolated['emit_interval'] in nil | 0.. | /\A\d+(?:\.\d+)?\z/
errors.add(:base, "The 'emit_interval' option should be a non-negative number if set")
end
end

def working?
Expand All @@ -77,22 +85,24 @@ def receive(incoming_events)
end

def check
if memory['event_ids'].present?
events = received_events.where(id: memory['event_ids']).reorder('events.id asc')
if interpolated['max_emitted_events'].present?
limit = interpolated['max_emitted_events'].to_i
events =
if options[SortableEvents::EVENTS_ORDER_KEY].present?
sort_events(events).first(limit)
else
events.limit(limit)
end
end
return if memory['event_ids'].blank?

events.each do |event|
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
events = received_events.where(id: memory['event_ids']).reorder(:id).to_a

if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
events = sort_events(events)
end

if interpolated['max_emitted_events'].present?
events[interpolated['max_emitted_events'].to_i..] = []
end

interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)

events.each_with_index do |event, i|
sleep interval unless i.zero?
create_event payload: event.payload
memory['event_ids'].delete(event.id)
end
end
end
Expand Down
102 changes: 74 additions & 28 deletions spec/models/agents/delay_agent_spec.rb
@@ -1,25 +1,24 @@
require 'rails_helper'

describe Agents::DelayAgent do
let(:agent) do
_agent = Agents::DelayAgent.new(name: 'My DelayAgent')
_agent.options = _agent.default_options.merge('max_events' => 2)
_agent.user = users(:bob)
_agent.sources << agents(:bob_website_agent)
_agent.save!
_agent
let(:agent) {
Agents::DelayAgent.create!(
name: 'My DelayAgent',
user: users(:bob),
options: default_options.merge('max_events' => 2),
sources: [agents(:bob_website_agent)]
)
}

let(:default_options) { Agents::DelayAgent.new.default_options }

def create_event(value)
Event.create!(payload: { value: }, agent: agents(:bob_website_agent))
end

def create_event
_event = Event.new(payload: { random: rand })
_event.agent = agents(:bob_website_agent)
_event.save!
_event
end

let(:first_event) { create_event }
let(:second_event) { create_event }
let(:third_event) { create_event }
let(:first_event) { create_event("one") }
let(:second_event) { create_event("two") }
let(:third_event) { create_event("three") }

describe "#working?" do
it "checks if events have been received within expected receive period" do
Expand Down Expand Up @@ -48,6 +47,21 @@ def create_event
expect(agent).to be_valid
end

it "should validate emit_interval" do
agent.options.delete('emit_interval')
expect(agent).to be_valid
agent.options['emit_interval'] = "0"
expect(agent).to be_valid
agent.options['emit_interval'] = "0.5"
expect(agent).to be_valid
agent.options['emit_interval'] = 0.5
expect(agent).to be_valid
agent.options['emit_interval'] = ''
expect(agent).not_to be_valid
agent.options['emit_interval'] = nil
expect(agent).to be_valid
end

it "should validate presence of expected_receive_period_in_days" do
agent.options['expected_receive_period_in_days'] = ""
expect(agent).not_to be_valid
Expand Down Expand Up @@ -98,30 +112,62 @@ def create_event
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect(agent).to receive(:sleep).with(0).once

expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder('events.id desc')
events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq third_event.payload
expect(events.second.payload).to eq second_event.payload

expect(agent.memory['event_ids']).to eq []
end

it "re-emits max_emitted_events and clears just them from the memory" do
agent.options['max_emitted_events'] = 1
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]
context "with events_order and emit_interval" do
before do
agent.update!(options: agent.options.merge(
'events_order' => ['{{ value }}'],
'emit_interval' => 1,
))
end

expect {
agent.check
}.to change { agent.events.count }.by(1)
it "re-emits Events in that order and clears the memory with that interval" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect(agent).to receive(:sleep).with(1).once

expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder(id: :desc)
expect(events.first.payload).to eq second_event.payload
expect(events.second.payload).to eq third_event.payload

expect(agent.memory['event_ids']).to eq []
end
end

context "with max_emitted_events" do
before do
agent.update!(options: agent.options.merge('max_emitted_events' => 1))
end

it "re-emits max_emitted_events and clears just them from the memory" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

events = agent.events.reorder('events.id desc')
expect(agent.memory['event_ids']).to eq [third_event.id]
expect(events.first.payload).to eq second_event.payload
expect {
agent.check
}.to change { agent.events.count }.by(1)

events = agent.events.reorder(id: :desc)
expect(agent.memory['event_ids']).to eq [third_event.id]
expect(events.first.payload).to eq second_event.payload
end
end
end
end