Skip to content

Commit a202371

Browse files
committed
MDEV-33757 Get rid of TrxUndoRsegs code
Post-push fix: purge queue array can't be fixed size, because the elements of the array is the analogue of undo logs, which must be processed in the order of transaction commits, and the array can contain more elements, than trx_sys.rseg_array. Also it's necessary to maintain min-heap property by the trx_no of transaction, which produced the first non-purged undo log in all rsegs. That's why the element of purge queue aray must contain not only trx_sys.rseg_array index, but also trx_no of committed transacion, i.e. the pair (trx_no, trx_sys.rseg_array index), which is encoded as uint64_t((trx_no << 8) | (trx_sys.rseg_array index)). Reviewed by: Marko Mäkelä
1 parent 9a4991a commit a202371

File tree

4 files changed

+67
-65
lines changed

4 files changed

+67
-65
lines changed

storage/innobase/include/trx0purge.h

Lines changed: 60 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -58,76 +58,71 @@ ulint trx_purge(ulint n_tasks, ulint history_size);
5858
/** The control structure used in the purge operation */
5959
class purge_sys_t
6060
{
61-
/** Min-heap based priority queue over fixed size array */
61+
/** Min-heap based priority queue of (trx_no, trx_sys.rseg_array index)
62+
pairs, ordered on trx_no. The highest 64-TRX_NO_SHIFT bits of each element is
63+
trx_no, the lowest 8 bits is rseg's index in trx_sys.rseg_array. */
6264
class purge_queue
6365
{
64-
/** Array of indexes in trx_sys.rseg_array. */
65-
alignas(CPU_LEVEL1_DCACHE_LINESIZE) byte m_array[TRX_SYS_N_RSEGS];
66-
/** Pointer to the end of m_array. */
67-
byte *m_end= m_array;
68-
6966
public:
70-
struct trx_rseg_cmp
71-
{
72-
/** Compare two trx_rseg_t* based on trx_no.
73-
@param lhs first index in trx_sys.rseg_array to compare
74-
@param rhs second index in trx_sys.rseg_array to compare
75-
@return whether lhs>rhs */
76-
bool operator()(const byte lhs, const byte rhs)
77-
{
78-
ut_ad(lhs < TRX_SYS_N_RSEGS);
79-
ut_ad(rhs < TRX_SYS_N_RSEGS);
80-
/* We can compare without trx_rseg_t::latch, because rseg last
81-
commit is always set before pushing rseg to purge queue. */
82-
return trx_sys.rseg_array[lhs].last_commit_and_offset >
83-
trx_sys.rseg_array[rhs].last_commit_and_offset;
84-
}
85-
};
86-
byte *begin() { return m_array; }
87-
byte *end() { return m_end; }
88-
const byte *c_begin() const { return m_array; }
89-
const byte *c_end() const { return m_end; }
90-
size_t size() const
91-
{
92-
size_t s= c_end() - c_begin();
93-
ut_ad(s <= TRX_SYS_N_RSEGS);
94-
return s;
95-
}
96-
bool empty() const { return !size(); }
97-
void clear() { m_end= m_array; }
67+
typedef std::vector<uint64_t, ut_allocator<uint64_t>> container_type;
68+
/** Number of bits reseved to shift trx_no in purge queue element */
69+
static constexpr unsigned TRX_NO_SHIFT= 8;
70+
71+
bool empty() const { return m_array.empty(); }
72+
void clear() { m_array.clear(); }
9873

99-
/** Push index of trx_sys.rseg_array into min-heap.
100-
@param i index to push */
101-
void push_rseg_index(byte i)
74+
/** Push (trx_no, trx_sys.rseg_array index) into min-heap.
75+
@param trx_no_rseg (trx_no << TRX_NO_SHIFT | (trx_sys.rseg_array index)) */
76+
void push_trx_no_rseg(container_type::value_type trx_no_rseg)
10277
{
103-
ut_ad(i < TRX_SYS_N_RSEGS);
104-
ut_ad(size() + 1 <= TRX_SYS_N_RSEGS);
105-
*m_end++= i;
106-
std::push_heap(begin(), end(), trx_rseg_cmp());
78+
m_array.push_back(trx_no_rseg);
79+
std::push_heap(m_array.begin(), m_array.end(),
80+
std::greater<container_type::value_type>());
10781
}
10882

10983
/** Push rseg to priority queue.
110-
@param rseg trx_rseg_t pointer to push */
111-
void push(const trx_rseg_t *rseg)
84+
@param trx_no trx_no of committed transaction
85+
@param rseg rseg of committed transaction*/
86+
void push(trx_id_t trx_no, const trx_rseg_t &rseg)
11287
{
113-
ut_ad(rseg >= trx_sys.rseg_array);
114-
ut_ad(rseg < trx_sys.rseg_array + TRX_SYS_N_RSEGS);
115-
byte i= byte(rseg - trx_sys.rseg_array);
116-
push_rseg_index(i);
88+
ut_ad(trx_no < 1ULL << (DATA_TRX_ID_LEN * CHAR_BIT));
89+
ut_ad(&rseg >= trx_sys.rseg_array);
90+
ut_ad(&rseg < trx_sys.rseg_array + TRX_SYS_N_RSEGS);
91+
push_trx_no_rseg(trx_no << TRX_NO_SHIFT |
92+
byte(&rseg - trx_sys.rseg_array));
93+
}
94+
95+
/** Extracts rseg from (trx_no, trx_sys.rseg_array index) pair.
96+
@param trx_no_rseg (trx_no << TRX_NO_SHIFT | (trx_sys.rseg_array index)
97+
@return pointer to rseg in trx_sys.rseg_array */
98+
static trx_rseg_t *rseg(container_type::value_type trx_no_rseg) {
99+
byte i= static_cast<byte>(trx_no_rseg);
100+
ut_ad(i < TRX_SYS_N_RSEGS);
101+
return &trx_sys.rseg_array[i];
117102
}
118103

119104
/** Pop rseg from priority queue.
120105
@return pointer to popped trx_rseg_t object */
121106
trx_rseg_t *pop()
122107
{
123108
ut_ad(!empty());
124-
std::pop_heap(begin(), end(), trx_rseg_cmp());
125-
byte i= *--m_end;
126-
ut_ad(i < TRX_SYS_N_RSEGS);
127-
return &trx_sys.rseg_array[i];
109+
std::pop_heap(m_array.begin(), m_array.end(),
110+
std::greater<container_type::value_type>());
111+
trx_rseg_t *r = rseg(m_array.back());
112+
m_array.pop_back();
113+
return r;
128114
}
115+
116+
/** Clone m_array.
117+
@return m_array clone */
118+
container_type clone_container() const{ return m_array; }
119+
120+
private:
121+
/** Array of (trx_no, trx_sys.rseg_array index) pairs. */
122+
container_type m_array;
129123
};
130124

125+
131126
public:
132127
/** latch protecting view, m_enabled */
133128
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock latch;
@@ -243,20 +238,29 @@ class purge_sys_t
243238
record */
244239
uint16_t hdr_offset; /*!< Header byte offset on the page */
245240

246-
/** Binary min-heap of indexes in trx_sys.rseg_array, ordered on
247-
rseg_t::last_trx_no(). It is protected by the pq_mutex */
241+
/** Binary min-heap of (trx_no, trx_sys.rseg_array index) pairs, ordered on
242+
trx_no. It is protected by the pq_mutex */
248243
purge_queue purge_queue;
249244

250245
/** Mutex protecting purge_queue */
251246
mysql_mutex_t pq_mutex;
252247

253248
public:
249+
250+
void enqueue(trx_id_t trx_no, const trx_rseg_t &rseg) {
251+
mysql_mutex_assert_owner(&pq_mutex);
252+
purge_queue.push(trx_no, rseg);
253+
}
254+
254255
/** Push to purge queue without acquiring pq_mutex.
255256
@param rseg rseg to push */
256-
void enqueue(trx_rseg_t &rseg)
257-
{
257+
void enqueue(const trx_rseg_t &rseg) { enqueue(rseg.last_trx_no(), rseg); }
258+
259+
/** Clone purge queue container.
260+
@return purge queue container clone */
261+
purge_queue::container_type clone_queue_container() const {
258262
mysql_mutex_assert_owner(&pq_mutex);
259-
purge_queue.push(&rseg);
263+
return purge_queue.clone_container();
260264
}
261265

262266
/** Acquare purge_queue_mutex */

storage/innobase/include/trx0rseg.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) trx_rseg_t
179179
/** @return header offset of the last committed transaction */
180180
uint16_t last_offset() const
181181
{
182-
return static_cast<uint16_t>(last_commit_and_offset & ((1ULL << 16) - 1));
182+
return static_cast<uint16_t>(last_commit_and_offset);
183183
}
184184

185185
void set_last_commit(uint16_t last_offset, trx_id_t trx_no)

storage/innobase/trx/trx0purge.cc

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -495,14 +495,12 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
495495

496496
void purge_sys_t::cleanse_purge_queue(const fil_space_t &space)
497497
{
498-
byte purge_elem_list[TRX_SYS_N_RSEGS];
499498
mysql_mutex_lock(&pq_mutex);
500-
std::copy(purge_queue.c_begin(), purge_queue.c_end(), purge_elem_list);
501-
byte *purge_list_end = purge_elem_list + purge_queue.size();
499+
auto purge_elem_list= clone_queue_container();
502500
purge_queue.clear();
503-
for (byte *elem = purge_elem_list; elem < purge_list_end; ++elem)
504-
if (trx_sys.rseg_array[*elem].space != &space)
505-
purge_queue.push_rseg_index(*elem);
501+
for (auto elem : purge_elem_list)
502+
if (purge_queue::rseg(elem)->space != &space)
503+
purge_queue.push_trx_no_rseg(elem);
506504
mysql_mutex_unlock(&pq_mutex);
507505
}
508506

@@ -814,7 +812,7 @@ bool purge_sys_t::rseg_get_next_history_log()
814812
can never produce events from an empty rollback segment. */
815813

816814
mysql_mutex_lock(&pq_mutex);
817-
purge_queue.push(rseg);
815+
enqueue(*rseg);
818816
mysql_mutex_unlock(&pq_mutex);
819817
}
820818
}

storage/innobase/trx/trx0trx.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
11531153
/* end cannot be less than anything in rseg. User threads only
11541154
produce events when a rollback segment is empty. */
11551155
rseg->set_last_commit(undo->hdr_offset, end);
1156-
purge_sys.enqueue(*rseg);
1156+
purge_sys.enqueue(end, *rseg);
11571157
purge_sys.queue_unlock();
11581158
}
11591159
else

0 commit comments

Comments
 (0)