Skip to content

Commit

Permalink
Mysql Support
Browse files Browse the repository at this point in the history
  • Loading branch information
Kareem Kouddous committed Jul 24, 2014
1 parent a41ac8f commit afe361d
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 90 deletions.
10 changes: 2 additions & 8 deletions .travis.yml
@@ -1,24 +1,18 @@
language: ruby
bundler_args: --without development
script: bundle exec rspec spec
env: JRUBY_OPTS="--server --debug -J-Xms512m -J-Xmx1024m" TIMEOUT=10
env: TIMEOUT=10
rvm:
- ruby-1.9.3
- ruby-2.0.0
- ruby-2.1.0
# - jruby
# - jruby-head
# - ruby-head
gemfile:
- gemfiles/mongoid31.gemfile
- gemfiles/mongoid40.gemfile
- gemfiles/active_record32.gemfile
- gemfiles/active_record40.gemfile
- gemfiles/active_record40_mysql.gemfile
services:
- mongodb
- rabbitmq
- redis-server
matrix:
allow_failures:
- rvm: ruby-head
- rvm: jruby-head
27 changes: 27 additions & 0 deletions gemfiles/active_record40_mysql.gemfile
@@ -0,0 +1,27 @@
source 'https://rubygems.org'
gemspec

gem 'rake'
gem 'rspec', '~> 2.99.0'
gem 'rspec-retry', :require => false
gem 'mocha', :require => false

gem 'activerecord', '~> 4.0'
gem 'database_cleaner'
gem 'simplecov', :require => false
gem 'coveralls', :require => false

platforms :ruby do
gem 'mysql2'
end

platforms :jruby do
gem 'jruby-openssl'
gem 'activerecord-jdbcpostgresql-adapter'
gem 'hot_bunnies'
end

group :development do
gem 'pry'
gem 'colorize'
end
1 change: 1 addition & 0 deletions lib/promiscuous/publisher/model/base.rb
Expand Up @@ -42,6 +42,7 @@ def id

def sync(options={}, &block)
raise "Model cannot be dirty (have changes) when syncing" if @instance.changed?
raise "Model has to be reloaded if it was saved" if @instance.previous_changes.present?

# We can use the ephemeral because both are mongoid and ephemerals are atomic operations.
Promiscuous::Publisher::Operation::Ephemeral.new(:instance => @instance, :operation => :update).execute
Expand Down
65 changes: 53 additions & 12 deletions lib/promiscuous/publisher/operation/active_record.rb
Expand Up @@ -59,6 +59,11 @@ def release_savepoint
with_promiscuous_transaction_context { |tx| tx.commit }
end

def supports_returning_statments?
@supports_returning_statments ||= ["ActiveRecord::ConnectionAdapters::PostgreSQLAdapter",
"ActiveRecord::ConnectionAdapters::OracleEnhancedAdapter"].include?(self.class.name)
end

