Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROM on CI #302

Merged
merged 7 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ matrix:
env: TARGET=ruby_event_store-rom SCRIPT=test DATABASE_URL=sqlite:db.sqlite3
- rvm: 2.4.3
env: TARGET=ruby_event_store-rom SCRIPT=test DATABASE_URL=postgres://localhost/rails_event_store_active_record?pool=5
- rvm: 2.4.3
env: TARGET=ruby_event_store-rom SCRIPT=test DATABASE_URL=mysql2://root:@127.0.0.1/rails_event_store_active_record?pool=5

before_script:
- psql -c 'create extension pgcrypto;' -U postgres
Expand Down
3 changes: 2 additions & 1 deletion ruby_event_store-rom/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ IGNORE = RubyEventStore::ROM::EventRepository\#handle_not_found_errors \
RubyEventStore::ROM::SQL::SpecHelper\#load_gateway_schema \
RubyEventStore::ROM::SQL::SpecHelper\#establish_gateway_connection \
RubyEventStore::ROM::SQL::SpecHelper\#drop_gateway_schema \
RubyEventStore::ROM::SQL::SpecHelper\#close_gateway_connection
RubyEventStore::ROM::SQL::SpecHelper\#close_gateway_connection \
RubyEventStore::ROM::SQL::UnitOfWork\#commit!
SUBJECT ?= RubyEventStore::ROM*
DATABASE_URL ?= sqlite::memory:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@

run 'CREATE EXTENSION IF NOT EXISTS pgcrypto;' if postgres

create_table :event_store_events_in_streams do
primary_key :id, type: :Bignum
create_table? :event_store_events_in_streams do
primary_key :id, type: :Bignum, null: false

column :stream, String, null: false
column :position, Integer, null: true
column :position, Integer

if postgres
column :event_id, :uuid, null: false, index: true
column :event_id, :uuid, null: false
else
column :event_id, String, size: 36, null: false, index: true
column :event_id, String, size: 36, null: false
end

column :created_at, DateTime, null: false, index: true
column :created_at, DateTime, null: false, index: 'index_event_store_events_in_streams_on_created_at'

index %i[stream position], unique: true
index %i[stream position], unique: true, name: 'index_event_store_events_in_streams_on_stream_and_position'
index %i[stream event_id], unique: true, name: 'index_event_store_events_in_streams_on_stream_and_event_id'
end

create_table :event_store_events do
create_table? :event_store_events do
if postgres
column :id, :uuid, default: Sequel.function(:gen_random_uuid), primary_key: true
else
Expand All @@ -35,7 +35,7 @@
column :event_type, String, null: false
column :metadata, String, text: true
column :data, String, text: true, null: false
column :created_at, DateTime, null: false, index: true
column :created_at, DateTime, null: false, index: 'index_event_store_events_on_created_at'

if sqlite # TODO: Is this relevant without ActiveRecord?
index :id, unique: true
Expand Down
8 changes: 6 additions & 2 deletions ruby_event_store-rom/lib/ruby_event_store/rom.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
module RubyEventStore
module ROM
class Env
attr_accessor :container, :lock
attr_accessor :container

def initialize(container)
@container = container
@lock = Mutex.new

container.register(:unique_violation_error_handlers, Set.new)
container.register(:not_found_error_handlers, Set.new)
container.register(:logger, Logger.new(STDOUT).tap { |logger| logger.level = Logger::WARN })
end

def logger
container[:logger]
end

