Skip to content
Permalink
Browse files
MDEV-18631 Fix streaming replication with wsrep_gtid_mode=ON
With wsrep_gtid_mode=ON, the appropriate commit hooks were not
called in all cases for applied streaming transactions.

As a fix, removed all special handling of commit order critical
section from Wsrep_high_priority_service and Wsrep_storage_service.
Now commit order critical section is always entered in ha_commit_trans().

Check for wsrep_run_commit_hook is now done in handler.cc, log.cc.
This makes it explicit that the transaction is an active wsrep
transaction which must go through commit hooks.
  • Loading branch information
temeo authored and Jan Lindström committed Mar 4, 2019
1 parent fa52846 commit a8ff4f2
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 125 deletions.
@@ -1207,8 +1207,9 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)

static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
#ifdef WITH_WSREP
const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
if (run_wsrep_hooks && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_before_prepare(thd, all))
{
return(1);
@@ -1222,7 +1223,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all)
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
}
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
if (run_wsrep_hooks && !err && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_after_prepare(thd, all))
{
err= 1;
@@ -1369,6 +1370,9 @@ int ha_commit_trans(THD *thd, bool all)
Ha_trx_info *ha_info= trans->ha_list;
bool need_prepare_ordered, need_commit_ordered;
my_xid xid;
#ifdef WITH_WSREP
const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
#endif /* WITH_WSREP */
DBUG_ENTER("ha_commit_trans");
DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
thd, (ulong) thd->variables.option_bits, all));
@@ -1424,7 +1428,7 @@ int ha_commit_trans(THD *thd, bool all)
if (is_real_trans)
thd->transaction.cleanup();
#ifdef WITH_WSREP
if (WSREP(thd) && all && !error)
if (wsrep_is_active(thd) && is_real_trans && !error)
{
wsrep_commit_empty(thd, all);
}
@@ -1519,11 +1523,7 @@ int ha_commit_trans(THD *thd, bool all)
This commit will not go through log_and_order() where wsrep commit
ordering is normally done. Commit ordering must be done here.
*/
bool run_wsrep_commit= (WSREP(thd) &&
rw_ha_count &&
wsrep_thd_is_local(thd) &&
wsrep_has_changes(thd, all));
if (run_wsrep_commit)
if (run_wsrep_hooks)
error= wsrep_before_commit(thd, all);
if (error)
{
@@ -1533,8 +1533,8 @@ int ha_commit_trans(THD *thd, bool all)
#endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all);
#ifdef WITH_WSREP
if (run_wsrep_commit)
error= wsrep_after_commit(thd, all);
if (run_wsrep_hooks)
error= error || wsrep_after_commit(thd, all);
#endif /* WITH_WSREP */
goto done;
}
@@ -1567,7 +1567,7 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););

#ifdef WITH_WSREP
if (!error && WSREP_ON)
if (run_wsrep_hooks && !error)
{
wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
if (!s.is_undefined())
@@ -1584,7 +1584,7 @@ int ha_commit_trans(THD *thd, bool all)
goto done;
}
#ifdef WITH_WSREP
if (wsrep_before_commit(thd, all))
if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all)))
goto wsrep_err;
#endif /* WITH_WSREP */
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
@@ -1600,10 +1600,10 @@ int ha_commit_trans(THD *thd, bool all)

error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
#ifdef WITH_WSREP
if (error || wsrep_after_commit(thd, all))
if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all))))
{
mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
if (wsrep_must_abort(thd))
{
mysql_mutex_unlock(&thd->LOCK_thd_data);
(void)tc_log->unlog(cookie, xid);
@@ -1636,7 +1636,7 @@ int ha_commit_trans(THD *thd, bool all)
#ifdef WITH_WSREP
wsrep_err:
mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
if (run_wsrep_hooks && wsrep_must_abort(thd))
{
WSREP_DEBUG("BF abort has happened after prepare & certify");
mysql_mutex_unlock(&thd->LOCK_thd_data);
@@ -1672,7 +1672,8 @@ int ha_commit_trans(THD *thd, bool all)
thd->mdl_context.release_lock(mdl_request.ticket);
}
#ifdef WITH_WSREP
if (WSREP(thd) && all && !error && (rw_ha_count == 0))
if (wsrep_is_active(thd) && is_real_trans && !error && (rw_ha_count == 0) &&
wsrep_not_committed(thd))
{
wsrep_commit_empty(thd, all);
}
@@ -7686,7 +7686,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
int is_leader= queue_for_group_commit(entry);
#ifdef WITH_WSREP
if (is_leader >= 0 &&
if (wsrep_run_commit_hook(entry->thd, true) && is_leader >= 0 &&
wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error()))
return true;
#endif /* WITH_WSREP */
@@ -195,7 +195,8 @@ int Wsrep_high_priority_service::start_transaction(
const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta)
{
DBUG_ENTER(" Wsrep_high_priority_service::start_transaction");
DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta));
DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta) ||
trans_begin(m_thd));
}

const wsrep::transaction& Wsrep_high_priority_service::transaction() const
@@ -204,14 +205,23 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const
DBUG_RETURN(m_thd->wsrep_trx());
}