alias_method :insert_without_promiscuous, :insert
alias_method :update_without_promiscuous, :update
alias_method :delete_without_promiscuous, :delete
Expand Down Expand Up @@ -152,10 +157,21 @@ def initialize(arel, name, pk, id_value, sequence_name, binds, options={})
def db_operation_and_select
# XXX This is only supported by Postgres and should be in the postgres driver
@connection.transaction do
@connection.exec_insert("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
@instances = result.map do |row|
instance = model.instantiate(row)
instance
if @connection.supports_returning_statments?
@connection.exec_insert("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
@instances = result.map do |row|
instance = model.instantiate(row)
instance
end
end
else
@connection.exec_insert("#{@connection.to_sql(@arel, @binds)}", @operation_name, @binds)

id = @binds.select { |k,v| k.name == 'id' }.first.last rescue nil
id ||= @connection.instance_eval { @connection.last_id }
id.tap do |last_id|
result = @connection.exec_query("SELECT * FROM #{model.table_name} WHERE #{@pk} = #{last_id}")
@instances = result.map { |row| model.instantiate(row) }
end
end
end
Expand All @@ -168,6 +184,7 @@ class PromiscuousUpdateOperation < PromiscousOperation
def initialize(arel, name, binds, options={})
super
@operation = :update
return if Promiscuous.disabled?
raise unless @arel.is_a?(Arel::UpdateManager)
end

Expand All @@ -191,16 +208,31 @@ def any_published_field_changed?
(updated_fields_in_query.keys & model.published_db_fields).present?
end

def sql_select_statment
arel = @arel.dup
arel.instance_eval { @ast = @ast.dup }
arel.ast.values = []
arel.to_sql.sub(/^UPDATE /, 'SELECT * FROM ')
end

def db_operation_and_select
# TODO this should be in the postgres driver (to also leverage the cache)
@arel.ast.values << Arel::Nodes::SqlLiteral.new("\"#{Promiscuous::Config.version_field}\" = COALESCE(\"#{Promiscuous::Config.version_field}\", 0) + 1")

@connection.exec_query("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
@instances = result.map { |row| model.instantiate(row) }
end.rows.size
@arel.ast.values << Arel::Nodes::SqlLiteral.new("#{Promiscuous::Config.version_field} = COALESCE(#{Promiscuous::Config.version_field}, 0) + 1")

if @connection.supports_returning_statments?
@connection.exec_query("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
@instances = result.map { |row| model.instantiate(row) }
end.rows.size
else
@connection.exec_update(@connection.to_sql(@arel, @binds), @operation_name, @binds).tap do
result = @connection.exec_query(sql_select_statment, @operation_name)
@instances = result.map { |row| model.instantiate(row) }
end
end
end

def execute(&db_operation)
return db_operation.call if Promiscuous.disabled?
return db_operation.call unless model
return db_operation.call unless any_published_field_changed?
super
Expand All @@ -214,11 +246,20 @@ def initialize(arel, name, binds, options={})
raise unless @arel.is_a?(Arel::DeleteManager)
end

def sql_select_statment
@connection.to_sql(@arel.dup, @binds.dup).sub(/^DELETE /, 'SELECT * ')
end

def db_operation_and_select
# XXX This is only supported by Postgres.
@connection.exec_query("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
if @connection.supports_returning_statments?
@connection.exec_query("#{@connection.to_sql(@arel, @binds)} RETURNING *", @operation_name, @binds).tap do |result|
@instances = result.map { |row| model.instantiate(row) }
end.rows.size
else
result = @connection.exec_query(sql_select_statment, @operation_name, @binds)
@instances = result.map { |row| model.instantiate(row) }
end.rows.size
@connection.exec_delete(@connection.to_sql(@arel, @binds), @operation_name, @binds)
end
end
end

Expand Down
14 changes: 6 additions & 8 deletions lib/promiscuous/publisher/transport/persistence/active_record.rb
Expand Up @@ -2,19 +2,17 @@ class Promiscuous::Publisher::Transport::Persistence::ActiveRecord
def save(batch)
check_schema

q = "INSERT INTO #{table} (\"batch\") " +
"VALUES ('#{batch.dump}') RETURNING id"
q = "INSERT INTO #{table} (batch) " +
"VALUES ('#{batch.dump}')"

result = connection.exec_query(q, 'Promiscuous Recovery Save')

batch.id = result.rows.first.first.to_i
batch.id = connection.insert_sql(q, 'Promiscuous Recovery Save')
end

def expired
check_schema

q = "SELECT id, p.batch FROM #{table} p " +
"WHERE at < current_timestamp - #{Promiscuous::Config.recovery_timeout} * INTERVAL '1 second'"
"WHERE at < CURRENT_TIMESTAMP - INTERVAL '#{Promiscuous::Config.recovery_timeout}' second"

connection.exec_query(q, 'Promiscuous Recovery Expired').rows
end
Expand All @@ -24,7 +22,7 @@ def delete(batch)

q = "DELETE FROM #{table} WHERE id = #{batch.id}"

connection.exec_query(q, 'Promiscuous Recovery Delete')
connection.exec_delete(q, 'Promiscuous Recovery Delete', [])
end

private
Expand All @@ -37,7 +35,7 @@ def check_schema
Promiscuous requires the following migration to be run:
create_table :_promiscuous do |t|
t.string :batch
t.timestamp :at, :default => :now
t.timestamp :at, 'TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP'
end
help
end
Expand Down
3 changes: 2 additions & 1 deletion lib/promiscuous/publisher/transport/worker.rb
Expand Up @@ -30,7 +30,8 @@ def main_loop
Promiscuous::Config.error_notifier.call(e)
end
end
ActiveRecord::Base.connection.close
ensure
ActiveRecord::Base.clear_active_connections!
end
end

3 changes: 2 additions & 1 deletion lib/promiscuous/subscriber/model/active_record.rb
Expand Up @@ -33,7 +33,8 @@ def __promiscuous_fetch_existing(id)
end

def __promiscuous_with_pooled_connection
ActiveRecord::Base.connection_pool.with_connection { yield }
yield
ActiveRecord::Base.clear_active_connections!
end
end
end
12 changes: 10 additions & 2 deletions spec/integration/manual_sync_spec.rb
Expand Up @@ -12,17 +12,25 @@
without_promiscuous { pub_update = PublisherModel.first; pub_update.update_attributes(:field_1 => 'hello') }

# Ensure ordering
pub_update.promiscuous.sync; pub_create.promiscuous.sync
PublisherModel.find(pub_update.id).promiscuous.sync; PublisherModel.find(pub_create.id).reload.promiscuous.sync

eventually { SubscriberModel.first.field_1.should == 'hello' }

PublisherModel.first.update_attributes(:field_1 => 'ohai')

eventually { SubscriberModel.first.field_1.should == 'ohai' }
end

it 'ignores changes to object if they are not persisted' do
it 'prevents syncing an object with changes' do
pub = without_promiscuous { PublisherModel.create(:field_1 => 'hello') }
pub.field_1 = 'bye'

expect { pub.promiscuous.sync }.to raise_error
end

it 'prevents syncing an object that was persisted' do
pub = without_promiscuous { PublisherModel.create(:field_1 => 'hello') }

expect { pub.promiscuous.sync }.to raise_error
end
end
1 change: 0 additions & 1 deletion spec/integration/observer_spec.rb
Expand Up @@ -3,7 +3,6 @@
describe Promiscuous do
before { load_models; load_observers }
before { use_real_backend }

before { run_subscriber_worker! }

context 'when creating' do
Expand Down
52 changes: 1 addition & 51 deletions spec/spec_helper/active_record.rb
Expand Up @@ -18,54 +18,4 @@
ActiveRecord::Base.connection.drop_database(db_settings[:database]) rescue nil
ActiveRecord::Base.connection.create_database(db_settings[:database])

class PromiscuousMigration < ActiveRecord::Migration
def change
[:publisher_models, :publisher_model_others,
:subscriber_models, :subscriber_model_others,
:publisher_dsl_models, :subscriber_dsl_models,
:publisher_another_dsl_models, :subscriber_another_dsl_models,
:publisher_model_without_subscribers].each do |table|
create_table table, :force => true do |t|
t.string :unpublished
t.string :field_1
t.string :field_1_derived
t.string :field_2
t.string :field_3
t.string :child_field_1
t.string :child_field_2
t.string :child_field_3
t.integer :publisher_id
t.integer :_v, :limit => 8, :default => 1
end

create_table :publisher_model_belongs_tos, :force => true do |t|
t.integer :publisher_model_id
t.integer :_v, :limit => 8, :default => 1
end

create_table :subscriber_model_belongs_tos, :force => true do |t|
t.integer :publisher_model_id
t.integer :_v, :limit => 8
end

create_table :_promiscuous, :force => true do |t|
t.string :batch
t.timestamp :at, :default => :now
end
end
end

migrate :up
end

DatabaseCleaner.strategy = :truncation

RSpec.configure do |config|
config.before(:each) do
DatabaseCleaner.start
end

config.after(:each) do
DatabaseCleaner.clean rescue nil
end
end
load 'spec/spec_helper/sql.rb'
1 change: 1 addition & 0 deletions spec/spec_helper/active_record40_mysql.rb
22 changes: 22 additions & 0 deletions spec/spec_helper/active_record_mysql.rb
@@ -0,0 +1,22 @@
require 'active_record'

if ENV['LOGGER_LEVEL']
ActiveRecord::Base.logger = Logger.new(STDERR)
ActiveRecord::Base.logger.level = ENV['LOGGER_LEVEL'].to_i
end

db_settings = {
:adapter => "mysql2",
:database => "promiscuous",
:username => "root",
:password => nil,
:encoding => "utf8",
:pool => 20,
}

ActiveRecord::Base.establish_connection(db_settings.merge('database' => 'mysql'))
ActiveRecord::Base.connection.drop_database(db_settings[:database]) rescue nil
ActiveRecord::Base.connection.create_database(db_settings[:database])
ActiveRecord::Base.establish_connection(db_settings)

load 'spec/spec_helper/sql.rb'

0 comments on commit afe361d

Please sign in to comment.