Skip to content

Commit

Permalink
Support prepared transactions/two-phase commit on PostgreSQL, MySQL, …
Browse files Browse the repository at this point in the history
…and H2

This commit enables you to use prepared transactions using the
:prepare option to Database#transaction.  The :prepare option
value should be some transaction id string, which is used
in the SQL query when preparing a transaction.  On PostgreSQL
and MySQL, it is literalized as an SQL string, while on H2,
it is used literally.

Setting up a prepared transaction is fairly easy:

  DB.transaction(:prepare=>'some_transaction_id_string') do
    ...
  end

One a prepared transaction has been set up, it can be commited
using commit_prepared_transaction:

  DB.commit_prepared_transaction('some_transaction_id_string')

or rolled back with rollback_prepared_transaction:

  DB.rollback_prepared_transaction('some_transaction_id_string')

Note that if the transaction block itself rolls back the
transaction (by raising an exception), neither
commit_prepared_transaction or rollback_prepared_transaction
will work.

In order to support prepared transactions, the Database private
instance methods _transaction, begin_transaction,
commit_transaction, and rollback_transaction all take an optional
options hash.  External adapters, even if they don't support
prepared transactions, must add an optional options hash to their
implementation of these methods or they will probably break.
This commit modifies all built-in adapters have already to support
this new API.

This commit also modifies the H2 JDBC subadapter to support
savepoints.
  • Loading branch information
jeremyevans committed Jun 22, 2010
1 parent b938370 commit dc1ac38
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 46 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
@@ -1,5 +1,7 @@
=== HEAD

* Support prepared transactions/two-phase commit on PostgreSQL, MySQL, and H2 (jeremyevans)

* Allow NULLS FIRST/LAST when ordering using the :nulls=>:first/:last option to asc and desc (jeremyevans)

* On PostgreSQL, if no :schema option is provided for #tables, #table_exists?, or #schema, assume all schemas except the default non-public ones (jeremyevans) (#305)
Expand Down
2 changes: 1 addition & 1 deletion lib/sequel/adapters/ado.rb
Expand Up @@ -68,7 +68,7 @@ def execute(sql, opts={})
# The ADO adapter's default provider doesn't support transactions, since it
# creates a new native connection for each query. So Sequel only attempts
# to use transactions if an explicit :provider is given.
def _transaction(conn)
def _transaction(conn, o={})
return super if opts[:provider]
th = Thread.current
begin
Expand Down
6 changes: 3 additions & 3 deletions lib/sequel/adapters/do.rb
Expand Up @@ -118,7 +118,7 @@ def uri(opts={})
# transactions. Unfortunately, it tries to create a new connection
# to do a transaction. So we close the connection created and
# substitute our own.
def begin_transaction(conn)
def begin_transaction(conn, opts={})
return super if supports_savepoints?
log_yield(TRANSACTION_BEGIN) do
t = ::DataObjects::Transaction.create_for_uri(uri)
Expand All @@ -131,7 +131,7 @@ def begin_transaction(conn)

# DataObjects requires transactions be prepared before being
# committed, so we do that.
def commit_transaction(t)
def commit_transaction(t, opts={})
return super if supports_savepoints?
log_yield(TRANSACTION_ROLLBACK) do
t.prepare
Expand All @@ -156,7 +156,7 @@ def log_connection_execute(conn, sql)
end

# We use the transactions rollback method to rollback.
def rollback_transaction(t)
def rollback_transaction(t, opts={})
return super if supports_savepoints?
log_yield(TRANSACTION_COMMIT){t.rollback}
end
Expand Down
6 changes: 3 additions & 3 deletions lib/sequel/adapters/firebird.rb
Expand Up @@ -102,12 +102,12 @@ def auto_increment_sql()
AUTO_INCREMENT
end

def begin_transaction(conn)
def begin_transaction(conn, opts={})
log_yield(TRANSACTION_BEGIN){conn.transaction}
conn
end

def commit_transaction(conn)
def commit_transaction(conn, opts={})
log_yield(TRANSACTION_COMMIT){conn.commit}
end

Expand Down Expand Up @@ -182,7 +182,7 @@ def restart_sequence_sql(name, opts={})
"ALTER SEQUENCE #{seq_name} RESTART WITH #{opts[:restart_position]}"
end

def rollback_transaction(conn)
def rollback_transaction(conn, opts={})
log_yield(TRANSACTION_ROLLBACK){conn.rollback}
end

Expand Down
32 changes: 32 additions & 0 deletions lib/sequel/adapters/jdbc/h2.rb
Expand Up @@ -6,6 +6,12 @@ module H2
module DatabaseMethods
PRIMARY_KEY_INDEX_RE = /\Aprimary_key/i.freeze

