Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/outboxer.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
:batch_size: 1000
:buffer_size: 10
:concurrency: 1
:tick_interval: 0.1
:poll_interval: 5.0
Expand Down
1 change: 0 additions & 1 deletion db/migrate/create_outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ def up
t.datetime :updated_at, precision: 6, null: false

t.datetime :queued_at, precision: 6, null: false
t.datetime :buffered_at, precision: 6
t.datetime :publishing_at, precision: 6
t.datetime :published_at, precision: 6
t.datetime :failed_at, precision: 6
Expand Down
18 changes: 5 additions & 13 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Outboxer
# Message lifecycle management including queuing, buffering, and publishing of messages.
# Message lifecycle management including queuing and publishing of messages.
module Message
module_function

Expand Down Expand Up @@ -61,7 +61,7 @@ def publish(logger: nil, time: ::Time)
nil
else
logger&.info(
"Outboxer message publishing id=#{message[:id]}" \
"Outboxer message publishing id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")

Expand Down Expand Up @@ -228,16 +228,14 @@ def publishing_failed(id:, error: nil, time: ::Time)
# @param messageable_id [String] ID of the messageable entity.
# @param updated_at [Time] the timestamp of last update.
# @param queued_at [Time, nil] optional timestamp when message was queued.
# @param buffered_at [Time, nil] optional timestamp when message was buffered.
# @param publishing_at [Time, nil] optional timestamp when message was marked publishing.
# @param published_at [Time, nil] optional timestamp when message was marked published.
# @param failed_at [Time, nil] optional timestamp when message was marked failed.
# @param publisher_id [Integer, nil] optional publisher ID.
# @param publisher_name [String, nil] optional publisher name.
# @return [Hash] serialized message details.
def serialize(id:, status:, messageable_type:, messageable_id:, updated_at:,
queued_at: nil, buffered_at: nil,
publishing_at: nil, published_at: nil, failed_at: nil,
queued_at: nil, publishing_at: nil, published_at: nil, failed_at: nil,
publisher_id: nil, publisher_name: nil)
{
id: id,
Expand All @@ -246,7 +244,6 @@ def serialize(id:, status:, messageable_type:, messageable_id:, updated_at:,
messageable_id: messageable_id,
updated_at: updated_at,
queued_at: queued_at,
buffered_at: buffered_at,
publishing_at: publishing_at,
published_at: published_at,
failed_at: failed_at,
Expand Down Expand Up @@ -276,7 +273,6 @@ def find_by_id(id:)
messageable_id: message.messageable_id,
updated_at: message.updated_at.utc,
queued_at: message.queued_at.utc,
buffered_at: message&.buffered_at&.utc, # TODO: is &.buffered_at necessary?
publishing_at: message&.publishing_at&.utc,
published_at: message&.published_at&.utc,
failed_at: message&.failed_at&.utc,
Expand Down Expand Up @@ -325,7 +321,7 @@ def delete(id:)
end
end

REQUEUE_STATUSES = [:buffered, :publishing, :failed]
REQUEUE_STATUSES = [:publishing, :failed]

# Determines if a message in a certain status can be requeued.
# @param status [Symbol] the status to check.
Expand All @@ -352,7 +348,6 @@ def requeue(id:, publisher_id: nil, publisher_name: nil,
status: Message::Status::QUEUED,
updated_at: current_utc_time,
queued_at: current_utc_time,
buffered_at: nil,
publishing_at: nil,
published_at: nil,
failed_at: nil,
Expand All @@ -364,7 +359,7 @@ def requeue(id:, publisher_id: nil, publisher_name: nil,
end
end

LIST_STATUS_OPTIONS = [nil, :queued, :buffered, :publishing, :published, :failed]
LIST_STATUS_OPTIONS = [nil, :queued, :publishing, :published, :failed]
LIST_STATUS_DEFAULT = nil

LIST_SORT_OPTIONS = [:id, :status, :messageable, :queued_at, :updated_at, :publisher_name]
Expand Down Expand Up @@ -447,7 +442,6 @@ def list(status: LIST_STATUS_DEFAULT,
messageable_id: message.messageable_id,
updated_at: message.updated_at.utc.in_time_zone(time_zone),
queued_at: message.queued_at.utc.in_time_zone(time_zone),
buffered_at: message&.buffered_at&.utc&.in_time_zone(time_zone),
publishing_at: message&.publishing_at&.utc&.in_time_zone(time_zone),
published_at: message&.published_at&.utc&.in_time_zone(time_zone),
failed_at: message&.failed_at&.utc&.in_time_zone(time_zone),
Expand Down Expand Up @@ -508,7 +502,6 @@ def requeue_all(status:, batch_size: 100, time: ::Time,
status: Message::Status::QUEUED,
updated_at: current_utc_time,
queued_at: current_utc_time,
buffered_at: nil,
publishing_at: nil,
published_at: nil,
failed_at: nil,
Expand Down Expand Up @@ -548,7 +541,6 @@ def requeue_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: Time)
status: Message::Status::QUEUED,
updated_at: time.now.utc,
queued_at: current_utc_time, # TODO: confirm this
buffered_at: nil,
publishing_at: nil,
published_at: nil,
failed_at: nil,
Expand Down
6 changes: 0 additions & 6 deletions lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ class Message < ::ActiveRecord::Base

module Status
QUEUED = "queued"
BUFFERED = "buffered"
PUBLISHING = "publishing"
PUBLISHED = "published"
FAILED = "failed"
end

STATUSES = [
Status::QUEUED,
Status::BUFFERED,
Status::PUBLISHING,
Status::PUBLISHED,
Status::FAILED
Expand All @@ -32,9 +30,6 @@ module Status
# @!attribute [rw] queued_at
# @return [DateTime] The date and time when the message was queued.

# @!attribute [rw] buffered_at
# @return [DateTime] The date and time when the message was buffered.

# @!attribute [rw] publishing_at
# @return [DateTime] The date and time when the message began publishing.

Expand All @@ -50,7 +45,6 @@ module Status
validates :status, inclusion: { in: STATUSES }, length: { maximum: 255 }

scope :queued, -> { where(status: Status::QUEUED) }
scope :buffered, -> { where(status: Status::BUFFERED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :published, -> { where(status: Status::PUBLISHED) }
scope :failed, -> { where(status: Status::FAILED) }
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def terminating?
# Parses command line arguments to configure the publisher.
# @param args [Array<String>] The arguments passed via the command line.
# @return [Hash] The parsed options including configuration path, environment,
# buffer size, batch_size, and intervals.
# batch_size, and intervals.
def self.parse_cli_options(args)
options = {}

Expand Down
3 changes: 0 additions & 3 deletions lib/outboxer/web/views/error.erb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
<li class="nav-item">
<a class="nav-link" href="<%= outboxer_path('/messages?status=publishing') %>">Publishing</a>
</li>
<li class="nav-item">
<a class="nav-link" href="<%= outboxer_path('/messages?status=buffered') %>">Buffered</a>
</li>
<li class="nav-item">
<a class="nav-link" href="<%= outboxer_path('/messages?status=queued') %>">Queued</a>
</li>
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
</tr>
</thead>
<tbody>
<% ['queued', 'buffered', 'publishing', 'published', 'failed'].each do |status| %>
<% ['queued', 'publishing', 'published', 'failed'].each do |status| %>
<tr>
<td class="text-capitalize">
<a class="custom-link" href="<%= outboxer_path("/messages?status=#{status}") %>">
Expand Down
1 change: 0 additions & 1 deletion lib/outboxer/web/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
<ul class="navbar-nav">
<% statuses = [
{ name: 'Queued', key: 'queued' },
{ name: 'Buffered', key: 'buffered' },
{ name: 'Publishing', key: 'publishing' },
{ name: 'Published', key: 'published' },
{ name: 'Failed', key: 'failed' }
Expand Down
15 changes: 3 additions & 12 deletions lib/outboxer/web/views/message.erb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@
<div class="container my-4">
<h4>Latency</h4>
<%
queue_latency = message[:buffered_at] && message[:queued_at] ? (message[:buffered_at] - message[:queued_at]) : 0
buffer_latency = message[:publishing_at] && message[:buffered_at] ? (message[:publishing_at] - message[:buffered_at]) : 0
queue_latency = message[:publishing_at] && message[:queued_at] ? (message[:publishing_at] - message[:queued_at]) : 0
publish_latency =
case message[:status]
when Outboxer::Message::Status::PUBLISHED
Expand All @@ -87,10 +86,9 @@
else
0
end
total_latency = queue_latency + buffer_latency + publish_latency
total_latency = queue_latency + publish_latency

queue_percent = total_latency > 0 ? (queue_latency / total_latency * 100).round(2) : 0
buffer_percent = total_latency > 0 ? (buffer_latency / total_latency * 100).round(2) : 0
publish_percent = total_latency > 0 ? (publish_latency / total_latency * 100).round(2) : 0
%>
<table class="table table-bordered">
Expand All @@ -104,18 +102,11 @@
<tbody>
<!-- Queue Latency -->
<tr>
<td>Queued to Buffered</td>
<td>Queued to Publishing</td>
<td><%= Outboxer::Web.pretty_duration_from_seconds_to_milliseconds(seconds: queue_latency) %></td>
<td><%= queue_percent %></td>
</tr>

<!-- Buffer Latency -->
<tr>
<td>Buffered to Publishing</td>
<td><%= Outboxer::Web.pretty_duration_from_seconds_to_milliseconds(seconds: buffer_latency) %></td>
<td><%= buffer_percent %></td>
</tr>

<% if message[:status] == Outboxer::Message::Status::PUBLISHED %>
<!-- Publish Latency for Published -->
<tr>
Expand Down
4 changes: 0 additions & 4 deletions lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@
<th scope="row">Batch Size</th>
<td><%= publisher[:settings]['batch_size'] %></td>
</tr>
<tr>
<th scope="row">Buffer Size</th>
<td><%= publisher[:settings]['buffer_size'] %></td>
</tr>
<tr>
<th scope="row">Concurrency</th>
<td><%= publisher[:settings]['concurrency'] %></td>
Expand Down
13 changes: 0 additions & 13 deletions spec/factories/outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

status { Outboxer::Message::Status::PUBLISHED }
queued_at { 10.seconds.ago }
buffered_at { 9.seconds.ago }
publishing_at { 8.seconds.ago }
updated_at { 7.seconds.ago }

Expand All @@ -19,20 +18,10 @@
updated_at { 10.seconds.ago }
end

trait :buffered do
status { Outboxer::Message::Status::BUFFERED }

queued_at { 10.seconds.ago }
buffered_at { 9.second.ago }
publishing_at { nil }
updated_at { 9.second.ago }
end

trait :publishing do
status { Outboxer::Message::Status::PUBLISHING }

queued_at { 10.seconds.ago }
buffered_at { 9.seconds.ago }
publishing_at { 8.seconds.ago }
updated_at { 8.seconds.ago }
end
Expand All @@ -41,7 +30,6 @@
status { Outboxer::Message::Status::PUBLISHED }

queued_at { 10.seconds.ago }
buffered_at { 9.seconds.ago }
publishing_at { 8.seconds.ago }
updated_at { 7.seconds.ago }
end
Expand All @@ -50,7 +38,6 @@
status { Outboxer::Message::Status::FAILED }

queued_at { 10.seconds.ago }
buffered_at { 9.seconds.ago }
publishing_at { 8.seconds.ago }
updated_at { 7.seconds.ago }
end
Expand Down
1 change: 0 additions & 1 deletion spec/factories/outboxer_publishers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
status { Outboxer::Publisher::Status::PUBLISHING }
settings do
{
"buffer_size" => 1000,
"concurrency" => 3,
"tick_interval" => 0.1,
"poll_interval" => 5.0,
Expand Down
8 changes: 1 addition & 7 deletions spec/lib/outboxer/message/can_requeue_all_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Outboxer
RSpec.describe Message do
describe ".can_requeue?" do
let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_2) { create(:outboxer_message, :queued) }
let!(:message_3) { create(:outboxer_message, :failed) }
let!(:message_4) { create(:outboxer_message, :failed) }
let!(:message_5) { create(:outboxer_message, :publishing) }
Expand All @@ -15,12 +15,6 @@ module Outboxer
end
end

context "when status is buffered" do
it "returns true" do
expect(Message.can_requeue?(status: Message::Status::BUFFERED)).to eq true
end
end

context "when status is publishing" do
it "returns true" do
expect(
Expand Down
8 changes: 1 addition & 7 deletions spec/lib/outboxer/message/can_requeue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Outboxer
RSpec.describe Message do
describe ".can_requeue?" do
let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_2) { create(:outboxer_message, :queued) }
let!(:message_3) { create(:outboxer_message, :failed) }
let!(:message_4) { create(:outboxer_message, :failed) }
let!(:message_5) { create(:outboxer_message, :publishing) }
Expand All @@ -15,12 +15,6 @@ module Outboxer
end
end

context "when status is buffered" do
it "returns true" do
expect(Message.can_requeue?(status: Message::Status::BUFFERED)).to eq true
end
end

context "when status is publishing" do
it "returns true" do
expect(Message.can_requeue?(status: Message::Status::PUBLISHING)).to eq true
Expand Down
14 changes: 1 addition & 13 deletions spec/lib/outboxer/message/delete_all_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Outboxer
end

let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_2) { create(:outboxer_message, :queued) }

let!(:message_3) { create(:outboxer_message, :failed) }
let!(:exception_1) { create(:outboxer_exception, message: message_3) }
Expand Down Expand Up @@ -55,18 +55,6 @@ module Outboxer
end
end

context "when status is buffered" do
before do
Message.delete_all(status: Message::Status::BUFFERED, batch_size: 1)
end

it "deletes queued messages" do
expect(Models::Message.all.pluck(:id)).to match_array([
message_1.id, message_3.id, message_4.id, message_5.id, message_6.id, message_7.id
])
end
end

context "when status is publishing" do
before do
Message.delete_all(status: Message::Status::PUBLISHING, batch_size: 1)
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/message/delete_by_ids_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Outboxer
let!(:exception_1) { create(:outboxer_exception, message: message_1) }
let!(:frame_1) { create(:outboxer_frame, exception: exception_1) }

let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_2) { create(:outboxer_message, :queued) }
let!(:exception_2) { create(:outboxer_exception, message: message_2) }
let!(:frame_2) { create(:outboxer_frame, exception: exception_2) }

Expand Down
Loading