1
1
/* ****************************************************************************
2
2
3
3
Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
4
- Copyright (c) 2017, 2019 , MariaDB Corporation.
4
+ Copyright (c) 2017, 2020 , MariaDB Corporation.
5
5
6
6
This program is free software; you can redistribute it and/or modify it under
7
7
the terms of the GNU General Public License as published by the Free Software
@@ -29,6 +29,7 @@ Created 03/11/2014 Shaohua Wang
29
29
#include " btr0cur.h"
30
30
#include " btr0pcur.h"
31
31
#include " ibuf0ibuf.h"
32
+ #include " page0page.h"
32
33
#include " trx0trx.h"
33
34
34
35
/* * Innodb B-tree index fill factor for bulk load. */
@@ -140,7 +141,6 @@ PageBulk::init()
140
141
}
141
142
142
143
m_block = new_block;
143
- m_block->skip_flush_check = true ;
144
144
m_page = new_page;
145
145
m_page_zip = new_page_zip;
146
146
m_page_no = new_page_no;
@@ -160,7 +160,11 @@ PageBulk::init()
160
160
srv_page_size - dict_index_zip_pad_optimal_page_size (m_index);
161
161
m_heap_top = page_header_get_ptr (new_page, PAGE_HEAP_TOP);
162
162
m_rec_no = page_header_get_field (new_page, PAGE_N_RECS);
163
-
163
+ /* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
164
+ without writing redo log, to ensure that needs_finish() will hold
165
+ on an empty page. */
166
+ ut_ad (m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
167
+ m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0 ;
164
168
ut_d (m_total_data = 0 );
165
169
/* See page_copy_rec_list_end_to_created_page() */
166
170
ut_d (page_header_set_field (m_page, NULL , PAGE_HEAP_TOP,
@@ -186,7 +190,7 @@ PageBulk::insert(
186
190
187
191
#ifdef UNIV_DEBUG
188
192
/* Check whether records are in order. */
189
- if (!page_rec_is_infimum ( m_cur_rec)) {
193
+ if (!page_rec_is_infimum_low ( page_offset ( m_cur_rec) )) {
190
194
rec_t * old_rec = m_cur_rec;
191
195
rec_offs* old_offsets = rec_get_offsets (
192
196
old_rec, m_index, NULL , is_leaf,
@@ -204,18 +208,21 @@ PageBulk::insert(
204
208
rec_offs_make_valid (insert_rec, m_index, is_leaf, offsets);
205
209
206
210
/* 2. Insert the record in the linked list. */
207
- rec_t * next_rec = page_rec_get_next (m_cur_rec);
208
-
209
- page_rec_set_next (insert_rec, next_rec);
210
- page_rec_set_next (m_cur_rec, insert_rec);
211
-
212
211
/* 3. Set the n_owned field in the inserted record to zero,
213
212
and set the heap_no field. */
214
213
if (m_is_comp) {
214
+ ulint next_offs = rec_get_next_offs (m_cur_rec, TRUE );
215
+ rec_set_next_offs_new (insert_rec, next_offs);
216
+ rec_set_next_offs_new (m_cur_rec, page_offset (insert_rec));
217
+
215
218
rec_set_n_owned_new (insert_rec, NULL , 0 );
216
219
rec_set_heap_no_new (insert_rec,
217
220
PAGE_HEAP_NO_USER_LOW + m_rec_no);
218
221
} else {
222
+ ulint next_offs = rec_get_next_offs (m_cur_rec, FALSE );
223
+ rec_set_next_offs_old (insert_rec, next_offs);
224
+ rec_set_next_offs_old (m_cur_rec, page_offset (insert_rec));
225
+
219
226
rec_set_n_owned_old (insert_rec, 0 );
220
227
rec_set_heap_no_old (insert_rec,
221
228
PAGE_HEAP_NO_USER_LOW + m_rec_no);
@@ -243,17 +250,54 @@ PageBulk::insert(
243
250
m_cur_rec = insert_rec;
244
251
}
245
252
253
+ inline bool PageBulk::needs_finish () const
254
+ {
255
+ ut_ad (page_align (m_cur_rec) == m_block->frame );
256
+ ut_ad (m_page == m_block->frame );
257
+ if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
258
+ return true ;
259
+ ulint heap_no, n_heap= page_header_get_field (m_page, PAGE_N_HEAP);
260
+ ut_ad ((n_heap & 0x7fff ) >= PAGE_HEAP_NO_USER_LOW);
261
+ if (n_heap & 0x8000 )
262
+ {
263
+ n_heap&= 0x7fff ;
264
+ heap_no= rec_get_heap_no_new (m_cur_rec);
265
+ if (heap_no == PAGE_HEAP_NO_INFIMUM &&
266
+ page_header_get_field (m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
267
+ return false ;
268
+ }
269
+ else
270
+ {
271
+ heap_no= rec_get_heap_no_old (m_cur_rec);
272
+ if (heap_no == PAGE_HEAP_NO_INFIMUM &&
273
+ page_header_get_field (m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
274
+ return false ;
275
+ }
276
+ return heap_no != n_heap - 1 ;
277
+ }
278
+
246
279
/* * Mark end of insertion to the page. Scan all records to set page dirs,
247
280
and set page header members.
248
281
Note: we refer to page_copy_rec_list_end_to_created_page. */
249
282
void
250
283
PageBulk::finish ()
251
284
{
252
- ut_ad (m_rec_no > 0 );
285
+ ut_ad (!dict_index_is_spatial (m_index));
286
+
287
+ if (!needs_finish ()) {
288
+ return ;
289
+ }
290
+
253
291
ut_ad (m_total_data + page_dir_calc_reserved_space (m_rec_no)
254
292
<= page_get_free_space_of_empty (m_is_comp));
293
+ #ifdef UNIV_DEBUG
255
294
/* See page_copy_rec_list_end_to_created_page() */
256
- ut_d (page_dir_set_n_slots (m_page, NULL , srv_page_size / 2 ));
295
+ if (m_rec_no) {
296
+ page_dir_set_n_slots (m_page, NULL , srv_page_size / 2 );
297
+ }
298
+ mach_write_to_2 (PAGE_HEADER + PAGE_HEAP_TOP + m_page,
299
+ ulint (m_heap_top - m_page));
300
+ #endif
257
301
258
302
ulint count = 0 ;
259
303
ulint n_recs = 0 ;
@@ -262,8 +306,7 @@ PageBulk::finish()
262
306
page_dir_slot_t * slot = NULL ;
263
307
264
308
/* Set owner & dir. */
265
- do {
266
-
309
+ while (!page_rec_is_supremum (insert_rec)) {
267
310
count++;
268
311
n_recs++;
269
312
@@ -280,7 +323,7 @@ PageBulk::finish()
280
323
}
281
324
282
325
insert_rec = page_rec_get_next (insert_rec);
283
- } while (! page_rec_is_supremum (insert_rec));
326
+ }
284
327
285
328
if (slot_index > 0
286
329
&& (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1 ) / 2
@@ -303,10 +346,14 @@ PageBulk::finish()
303
346
page_dir_slot_set_rec (slot, page_get_supremum_rec (m_page));
304
347
page_dir_slot_set_n_owned (slot, NULL , count + 1 );
305
348
306
- ut_ad (!dict_index_is_spatial (m_index));
307
349
ut_ad (!page_get_instant (m_page));
308
350
309
- if (!m_flush_observer && !m_page_zip) {
351
+ if (!m_rec_no) {
352
+ /* Restore PAGE_DIRECTION_B from 0 to
353
+ PAGE_NO_DIRECTION like it should be on an empty page,
354
+ again without writing redo log. */
355
+ m_page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
356
+ } else if (!m_flush_observer && !m_page_zip) {
310
357
mlog_write_ulint (PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
311
358
2 + slot_index, MLOG_2BYTES, &m_mtr);
312
359
mlog_write_ulint (PAGE_HEADER + PAGE_HEAP_TOP + m_page,
@@ -343,26 +390,18 @@ PageBulk::finish()
343
390
mach_write_to_2 (PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0 );
344
391
}
345
392
346
- m_block->skip_flush_check = false ;
393
+ ut_ad (!needs_finish ());
394
+ ut_ad (page_validate (m_page, m_index));
347
395
}
348
396
349
397
/* * Commit inserts done to the page
350
398
@param[in] success Flag whether all inserts succeed. */
351
- void
352
- PageBulk::commit (
353
- bool success)
399
+ void PageBulk::commit (bool success)
354
400
{
355
- if (success) {
356
- ut_ad (page_validate (m_page, m_index));
357
-
358
- /* Set no free space left and no buffered changes in ibuf. */
359
- if (!dict_index_is_clust (m_index) && page_is_leaf (m_page)) {
360
- ibuf_set_bitmap_for_bulk_load (
361
- m_block, innobase_fill_factor == 100 );
362
- }
363
- }
364
-
365
- m_mtr.commit ();
401
+ finish ();
402
+ if (success && !dict_index_is_clust (m_index) && page_is_leaf (m_page))
403
+ ibuf_set_bitmap_for_bulk_load (m_block, innobase_fill_factor == 100 );
404
+ m_mtr.commit ();
366
405
}
367
406
368
407
/* * Compress a page of compressed table
@@ -604,7 +643,9 @@ PageBulk::storeExt(
604
643
const big_rec_t * big_rec,
605
644
rec_offs* offsets)
606
645
{
607
- /* Note: not all fileds are initialized in btr_pcur. */
646
+ finish ();
647
+
648
+ /* Note: not all fields are initialized in btr_pcur. */
608
649
btr_pcur_t btr_pcur;
609
650
btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
610
651
btr_pcur.latch_mode = BTR_MODIFY_LEAF;
@@ -633,7 +674,7 @@ Note: log_free_check requires holding no lock/latch in current thread. */
633
674
void
634
675
PageBulk::release ()
635
676
{
636
- ut_ad (! dict_index_is_spatial (m_index) );
677
+ finish ( );
637
678
638
679
/* We fix the block because we will re-pin it soon. */
639
680
buf_block_buf_fix_inc (m_block, __FILE__, __LINE__);
@@ -691,32 +732,30 @@ BtrBulk::pageSplit(
691
732
{
692
733
ut_ad (page_bulk->getPageZip () != NULL );
693
734
694
- /* 1. Check if we have only one user record on the page. */
695
735
if (page_bulk->getRecNo () <= 1 ) {
696
736
return (DB_TOO_BIG_RECORD);
697
737
}
698
738
699
- /* 2. create a new page. */
739
+ /* Initialize a new page */
700
740
PageBulk new_page_bulk (m_index, m_trx->id , FIL_NULL,
701
741
page_bulk->getLevel (), m_flush_observer);
702
742
dberr_t err = new_page_bulk.init ();
703
743
if (err != DB_SUCCESS) {
704
744
return (err);
705
745
}
706
746
707
- /* 3. copy the upper half to new page. */
747
+ /* Copy the upper half to the new page. */
708
748
rec_t * split_rec = page_bulk->getSplitRec ();
709
749
new_page_bulk.copyIn (split_rec);
710
750
page_bulk->copyOut (split_rec);
711
751
712
- /* 4. commit the splitted page . */
752
+ /* Commit the pages after split . */
713
753
err = pageCommit (page_bulk, &new_page_bulk, true );
714
754
if (err != DB_SUCCESS) {
715
755
pageAbort (&new_page_bulk);
716
756
return (err);
717
757
}
718
758
719
- /* 5. commit the new page. */
720
759
err = pageCommit (&new_page_bulk, next_page_bulk, true );
721
760
if (err != DB_SUCCESS) {
722
761
pageAbort (&new_page_bulk);
@@ -942,11 +981,9 @@ BtrBulk::insert(
942
981
ut_ad (page_bulk->getLevel () == 0 );
943
982
ut_ad (page_bulk == m_page_bulks.at (0 ));
944
983
945
- /* Release all latched but leaf node. */
984
+ /* Release all pages above the leaf level */
946
985
for (ulint level = 1 ; level <= m_root_level; level++) {
947
- PageBulk* page_bulk = m_page_bulks.at (level);
948
-
949
- page_bulk->release ();
986
+ m_page_bulks.at (level)->release ();
950
987
}
951
988
952
989
err = page_bulk->storeExt (big_rec, offsets);
@@ -1032,6 +1069,7 @@ BtrBulk::finish(dberr_t err)
1032
1069
return (err);
1033
1070
}
1034
1071
root_page_bulk.copyIn (first_rec);
1072
+ root_page_bulk.finish ();
1035
1073
1036
1074
/* Remove last page. */
1037
1075
btr_page_free (m_index, last_block, &mtr);
0 commit comments