# Commit an existing prepared transaction with the given transaction
# identifier string.
def commit_prepared_transaction(transaction_id)
run("COMMIT TRANSACTION #{transaction_id}")
end

# H2 uses the :h2 database type.
def database_type
:h2
Expand All @@ -16,13 +22,39 @@ def dataset(opts=nil)
Sequel::JDBC::H2::Dataset.new(self, opts)
end

# Rollback an existing prepared transaction with the given transaction
# identifier string.
def rollback_prepared_transaction(transaction_id)
run("ROLLBACK TRANSACTION #{transaction_id}")
end

# H2 uses an IDENTITY type
def serial_primary_key_options
{:primary_key => true, :type => :identity}
end

# H2 supports prepared transactions
def supports_prepared_transactions?
true
end

# H2 supports savepoints
def supports_savepoints?
true
end

private

# If the :prepare option is given and we aren't in a savepoint,
# prepare the transaction for a two-phase commit.
def commit_transaction(conn, opts={})
if opts[:prepare] && Thread.current[:sequel_transaction_depth] <= 1
log_connection_execute(conn, "PREPARE COMMIT #{opts[:prepare]}")
else
super
end
end

# H2 needs to add a primary key column as a constraint
def alter_table_sql(table, op)
case op[:op]
Expand Down
6 changes: 3 additions & 3 deletions lib/sequel/adapters/jdbc/oracle.rb
Expand Up @@ -19,13 +19,13 @@ def dataset(opts=nil)
private

# Use JDBC connection's setAutoCommit to false to start transactions
def begin_transaction(conn)
def begin_transaction(conn, opts={})
log_yield(TRANSACTION_BEGIN){conn.setAutoCommit(false)}
conn
end

# Use JDBC connection's commit method to commit transactions
def commit_transaction(conn)
def commit_transaction(conn, opts={})
log_yield(TRANSACTION_COMMIT){conn.commit}
end

Expand All @@ -36,7 +36,7 @@ def remove_transaction(conn)
end

# Use JDBC connection's rollback method to rollback transactions
def rollback_transaction(conn)
def rollback_transaction(conn, opts={})
log_yield(TRANSACTION_ROLLBACK){conn.rollback}
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/sequel/adapters/oracle.rb
Expand Up @@ -74,12 +74,12 @@ def execute(sql, opts={})

private

def begin_transaction(conn)
def begin_transaction(conn, opts={})
log_yield(TRANSACTION_BEGIN){conn.autocommit = false}
conn
end

def commit_transaction(conn)
def commit_transaction(conn, opts={})
log_yield(TRANSACTION_COMMIT){conn.commit}
end

Expand All @@ -92,7 +92,7 @@ def remove_transaction(conn)
super
end

def rollback_transaction(conn)
def rollback_transaction(conn, opts={})
log_yield(TRANSACTION_ROLLBACK){conn.rollback}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sequel/adapters/shared/mssql.rb
Expand Up @@ -91,7 +91,7 @@ def begin_transaction_sql

# Commit the active transaction on the connection, does not commit/release
# savepoints.
def commit_transaction(conn)
def commit_transaction(conn, opts={})
log_connection_execute(conn, commit_transaction_sql) unless Thread.current[:sequel_transaction_depth] > 1
end

Expand Down
50 changes: 50 additions & 0 deletions lib/sequel/adapters/shared/mysql.rb
Expand Up @@ -30,6 +30,12 @@ def cast_type_literal(type)
CAST_TYPES[type] || super
end

# Commit an existing prepared transaction with the given transaction
# identifier string.
def commit_prepared_transaction(transaction_id)
run("XA COMMIT #{literal(transaction_id)}")
end

# MySQL uses the :mysql database type
def database_type
:mysql
Expand All @@ -52,6 +58,12 @@ def indexes(table, opts={})
indexes.reject{|k,v| remove_indexes.include?(k)}
end

# Rollback an existing prepared transaction with the given transaction
# identifier string.
def rollback_prepared_transaction(transaction_id)
run("XA ROLLBACK #{literal(transaction_id)}")
end

# Get version of MySQL server, used for determined capabilities.
def server_version
m = /(\d+)\.(\d+)\.(\d+)/.match(get(SQL::Function.new(:version)))
Expand All @@ -67,6 +79,11 @@ def tables(opts={})
metadata_dataset.with_sql('SHOW TABLES').server(opts[:server]).map{|r| m.call(r.values.first)}
end

# MySQL supports prepared transactions (two-phase commit) using XA
def supports_prepared_transactions?
true
end

