From fda850f47da5bbc86f8a00e899da1280e7c5b10f Mon Sep 17 00:00:00 2001 From: Akinori MUSHA Date: Mon, 17 Jul 2023 01:00:39 +0900 Subject: [PATCH] Add a new option to DelayAgent: emit_interval This specifies the interval in seconds between emitting events. The `events_order` option is fixed so that it works without the `max_emitted_events` option. --- app/models/agents/delay_agent.rb | 44 ++++++----- spec/models/agents/delay_agent_spec.rb | 102 ++++++++++++++++++------- 2 files changed, 101 insertions(+), 45 deletions(-) diff --git a/app/models/agents/delay_agent.rb b/app/models/agents/delay_agent.rb index e1c2de6445..7b7b9e01da 100644 --- a/app/models/agents/delay_agent.rb +++ b/app/models/agents/delay_agent.rb @@ -14,6 +14,8 @@ class DelayAgent < Agent `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days that you anticipate passing without this Agent receiving an incoming Event. + `emit_interval` specifies the interval in seconds between emitting events. This is zero (no interval) by default. + `max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory. # Ordering Events @@ -23,10 +25,11 @@ class DelayAgent < Agent def default_options { - 'expected_receive_period_in_days' => '10', - 'max_events' => '100', + 'expected_receive_period_in_days' => 10, + 'max_events' => 100, 'keep' => 'newest', 'max_emitted_events' => '', + 'emit_interval' => 0, 'events_order' => [], } end @@ -35,6 +38,7 @@ def default_options form_configurable :max_events, type: :number, html_options: { min: 1 } form_configurable :keep, type: :array, values: %w[newest oldest] form_configurable :max_emitted_events, type: :number, html_options: { min: 0 } + form_configurable :emit_interval, type: :number, html_options: { min: 0, step: 0.001 } form_configurable :events_order, type: :json def validate_options @@ -56,6 +60,10 @@ def validate_options errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0") end end + + unless interpolated['emit_interval'] in nil | 0.. | /\A\d+(?:\.\d+)?\z/ + errors.add(:base, "The 'emit_interval' option should be a non-negative number if set") + end end def working? @@ -77,22 +85,24 @@ def receive(incoming_events) end def check - if memory['event_ids'].present? - events = received_events.where(id: memory['event_ids']).reorder('events.id asc') - if interpolated['max_emitted_events'].present? - limit = interpolated['max_emitted_events'].to_i - events = - if options[SortableEvents::EVENTS_ORDER_KEY].present? - sort_events(events).first(limit) - else - events.limit(limit) - end - end + return if memory['event_ids'].blank? - events.each do |event| - create_event payload: event.payload - memory['event_ids'].delete(event.id) - end + events = received_events.where(id: memory['event_ids']).reorder(:id).to_a + + if interpolated[SortableEvents::EVENTS_ORDER_KEY].present? + events = sort_events(events) + end + + if interpolated['max_emitted_events'].present? + events[interpolated['max_emitted_events'].to_i..] = [] + end + + interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..) + + events.each_with_index do |event, i| + sleep interval unless i.zero? + create_event payload: event.payload + memory['event_ids'].delete(event.id) end end end diff --git a/spec/models/agents/delay_agent_spec.rb b/spec/models/agents/delay_agent_spec.rb index b869d6c810..4495166674 100644 --- a/spec/models/agents/delay_agent_spec.rb +++ b/spec/models/agents/delay_agent_spec.rb @@ -1,25 +1,24 @@ require 'rails_helper' describe Agents::DelayAgent do - let(:agent) do - _agent = Agents::DelayAgent.new(name: 'My DelayAgent') - _agent.options = _agent.default_options.merge('max_events' => 2) - _agent.user = users(:bob) - _agent.sources << agents(:bob_website_agent) - _agent.save! - _agent + let(:agent) { + Agents::DelayAgent.create!( + name: 'My DelayAgent', + user: users(:bob), + options: default_options.merge('max_events' => 2), + sources: [agents(:bob_website_agent)] + ) + } + + let(:default_options) { Agents::DelayAgent.new.default_options } + + def create_event(value) + Event.create!(payload: { value: }, agent: agents(:bob_website_agent)) end - def create_event - _event = Event.new(payload: { random: rand }) - _event.agent = agents(:bob_website_agent) - _event.save! - _event - end - - let(:first_event) { create_event } - let(:second_event) { create_event } - let(:third_event) { create_event } + let(:first_event) { create_event("one") } + let(:second_event) { create_event("two") } + let(:third_event) { create_event("three") } describe "#working?" do it "checks if events have been received within expected receive period" do @@ -48,6 +47,21 @@ def create_event expect(agent).to be_valid end + it "should validate emit_interval" do + agent.options.delete('emit_interval') + expect(agent).to be_valid + agent.options['emit_interval'] = "0" + expect(agent).to be_valid + agent.options['emit_interval'] = "0.5" + expect(agent).to be_valid + agent.options['emit_interval'] = 0.5 + expect(agent).to be_valid + agent.options['emit_interval'] = '' + expect(agent).not_to be_valid + agent.options['emit_interval'] = nil + expect(agent).to be_valid + end + it "should validate presence of expected_receive_period_in_days" do agent.options['expected_receive_period_in_days'] = "" expect(agent).not_to be_valid @@ -98,30 +112,62 @@ def create_event agent.receive([first_event, second_event, third_event]) expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id] + expect(agent).to receive(:sleep).with(0).once + expect { agent.check }.to change { agent.events.count }.by(2) - events = agent.events.reorder('events.id desc') + events = agent.events.reorder(id: :desc) expect(events.first.payload).to eq third_event.payload expect(events.second.payload).to eq second_event.payload expect(agent.memory['event_ids']).to eq [] end - it "re-emits max_emitted_events and clears just them from the memory" do - agent.options['max_emitted_events'] = 1 - agent.receive([first_event, second_event, third_event]) - expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id] + context "with events_order and emit_interval" do + before do + agent.update!(options: agent.options.merge( + 'events_order' => ['{{ value }}'], + 'emit_interval' => 1, + )) + end - expect { - agent.check - }.to change { agent.events.count }.by(1) + it "re-emits Events in that order and clears the memory with that interval" do + agent.receive([first_event, second_event, third_event]) + expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id] + + expect(agent).to receive(:sleep).with(1).once + + expect { + agent.check + }.to change { agent.events.count }.by(2) + + events = agent.events.reorder(id: :desc) + expect(events.first.payload).to eq second_event.payload + expect(events.second.payload).to eq third_event.payload + + expect(agent.memory['event_ids']).to eq [] + end + end + + context "with max_emitted_events" do + before do + agent.update!(options: agent.options.merge('max_emitted_events' => 1)) + end + + it "re-emits max_emitted_events and clears just them from the memory" do + agent.receive([first_event, second_event, third_event]) + expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id] - events = agent.events.reorder('events.id desc') - expect(agent.memory['event_ids']).to eq [third_event.id] - expect(events.first.payload).to eq second_event.payload + expect { + agent.check + }.to change { agent.events.count }.by(1) + events = agent.events.reorder(id: :desc) + expect(agent.memory['event_ids']).to eq [third_event.id] + expect(events.first.payload).to eq second_event.payload + end end end end