Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions db/migrate/create_outboxer_counters.rb

This file was deleted.

20 changes: 13 additions & 7 deletions db/migrate/create_outboxer_message_counts.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
class CreateOutboxerMessageCounts < ActiveRecord::Migration[7.1]
class CreateOutboxerMessageCounts < ActiveRecord::Migration[6.1]
def up
create_table :outboxer_message_counts do |t|
t.string :status, limit: 255, null: false
t.integer :partition, null: false
t.bigint :value, null: false
t.timestamps
t.string :hostname, limit: 255, null: false
t.integer :process_id, null: false
t.integer :thread_id, null: false

t.integer :queued, null: false
t.integer :publishing, null: false
t.integer :published, null: false
t.integer :failed, null: false

t.timestamps null: false
end

add_index :outboxer_message_counts, [:status, :partition],
unique: true, name: :idx_outboxer_counts_status_partition
add_index :outboxer_message_counts, [:hostname, :process_id, :thread_id],
unique: true, name: "idx_outboxer_message_counts_identity"
end

def down
Expand Down
17 changes: 0 additions & 17 deletions db/migrate/create_outboxer_message_totals.rb

This file was deleted.

4 changes: 0 additions & 4 deletions generators/install_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ def copy_migrations
"db/migrate/create_outboxer_message_counts.rb",
"db/migrate/create_outboxer_message_counts.rb")

migration_template(
"db/migrate/create_outboxer_message_totals.rb",
"db/migrate/create_outboxer_message_totals.rb")

migration_template(
"db/migrate/create_outboxer_exceptions.rb",
"db/migrate/create_outboxer_exceptions.rb")
Expand Down
5 changes: 1 addition & 4 deletions lib/outboxer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
require_relative "outboxer/models/frame"
require_relative "outboxer/models/exception"
require_relative "outboxer/models/message"
require_relative "outboxer/models/message_count"
require_relative "outboxer/models/message_total"
require_relative "outboxer/models/counter"
require_relative "outboxer/models/message/count"

require_relative "outboxer/models/publisher"
require_relative "outboxer/models/signal"

require_relative "outboxer/database"
require_relative "outboxer/counter"
require_relative "outboxer/message"
require_relative "outboxer/publisher"

Expand Down
86 changes: 0 additions & 86 deletions lib/outboxer/counter.rb

This file was deleted.

6 changes: 2 additions & 4 deletions lib/outboxer/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ def truncate(logger: nil)
if connection.adapter_name.downcase.include?("postgres")
connection.execute(<<~SQL)
TRUNCATE TABLE
outboxer_message_counts,
outboxer_message_totals,
outboxer_frames,
outboxer_exceptions,
outboxer_message_counts,
outboxer_messages,
outboxer_signals,
outboxer_publishers
Expand All @@ -97,10 +96,9 @@ def truncate(logger: nil)

begin
connection.execute("SET FOREIGN_KEY_CHECKS = 0;")
connection.execute("TRUNCATE TABLE outboxer_message_counts;")
connection.execute("TRUNCATE TABLE outboxer_message_totals;")
connection.execute("TRUNCATE TABLE outboxer_frames;")
connection.execute("TRUNCATE TABLE outboxer_exceptions;")
connection.execute("TRUNCATE TABLE outboxer_message_counts;")
connection.execute("TRUNCATE TABLE outboxer_messages;")
connection.execute("TRUNCATE TABLE outboxer_signals;")
connection.execute("TRUNCATE TABLE outboxer_publishers;")
Expand Down
63 changes: 20 additions & 43 deletions lib/outboxer/loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Loader
LOAD_DEFAULTS = {
batch_size: 1_000,
concurrency: 5,
size: 1_000_000,
size: 1,
tick_interval: 1
}

Expand All @@ -35,7 +35,7 @@ def parse_cli_options(argv)
options[:concurrency] = v
end

opts.on("--size SIZE", Integer, "Number of messages to load (default: 1M)") do |v|
opts.on("--size SIZE", Integer, "Number of messages to load (default: 1)") do |v|
options[:size] = v
end

Expand Down Expand Up @@ -65,24 +65,28 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],
size: LOAD_DEFAULTS[:size],
tick_interval: LOAD_DEFAULTS[:tick_interval],
logger: Outboxer::Logger.new($stdout))
status = :loading
reader, _writer = trap_signals
Thread.main[:status] = :loading

Signal.trap("INT") { Thread.main[:status] = :terminating }
Signal.trap("TERM") { Thread.main[:status] = :terminating }

queue = Queue.new
threads = spawn_workers(concurrency, queue, logger)

enqueued = 0
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

while enqueued < size
status = read_status(reader) || status

case status
case Thread.main[:status]
when :terminating
break
when :stopped
sleep tick_interval
when :loading
messageables = Array.new(batch_size) do
remaining = size - enqueued
count = [batch_size, remaining].min

messageables = Array.new(count) do
OpenStruct.new(class: OpenStruct.new(name: "Event"), id: SecureRandom.hex(3))
end

Expand All @@ -91,8 +95,6 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],

# logger.info "[main] enqueued #{enqueued}/#{size}" if (enqueued % (batch_size * 2)).zero?
end

# display_metrics(logger)
end

queue.close
Expand All @@ -108,41 +110,16 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],
logger.info "[main] done"
end

def trap_signals
reader, writer = IO.pipe

%w[INT TERM TSTP CONT].each do |signal|
Signal.trap(signal) { writer.puts(signal) }
end

[reader, writer]
end

def read_status(reader)
line = reader.ready? ? reader.gets&.strip : nil

case line
when "INT", "TERM" then :terminating
when "TSTP" then :stopped
when "CONT" then :loading
end
end

def spawn_workers(concurrency, queue, logger)
Array.new(concurrency) do |index|
def spawn_workers(concurrency, queue, _logger)
Array.new(concurrency) do |_index|
Thread.new do
begin
while (messageables = queue.pop)
messageables.each do |messageable|
Outboxer::Message.queue(messageable: messageable)
rescue StandardError => error
logger.error "[thread-#{index}] #{error.class}: #{error.message}"

sleep 1
end
while Thread.main[:status] != :terminating
messageables = queue.pop
break if messageables.nil?

messageables.each do |messageable|
Outboxer::Message.queue(messageable: messageable)
end
rescue ClosedQueueError
# Queue closed and empty — exit gracefully
end
end
end
Expand Down
Loading