# MySQL supports savepoints
def supports_savepoints?
true
Expand Down Expand Up @@ -117,13 +134,35 @@ def auto_increment_sql
AUTO_INCREMENT
end

# Use XA START to start a new prepared transaction if the :prepare
# option is given.
def begin_transaction(conn, opts={})
if s = opts[:prepare]
log_connection_execute(conn, "XA START #{literal(s)}")
conn
else
super
end
end

# MySQL doesn't allow default values on text columns, so ignore if it the
# generic text type is used
def column_definition_sql(column)
column.delete(:default) if column[:type] == File || (column[:type] == String && column[:text] == true)
super
end

# Prepare the XA transaction for a two-phase commit if the
# :prepare option is given.
def commit_transaction(conn, opts={})
if s = opts[:prepare]
log_connection_execute(conn, "XA END #{literal(s)}")
log_connection_execute(conn, "XA PREPARE #{literal(s)}")
else
super
end
end

# Use MySQL specific syntax for engine type and character encoding
def create_table_sql(name, generator, options = {})
engine = options.fetch(:engine, Sequel::MySQL.default_engine)
Expand Down Expand Up @@ -162,6 +201,17 @@ def index_definition_sql(table_name, index)
"CREATE #{index_type}INDEX #{index_name}#{using} ON #{quote_schema_table(table_name)} #{literal(index[:columns])}"
end

# Rollback the currently open XA transaction
def rollback_transaction(conn, opts={})
if s = opts[:prepare]
log_connection_execute(conn, "XA END #{literal(s)}")
log_connection_execute(conn, "XA PREPARE #{literal(s)}")
log_connection_execute(conn, "XA ROLLBACK #{literal(s)}")
else
super
end
end

# MySQL treats integer primary keys as autoincrementing.
def schema_autoincrementing_primary_key?(schema)
super and schema[:db_type] =~ /int/io
Expand Down
29 changes: 29 additions & 0 deletions lib/sequel/adapters/shared/postgres.rb
Expand Up @@ -162,6 +162,12 @@ module DatabaseMethods
RE_CURRVAL_ERROR = /currval of sequence "(.*)" is not yet defined in this session|relation "(.*)" does not exist/.freeze
SYSTEM_TABLE_REGEXP = /^pg|sql/.freeze

# Commit an existing prepared transaction with the given transaction
# identifier string.
def commit_prepared_transaction(transaction_id)
run("COMMIT PREPARED #{literal(transaction_id)}")
end

# Creates the function in the database. Arguments:
# * name : name of the function to create
# * definition : string definition of the function, or object file for a dynamically loaded C function.
Expand Down Expand Up @@ -319,6 +325,12 @@ def reset_primary_key_sequence(table)
get{setval(seq, db[table].select{coalesce(max(pk)+seq_ds.select{:increment_by}, seq_ds.select(:min_value))}, false)}
end

# Rollback an existing prepared transaction with the given transaction
# identifier string.
def rollback_prepared_transaction(transaction_id)
run("ROLLBACK PREPARED #{literal(transaction_id)}")
end

# PostgreSQL uses SERIAL psuedo-type instead of AUTOINCREMENT for
# managing incrementing primary keys.
def serial_primary_key_options
Expand All @@ -338,6 +350,13 @@ def server_version(server=nil)
@server_version
end

# PostgreSQL supports prepared transactions (two-phase commit) if
# max_prepared_transactions is greater than 0.
def supports_prepared_transactions?
return @supports_prepared_transactions if defined?(@supports_prepared_transactions)
@supports_prepared_transactions = self['SHOW max_prepared_transactions'].get.to_i > 0
end

# PostgreSQL supports savepoints
def supports_savepoints?
true
Expand Down Expand Up @@ -371,6 +390,16 @@ def tables(opts={})

private

# If the :prepare option is given and we aren't in a savepoint,
# prepare the transaction for a two-phase commit.
def commit_transaction(conn, opts={})
if opts[:prepare] && Thread.current[:sequel_transaction_depth] <= 1
log_connection_execute(conn, "PREPARE TRANSACTION #{literal(opts[:prepare])}")
else
super
end
end

# SQL statement to create database function.
def create_function_sql(name, definition, opts={})
args = opts[:args]
Expand Down
6 changes: 6 additions & 0 deletions lib/sequel/database/misc.rb
Expand Up @@ -79,6 +79,12 @@ def serial_primary_key_options
{:primary_key => true, :type => Integer, :auto_increment => true}
end

# Whether the database and adapter support prepared transactions
# (two-phase commit), false by default
def supports_prepared_transactions?
false
end

# Whether the database and adapter support savepoints, false by default
def supports_savepoints?
false
Expand Down

0 comments on commit dc1ac38

Please sign in to comment.