def transaction(&block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,33 @@ module RubyEventStore
module ROM
module SQL
class UnitOfWork < ROM::UnitOfWork
def commit!(gateway, queue, **options)
if gateway.connection.database_type =~ /mysql/
env.lock.synchronize { super }
else
super
def commit!(gateway, changesets, **options)
# Committing changesets concurrently causes MySQL deadlocks
# which are not caught and retried by Sequel's built-in
# :retry_on option. This appears to be a result of how ROM
# handles exceptions which don't bubble up so that Sequel
# can retry transactions with the :retry_on option when there's
# a deadlock.
#
# This is exacerbated by the fact that changesets insert multiple
# tuples with individual INSERT statements because ROM specifies
# to Sequel to return a list of primary keys created. The likelihood
# of a deadlock is reduced with batched INSERT statements.
#
# For this reason we need to manually insert changeset records to avoid
# MySQL deadlocks or to allow Sequel to retry transactions
# when the :retry_on option is specified.
options.merge!(
retry_on: Sequel::SerializationFailure,
before_retry: -> (num, ex) {
env.logger.warn("RETRY TRANSACTION [#{self.class.name} => #{ex.class.name}] #{ex.message}")
}
)

gateway.transaction(options) do
changesets.each do |changeset|
changeset.relation.multi_insert(changeset.to_a)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ def append_to_stream(events, stream, expected_version)
events = normalize_to_array(events)
event_ids = events.map(&:event_id)

@rom.transaction do |queue|
queue << @events.create_changeset(events)
queue << @stream_entries.create_changeset(event_ids, stream, expected_version, global_stream: true)
@rom.transaction do |changesets|
# Create changesets inside transaction because
# we want to find the last position (a.k.a. version)
# again if the transaction is retried due to a
# deadlock in MySQL
changesets << @events.create_changeset(events)
changesets << @stream_entries.create_changeset(event_ids, stream, expected_version, global_stream: true)
end

self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ class Create < ::ROM::Changeset::Create
map do
rename_keys event_id: :id
accept_keys %i[id data metadata event_type]
add_timestamps
end
end

def create(serialized_records)
create_changeset(serialized_records).commit
map do |tuple|
Hash(created_at: Time.now).merge(tuple)
end
end

def create_changeset(serialized_records)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ module ROM
module Repositories
class StreamEntries < ::ROM::Repository[:stream_entries]
class Create < ::ROM::Changeset::Create
map do
add_timestamps
map do |tuple|
Hash(created_at: Time.now).merge(tuple)
end
end

Expand Down
9 changes: 4 additions & 5 deletions ruby_event_store-rom/lib/ruby_event_store/rom/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def configure(env)
# See: https://github.com/jeremyevans/sequel/blob/master/doc/transactions.rdoc
env.register_unit_of_work_options(
class: UnitOfWork,
savepoint: true,
retry_on: Sequel::SerializationFailure
savepoint: true
)

env.register_error_handler :unique_violation, -> ex {
Expand Down Expand Up @@ -58,17 +57,17 @@ def establish_gateway_connection
# seems to lose the "preconnect concurrently" setting
gateway.connection.pool.send(:preconnect, true)
end

def load_gateway_schema
gateway.run_migrations
end

def drop_gateway_schema
gateway.connection.drop_table?('event_store_events')
gateway.connection.drop_table?('event_store_events_in_streams')
gateway.connection.drop_table?('schema_migrations')
end

# See: https://github.com/rom-rb/rom-sql/blob/master/spec/shared/database_setup.rb
def close_gateway_connection
gateway.connection.disconnect
Expand Down
8 changes: 4 additions & 4 deletions ruby_event_store-rom/lib/ruby_event_store/rom/unit_of_work.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ def initialize(rom: ROM.env)
def call(**options)
gateway = @env.container.gateways.fetch(options.delete(:gateway){:default})

yield(queue = [])
yield(changesets = [])

commit!(gateway, queue, options)
commit!(gateway, changesets, options)
end

def commit!(gateway, queue, **options)
gateway.connection.transaction(options) { queue.each(&:commit) }
def commit!(gateway, changesets, **options)
gateway.transaction(options) { changesets.each(&:commit) }
end
end
end
Expand Down
28 changes: 28 additions & 0 deletions ruby_event_store-rom/spec/rom/env_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require 'spec_helper'

module RubyEventStore::ROM
RSpec.describe Env do
include SchemaHelper

around(:each) do |example|
begin
establish_database_connection
load_database_schema
example.run
ensure
drop_database
end
end

let(:container) { ROM.container }
let(:instance) { Env.new(container) }

specify '#container gives access to ROM container' do
expect(instance.container).to be_a(::ROM::Container)
end

specify '#logger gives access to Logger' do
expect(instance.logger).to be_a(Logger)
end
end
end
6 changes: 2 additions & 4 deletions ruby_event_store-rom/spec/rom/event_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ module RubyEventStore::ROM
]

repo = Repositories::Events.new(container)

events.each(&repo.method(:create))
repo.create_changeset(events).commit

expect(repo.events.to_a.size).to eq(3)

Expand Down Expand Up @@ -149,8 +148,7 @@ module RubyEventStore::ROM
]

repo = Repositories::Events.new(container)

events.each(&repo.method(:create))
repo.create_changeset(events).commit

expect(repo.events.to_a.size).to eq(3)

Expand Down
4 changes: 3 additions & 1 deletion ruby_event_store-rom/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
adapter_name, # :sql, :memory
ENV['DATABASE_URL'],
max_connections: ENV['DATABASE_URL'] =~ /sqlite/ ? 1 : 5,
preconnect: :concurrently
preconnect: :concurrently,
# sql_mode: %w[NO_AUTO_VALUE_ON_ZERO STRICT_ALL_TABLES]
)
# $stdout.sync = true
# rom.default.use_logger Logger.new(STDOUT)
rom.default.run_migrations if adapter_name == :sql

Expand Down