More precise dependency tracking of XA XID in parallel replication
Keep track of each recently active XID, recording which worker it was queued
on. If an XID might still be active, choose the same worker to queue event
groups that refer to the same XID to avoid conflicts.

Otherwise, schedule the XID freely in the next round-robin slot.

This way, XA PREPARE can normally be scheduled without restrictions (unless
duplicate XID transactions come close together). This improves scheduling
and parallelism over the old method, where the worker thread to schedule XA
PREPARE on was fixed based on a hash value of the XID.

XA COMMIT will normally be scheduled on the same worker as XA PREPARE, but
can be a different one if the XA PREPARE is far back in the event history.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
knielsen committed Feb 27, 2024
1 parent 66d6cce commit 3a32fb5
Showing 2 changed files with 150 additions and 11 deletions.
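
As a rough orientation before the diffs, the new scheduling policy can be modelled by the following self-contained sketch. The types are invented and simplified (integer worker slots, a std::map of recently seen XIDs); the real implementation below uses sched_bucket, an I_List FIFO and a DYNAMIC_ARRAY, and additionally purges remembered XIDs by scheduling generation (see check_xa_xid_dependency() in the diff).

#include <deque>
#include <iostream>
#include <map>
#include <string>

struct Scheduler {
  std::deque<int> fifo;                   // round-robin order of worker slots
  std::map<std::string, int> recent_xid;  // XID -> worker it was last queued on

  explicit Scheduler(int workers) {
    // Highest slot first: schedule() cycles the FIFO before picking the head,
    // so the very first event group lands on slot 0 (the same trick as the
    // FIFO initialization in rpl_parallel::find() below).
    fifo.push_back(workers - 1);
    for (int i= 0; i < workers - 1; ++i)
      fifo.push_back(i);
  }

  // xid is empty for ordinary (non-XA) event groups.
  int schedule(const std::string &xid) {
    // Cycle the FIFO round-robin; the least recently used slot comes up front.
    fifo.push_back(fifo.front());
    fifo.pop_front();

    int worker= fifo.front();
    if (!xid.empty()) {
      auto it= recent_xid.find(xid);
      if (it != recent_xid.end()) {
        // The XID may still be active in that worker: pin this event group to
        // it and move the slot to the front so it counts as the chosen one.
        worker= it->second;
        for (auto p= fifo.begin(); p != fifo.end(); ++p)
          if (*p == worker) { fifo.erase(p); break; }
        fifo.push_front(worker);
      } else {
        // No conflict: remember the XID as (possibly) active on this worker.
        recent_xid[xid]= worker;
      }
    }
    return worker;
  }
};

int main() {
  Scheduler s(4);
  std::cout << s.schedule("") << "\n";      // 0: ordinary group, round-robin
  std::cout << s.schedule("xa-1") << "\n";  // 1: XA PREPARE, unrestricted slot
  std::cout << s.schedule("") << "\n";      // 2: round-robin continues
  std::cout << s.schedule("xa-1") << "\n";  // 1: XA COMMIT pinned to the PREPARE's worker
  std::cout << s.schedule("") << "\n";      // 3
}

Only the fourth group (the XA COMMIT of "xa-1") is pinned; everything else, including the XA PREPARE itself, takes the next free round-robin slot.
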
126 changes: 115 additions & 11 deletions sql/rpl_parallel.cc
@@ -2324,6 +2324,75 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
   }
 }


+/*
+  Check when we have done a complete round of scheduling for workers
+  0, 1, ..., (rpl_thread_max-1), in this order.
+  This often occurs every rpl_thread_max event group, but XA XID dependency
+  restrictions can cause insertion of extra out-of-order worker scheduling
+  in-between the normal round-robin scheduling.
+*/
+void
+rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur)
+{
+  uint32 idx= cur - rpl_threads;
+  DBUG_ASSERT(cur >= rpl_threads);
+  DBUG_ASSERT(cur < rpl_threads + rpl_thread_max);
+  if (idx == current_generation_idx)
+  {
+    ++idx;
+    if (idx >= rpl_thread_max)
+    {
+      /* A new generation; all workers have been scheduled at least once. */
+      idx= 0;
+      ++current_generation;
+    }
+    current_generation_idx= idx;
+  }
+}


+rpl_parallel_entry::sched_bucket *
+rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
+{
+  uint64 cur_gen= current_generation;
+  my_off_t i= 0;
+  while (i < maybe_active_xid.elements)
+  {
+    /*
+      Purge no longer active XID from the list:
+       - In generation N, XID might have been scheduled for worker W.
+       - Events in generation (N+1) might run freely in parallel with W.
+       - Events in generation (N+2) will have done wait_for_prior_commit for
+         the event group with XID (or a later one), but the XID might still be
+         active for a bit longer after wakeup_prior_commit().
+       - Events in generation (N+3) will have done wait_for_prior_commit() for
+         an event in W _after_ the XID, so are sure not to see the XID active.
+      Therefore, XID can be safely scheduled to a different worker in
+      generation (N+3) when last prior use was in generation N (or earlier).
+    */
+    xid_active_generation *a=
+      dynamic_element(&maybe_active_xid, i, xid_active_generation *);
+    if (a->generation + 3 <= cur_gen)
+    {
+      *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
+      continue;
+    }
+    if (xid->eq(&a->xid))
+    {
+      /* Update the last used generation and return the match. */
+      a->generation= cur_gen;
+      return a->thr;
+    }
+    ++i;
+  }
+  /* No matching XID conflicts. */
+  return nullptr;
+}


 /*
   Obtain a worker thread that we can queue an event to.
@@ -2368,17 +2437,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
     if (gtid_ev->flags2 &
         (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
     {
-      /*
-        For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE,
-        overriding the round-robin scheduling.
-      */
-      uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
-                               gtid_ev->xid.key_length()) % rpl_thread_max;
-      rpl_threads[idx].unlink();
-      thread_sched_fifo->append(rpl_threads + idx);
+      if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
+      {
+        /*
+          A previously scheduled event group with the same XID might still be
+          active in a worker, so schedule this event group in the same worker
+          to avoid a conflict.
+        */
+        cur_thr->unlink();
+        thread_sched_fifo->append(cur_thr);
+      }
+      else
+      {
+        /* Record this XID now active. */
+        xid_active_generation *a=
+          (xid_active_generation *)alloc_dynamic(&maybe_active_xid);
+        if (!a)
+          return NULL;
+        a->thr= cur_thr= thread_sched_fifo->head();
+        a->generation= current_generation;
+        a->xid.set(&gtid_ev->xid);
+      }
     }
+    else
+      cur_thr= thread_sched_fifo->head();
+
+    check_scheduling_generation(cur_thr);
   }
-  cur_thr= thread_sched_fifo->head();
+  else
+    cur_thr= thread_sched_fifo->head();


   thr= cur_thr->thr;
   if (thr)
@@ -2469,6 +2557,7 @@ free_rpl_parallel_entry(void *element)
     dealloc_gco(e->current_gco);
     e->current_gco= prev_gco;
   }
+  delete_dynamic(&e->maybe_active_xid);
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
   my_free(e);
@@ -2522,11 +2611,26 @@ rpl_parallel::find(uint32 domain_id)
     my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
     return NULL;
   }
+  /* Initialize a FIFO of scheduled worker threads. */
   e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
-  for (ulong i= 0; i < count; ++i)
-    e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
+  /*
+    (We cycle the FIFO _before_ allocating next entry in
+    rpl_parallel_entry::choose_thread(). So initialize the FIFO with the
+    highest element at the front, just so that the first event group gets
+    scheduled on entry 0).
+  */
+  e->thread_sched_fifo->
+    push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket);
+  for (ulong i= 0; i < count-1; ++i)
+    e->thread_sched_fifo->
+      push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
   e->rpl_threads= p;
   e->rpl_thread_max= count;
+  e->current_generation = 0;
+  e->current_generation_idx = 0;
+  init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
+                      sizeof(rpl_parallel_entry::xid_active_generation),
+                      0, count, 0, MYF(0));
   e->domain_id= domain_id;
   e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
   e->pause_sub_id= (uint64)ULONGLONG_MAX;
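
The generation-based purge in check_xa_xid_dependency() above (an entry whose last use was in generation N is dropped once the current generation reaches N+3) can be exercised in isolation with the following stand-alone sketch. Names and types here are invented for illustration; the real list is the DYNAMIC_ARRAY maybe_active_xid holding xid_active_generation elements.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// One remembered XID, loosely mirroring xid_active_generation (the XID is
// simplified to a string, the worker to an int).
struct ActiveXid {
  uint64_t generation;  // scheduling generation when the XID was last queued
  int worker;           // worker slot it was queued on
  std::string xid;
};

static std::vector<ActiveXid> maybe_active;

// Returns the worker that must be reused for this XID, or -1 if the XID can
// be scheduled freely on the next round-robin slot.
int check_xid(const std::string &xid, uint64_t cur_gen)
{
  for (size_t i= 0; i < maybe_active.size(); )
  {
    if (maybe_active[i].generation + 3 <= cur_gen)
    {
      // Three generations on, every worker has waited for an event group
      // queued after the XID's group, so the XID cannot still be active.
      // Remove by overwriting with the last element, as the pop_dynamic()
      // call does in the real code.
      maybe_active[i]= maybe_active.back();
      maybe_active.pop_back();
      continue;
    }
    if (maybe_active[i].xid == xid)
    {
      maybe_active[i].generation= cur_gen;  // refresh the last-used generation
      return maybe_active[i].worker;
    }
    ++i;
  }
  return -1;
}

int main()
{
  maybe_active.push_back({/*generation=*/5, /*worker=*/2, "xid-1"});
  std::cout << check_xid("xid-1", 6) << "\n";  // 2: may still be active, reuse worker 2
  std::cout << check_xid("xid-2", 7) << "\n";  // -1: unrelated XID, schedule freely
  std::cout << check_xid("xid-1", 9) << "\n";  // -1: last use was generation 6, and 6+3 <= 9
}
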
35 changes: 35 additions & 0 deletions sql/rpl_parallel.h
@@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool {


 struct rpl_parallel_entry {
+  /*
+    A small struct to put worker thread references into a FIFO (using an
+    I_List) for round-robin scheduling.
+  */
   struct sched_bucket : public ilink {
     sched_bucket() : thr(nullptr) { }
     rpl_parallel_thread *thr;
   };
+  /*
+    A struct to keep track of into which "generation" an XA XID was last
+    scheduled. A "generation" means that we know that every worker thread
+    slot in the rpl_parallel_entry was scheduled at least once. When more
+    than two generations have passed, we can safely reuse the XID in a
+    different worker.
+  */
+  struct xid_active_generation {
+    uint64 generation;
+    sched_bucket *thr;
+    xid_t xid;
+  };

   mysql_mutex_t LOCK_parallel_entry;
   mysql_cond_t COND_parallel_entry;
@@ -373,6 +389,23 @@ struct rpl_parallel_entry {
   sched_bucket *rpl_threads;
   I_List<sched_bucket> *thread_sched_fifo;
   uint32 rpl_thread_max;
+  /*
+    Keep track of all XA XIDs that may still be active in a worker thread.
+    The elements are of type xid_active_generation.
+  */
+  DYNAMIC_ARRAY maybe_active_xid;
+  /*
+    Keeping track of the current scheduling generation.
+    A new generation means that every worker thread in the rpl_threads array
+    has been scheduled at least one event group.
+    When we have scheduled to slot current_generation_idx= 0, 1, ..., N-1 in this
+    order, we know that (at least) one generation has passed.
+  */
+  uint64 current_generation;
+  uint32 current_generation_idx;

   /*
     The sub_id of the last transaction to commit within this domain_id.
     Must be accessed under LOCK_parallel_entry protection.
@@ -426,6 +459,8 @@ struct rpl_parallel_entry {
   /* The group_commit_orderer object for the events currently being queued. */
   group_commit_orderer *current_gco;

+  void check_scheduling_generation(sched_bucket *cur);
+  sched_bucket *check_xa_xid_dependency(xid_t *xid);
   rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                        PSI_stage_info *old_stage,
                                        Gtid_log_event *gtid_ev);
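
As a final illustration, the FIFO initialization order chosen in rpl_parallel::find() in the .cc diff above (highest slot pushed first) can be checked with a tiny stand-alone snippet. It assumes, as the comment in that hunk describes, that choose_thread() cycles the FIFO once before picking its head; std::deque stands in for the I_List used in the real code.

#include <deque>
#include <iostream>

int main() {
  const int count= 4;
  std::deque<int> fifo;
  fifo.push_back(count - 1);            // slot 3 goes in first ...
  for (int i= 0; i < count - 1; ++i)
    fifo.push_back(i);                  // ... then slots 0, 1, 2
  // The FIFO is now [3, 0, 1, 2].

  // choose_thread() cycles the FIFO before picking the head:
  fifo.push_back(fifo.front());
  fifo.pop_front();                     // now [0, 1, 2, 3]
  std::cout << fifo.front() << "\n";    // prints 0: the first event group goes to slot 0
}

With four workers the FIFO starts as [3, 0, 1, 2], so the first cycle leaves slot 0 at the head and the very first event group is queued on worker 0.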