void Wsrep_high_priority_service::adopt_transaction(const wsrep::transaction& transaction)
int Wsrep_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction)
{
DBUG_ENTER(" Wsrep_high_priority_service::adopt_transaction");
/* Adopt transaction first to set up transaction meta data for
trans begin. If trans_begin() fails for some reason, roll back
the wsrep transaction before return. */
m_thd->wsrep_cs().adopt_transaction(transaction);
DBUG_VOID_RETURN;
int ret= trans_begin(m_thd);
if (ret)
{
m_thd->wsrep_cs().before_rollback();
m_thd->wsrep_cs().after_rollback();
}
DBUG_RETURN(ret);
}


int Wsrep_high_priority_service::append_fragment_and_commit(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
@@ -254,23 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
ws_meta, true);
}

if (!ret)
{
DBUG_ASSERT(wsrep_thd_trx_seqno(m_thd) > 0);
if (!do_binlog_commit)
{
ret= wsrep_before_commit(m_thd, true);
}
ret= ret || trans_commit(m_thd);
if (!do_binlog_commit)
{
if (opt_log_slave_updates)
{
ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error());
}
ret= ret || wsrep_after_commit(m_thd, true);
}
}
ret= ret || trans_commit(m_thd);

m_thd->wsrep_cs().after_applying();
m_thd->mdl_context.release_transactional_locks();

@@ -298,47 +293,39 @@ int Wsrep_high_priority_service::commit(const wsrep::ws_handle& ws_handle,
thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
thd_proc_info(thd, "committing");

int ret= 0;
const bool is_ordered= !ws_meta.seqno().is_undefined();
/* If opt_log_slave_updates is not on, applier does not write
anything to binlog cache and neither wsrep_before_commit()
nor wsrep_after_commit() we be reached from binlog code
path for applier. Therefore run wsrep_before_commit()
and wsrep_after_commit() here. wsrep_ordered_commit()
will be called from wsrep_ordered_commit_if_no_binlog(). */
if (!opt_log_slave_updates && !opt_bin_log && is_ordered)
{
if (m_thd->transaction.all.no_2pc == false)
{
ret= wsrep_before_prepare(thd, true);
ret= ret || wsrep_after_prepare(thd, true);
}
ret= ret || wsrep_before_commit(thd, true);
}
ret= ret || trans_commit(thd);
int ret= trans_commit(thd);

if (ret == 0)
{
m_rgi->cleanup_context(thd, 0);
}

if (ret == 0 && !opt_log_slave_updates && !opt_bin_log && is_ordered)
{
ret= wsrep_after_commit(thd, true);
}

m_thd->mdl_context.release_transactional_locks();

thd_proc_info(thd, "wsrep applier committed");

if (!is_ordered)
{
/* Wsrep commit was not ordered so it does not go through commit time
hooks and remains active. Roll it back to make cleanup happen
in after_applying() call. */
m_thd->wsrep_cs().before_rollback();
m_thd->wsrep_cs().after_rollback();
}
else if (m_thd->wsrep_trx().state() == wsrep::transaction::s_executing)
{
/*
Wsrep commit was ordered but it did not go through commit time
hooks and remains active. Cycle through commit hooks to release
commit order and to make cleanup happen in after_applying() call.
This is a workaround for CTAS with empty result set.
*/
WSREP_DEBUG("Commit not finished for applier %llu", thd->thread_id);
ret= ret || m_thd->wsrep_cs().before_commit() ||
m_thd->wsrep_cs().ordered_commit() ||
m_thd->wsrep_cs().after_commit();
}

thd->lex->sql_command= SQLCOM_END;

must_exit_= check_exit_status();
DBUG_RETURN(ret);
@@ -380,6 +367,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
trans_commit(thd);

thd->close_temporary_tables();
thd->lex->sql_command= SQLCOM_END;

wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());

must_exit_= check_exit_status();
@@ -36,7 +36,7 @@ class Wsrep_high_priority_service :
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&);
const wsrep::transaction& transaction() const;
void adopt_transaction(const wsrep::transaction&);
int adopt_transaction(const wsrep::transaction&);
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0;
int append_fragment_and_commit(const wsrep::ws_handle&,
const wsrep::ws_meta&,
@@ -147,39 +147,14 @@ int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle,
WSREP_DEBUG("Storage service commit: %llu, %lld",
ws_meta.transaction_id().get(), ws_meta.seqno().get());
int ret= 0;
const bool do_binlog_commit= (opt_log_slave_updates && wsrep_gtid_mode);
const bool is_ordered= !ws_meta.seqno().is_undefined();
/*
Write skip event into binlog if gtid_mode is on. This is to
maintain gtid continuity.
*/
if (do_binlog_commit && is_ordered)
{
ret= wsrep_write_skip_event(m_thd);
}

if (!ret && is_ordered)
if (is_ordered)
{
ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle,
ws_meta, true);
ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
}

if (!ret)
{
if (!do_binlog_commit && is_ordered)
{
ret= wsrep_before_commit(m_thd, true);
}
ret= ret || trans_commit(m_thd);
if (!do_binlog_commit && is_ordered)
{
if (opt_log_slave_updates)
{
ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error());
}
ret= ret || wsrep_after_commit(m_thd, true);
}
}
ret= ret || trans_commit(m_thd);

if (!is_ordered)
{

0 comments on commit a8ff4f2

Please sign in to comment.