@@ -178,12 +178,11 @@ pm_wrapper_page_log_alloc_or_open(
178178 /* Part 2: DRAM structures*/
179179
180180 // In any case (new alloc or reused) we need to allocate below objects
181+
182+ /* init the per-line std::map */
183+ pm_ppl_init_in_mem (pmw->pop , pmw->ppl );
181184
182- // defined in my_pmemobj.h, implement in buf0flu.cc
183- pmw->ppl ->ckpt_lsn = 0 ;
184- pmw->ppl ->max_oldest_lsn = 0 ;
185- pmw->ppl ->min_oldest_lsn = ULONG_MAX;
186-
185+ /* defined in my_pmemobj.h, implement in buf0flu.cc */
187186 pmw->ppl ->flusher = pm_log_flusher_init (PMEM_N_LOG_FLUSH_THREADS, FLUSHER_LOG_BUF);
188187
189188 pmw->ppl ->free_log_pool_event = os_event_create (" pm_free_log_pool_event" );
@@ -857,6 +856,33 @@ void __realloc_TT_entry(
857856 }
858857}
859858
859+ /*
860+ * Init in-mem data structures and variables
861+ * Used for either new PPL or reused
862+ * Called from pm_wrapper_page_log_alloc_or_open()
863+ * */
864+ void
865+ pm_ppl_init_in_mem (
866+ PMEMobjpool* pop,
867+ PMEM_PAGE_PART_LOG* ppl)
868+ {
869+ uint64_t n, i;
870+ PMEM_PAGE_LOG_HASHED_LINE* pline;
871+
872+ ppl->ckpt_lsn = 0 ;
873+ ppl->max_oldest_lsn = 0 ;
874+ ppl->min_oldest_lsn = ULONG_MAX;
875+
876+ n = ppl->n_buckets ;
877+ /* per-line in-mem data structures*/
878+ for (i = 0 ; i < n; i++){
879+ pline = D_RW (D_RW (ppl->buckets )[i]);
880+
881+ /* the map*/
882+ pline->offset_map = new std::map<uint64_t , uint32_t >();
883+ }
884+ }
885+
860886void
861887pm_page_part_log_bucket_init (
862888 PMEMobjpool* pop,
@@ -1177,25 +1203,28 @@ pm_ppl_hash_add(
11771203}
11781204
11791205/*
1180- * Add key to hashtable
1206+ * Add key to hashtable if it is not in
11811207 * The caller reponse for holding the pline->lock
1208+ * @param[in] pop
1209+ * @param[in] ppl
1210+ * @param[in] pline
1211+ * @param[in] key
1212+ * @return pointer to the item in hashtable
11821213 * */
11831214plog_hash_t *
11841215pm_ppl_hash_check_and_add (
1185- PMEMobjpool* pop,
1186- PMEM_PAGE_PART_LOG* ppl,
1187- PMEM_PAGE_LOG_HASHED_LINE* pline,
1188- uint64_t key )
1216+ PMEMobjpool* pop,
1217+ PMEM_PAGE_PART_LOG* ppl,
1218+ PMEM_PAGE_LOG_HASHED_LINE* pline,
1219+ uint64_t key )
11891220{
1190-
11911221 uint64_t i, k;
11921222 int64_t n_try;
11931223 int64_t i_bit;
11941224
11951225 PMEM_PAGE_LOG_BLOCK* plog_block;
11961226 plog_hash_t * item;
11971227
1198-
11991228 item = pm_ppl_hash_get (pop, ppl, pline, key);
12001229
12011230 if (item == NULL ){
@@ -1541,12 +1570,12 @@ pm_ppl_write_rec(
15411570 byte* log_src,
15421571 uint32_t rec_size)
15431572{
1544- uint32_t n, n2;
1545- PMEM_PAGE_LOG_HASHED_LINE* pline;
1546- PMEM_PAGE_LOG_FREE_POOL* pfreepool;
1547- PMEM_PAGE_LOG_BUF* plogbuf;
1548- PMEM_PAGE_LOG_BLOCK* plog_block;
1549- plog_hash_t * item;
1573+ uint32_t n, n2;
1574+ PMEM_PAGE_LOG_HASHED_LINE* pline;
1575+ PMEM_PAGE_LOG_FREE_POOL* pfreepool;
1576+ PMEM_PAGE_LOG_BUF* plogbuf;
1577+ PMEM_PAGE_LOG_BLOCK* plog_block;
1578+ plog_hash_t * item;
15501579
15511580 ulint hashed;
15521581 ulint hashed2;
@@ -1575,11 +1604,6 @@ pm_ppl_write_rec(
15751604 // n2 = n - 1;
15761605
15771606 PMEM_LOG_HASH_KEY (hashed, key, n);
1578- // if (space == 0){
1579- // hashed = n - 1;
1580- // } else {
1581- // PMEM_LOG_HASH_KEY(hashed, key, n2);
1582- // }
15831607
15841608 assert (hashed < n);
15851609
@@ -1664,6 +1688,10 @@ Note: After pm_ppl_hash_add, plog_block->state = PMEM_IN_USED_BLOCK and is_free
16641688 if (pline->oldest_block_off == UINT32_MAX) {
16651689 pline->oldest_block_off = item->block_off ;
16661690 }
1691+ // test
1692+ /* insert the pair (offset, bid) into the set*/
1693+ write_off = plog_block->start_diskaddr + plog_block->start_off ;
1694+ pline->offset_map ->insert ( std::make_pair (write_off, item->block_off ));
16671695
16681696 }
16691697 plog_block->lastLSN = rec_lsn;
@@ -1723,6 +1751,10 @@ Note: After pm_ppl_hash_add, plog_block->state = PMEM_IN_USED_BLOCK and is_free
17231751 if (pline->oldest_block_off == UINT32_MAX) {
17241752 pline->oldest_block_off = item->block_off ;
17251753 }
1754+ // test
1755+ /* insert the pair (offset, bid) into the set*/
1756+ write_off = plog_block->start_diskaddr + plog_block->start_off ;
1757+ pline->offset_map ->insert ( std::make_pair (write_off, item->block_off ));
17261758 }
17271759
17281760 plog_block->lastLSN = rec_lsn;
@@ -1742,7 +1774,7 @@ Note: After pm_ppl_hash_add, plog_block->state = PMEM_IN_USED_BLOCK and is_free
17421774 /* IMPORTANT: always update offset after updating plog_block*/
17431775 plogbuf->cur_off += rec_size;
17441776
1745- // Call checkpoint (in necessary)
1777+ /* compute ckpt_lsn for this line (in necessary) */
17461778 if (!pline->is_req_checkpoint ){
17471779 pm_ppl_check_for_ckpt (pop, ppl, pline, plogbuf, rec_lsn);
17481780 }
@@ -1793,11 +1825,12 @@ pm_ppl_check_for_ckpt(
17931825 cur_off = pline->diskaddr + plogbuf->cur_off ;
17941826 age = cur_off - oldest_off;
17951827
1796- // we only set ckpt_lsn if it has not set yet
1828+ /* we only set ckpt_lsn if it has not set yet */
17971829 if ( age > PMEM_CKPT_MAX_OFFSET) {
17981830
17991831 pline->is_req_checkpoint = true ;
1800- // now compute the checkpoint lsn for this pline
1832+
1833+ /* now compute the checkpoint lsn for this pline */
18011834 oldest_lsn = plog_block_oldest->firstLSN ;
18021835 uint64_t delta = (uint64_t ) ((cur_lsn - oldest_lsn) * 1.0 * PMEM_CKPT_THRESHOLD);
18031836
@@ -3277,6 +3310,9 @@ pm_ppl_flush_page(
32773310
32783311 int64_t free_idx;
32793312 int64_t n_try;
3313+
3314+ uint64_t write_off;
3315+ uint64_t min_off;
32803316
32813317 plog_hash_t * item;
32823318
@@ -3323,18 +3359,58 @@ pm_ppl_flush_page(
33233359 pmemobj_rwlock_wrlock (pop, &pline->lock );
33243360
33253361 pmemobj_rwlock_wrlock (pop, &plog_block->lock );
3326-
3362+
3363+ /* save the write_off before reseting*/
3364+ write_off = plog_block->start_diskaddr + plog_block->start_off ;
3365+
33273366 if (USE_BIT_ARRAY) {
33283367 pm_bit_clear (pline->bit_arr , sizeof (long long ), plog_block->id );
33293368 }
33303369 __reset_page_log_block (plog_block);
33313370
33323371 pmemobj_rwlock_unlock (pop, &plog_block->lock );
33333372
3334-
3373+ /* remove item from map*/
3374+
3375+ std::map<uint64_t , uint32_t >::iterator it;
3376+ it = pline->offset_map ->find (write_off);
3377+
3378+ if (it != pline->offset_map ->end ()){
3379+ pline->offset_map ->erase (it);
3380+ }
3381+ else {
3382+ /* write_off is not found, logical error*/
3383+ assert (0 );
3384+ }
3385+
33353386 /* if the removed block is the oldest, update the new one */
33363387 if (item->block_off == pline->oldest_block_off ){
3337- pm_ppl_update_oldest (pop, ppl, pline);
3388+ /* Method 1: scan in the array (slow)*/
3389+ // pm_ppl_update_oldest(pop, ppl, pline);
3390+
3391+ /* Method 2: use std::map */
3392+ min_off = ULONG_MAX;
3393+ if (pline->offset_map ->size () > 0 ){
3394+ /* get the next min offset*/
3395+ it = pline->offset_map ->begin ();
3396+
3397+ min_off = it->second ;
3398+ PMEM_PAGE_LOG_BLOCK* pmin_log_block;
3399+ pmin_log_block = D_RW (D_RW (pline->arr )[min_off]);
3400+ /* the second smallest must larger than the smallest*/
3401+ assert (pmin_log_block->start_diskaddr + pmin_log_block->start_off > write_off);
3402+
3403+ if (pline->is_req_checkpoint ){
3404+ if (pmin_log_block->firstLSN > pline->ckpt_lsn ){
3405+ pline->is_req_checkpoint = false ;
3406+ }
3407+ }
3408+ } else {
3409+ pline->is_req_checkpoint = false ;
3410+ }
3411+
3412+ pline->oldest_block_off = min_off;
3413+
33383414 }
33393415
33403416 HASH_DELETE (plog_hash_t , addr_hash, pline->addr_hash , key, item);
@@ -3345,62 +3421,6 @@ pm_ppl_flush_page(
33453421 // space, page_no, pageLSN, pline->hashed_id, pline->oldest_block_off);
33463422 }
33473423
3348- return ;
3349- // //////////////////////////////////////////////////
3350- // find the log block by hashing key O(k)
3351- // n_try = k;
3352- n_try = pline->max_blocks ;
3353-
3354- // PMEM_LOG_HASH_KEY(i, key, k);
3355- PMEM_LOG_HASH_KEY (i, key, pline->max_blocks );
3356-
3357- // test
3358- // plog_block = D_RW(D_RW(pline->arr)[i]);
3359- // __reset_page_log_block(plog_block);
3360- // return;
3361- // end test
3362-
3363- while (n_try > 0 ){
3364- // for (i = 0; i < k; i++){}
3365- pmemobj_rwlock_wrlock (pop, &D_RW (D_RW (pline->arr )[i])->lock );
3366- plog_block = D_RW (D_RW (pline->arr )[i]);
3367-
3368- if (!plog_block->is_free &&
3369- plog_block->key == key){
3370- // Case A: found
3371- // (1) no need to check and reclaim the corresponding entries in TT
3372-
3373- // update the pageLSN on flush page
3374- plog_block->pageLSN = pageLSN;
3375-
3376- // we no longer assert here, new log recs are written on page during its flushing time
3377- // assert(plog_block->lastLSN <= pageLSN);
3378-
3379- /* Note 1: In InnoDB, changes of UNDO page has already captured in REDO log, we don't need to check the count variable to equal to reclaim. However, in other storage engine, count variable may needed
3380- * Note 2: Checkpoint in PPL is naturally done by this reclaim. By reclaiming a block of flush page, the low_watermark is increased.
3381- * */
3382- // (2) Check to reclaim this log block
3383- // printf("pm_ppl_flush_page (%zu, %zu) key %zu\n bid %zu count %zu", space, page_no, key, plog_block->bid, plog_block->count);
3384- // if (plog_block->count <= 0){
3385- if (plog_block->lastLSN <= pageLSN){
3386- __reset_page_log_block (plog_block);
3387- }
3388- // }
3389- pmemobj_rwlock_unlock (pop, &D_RW (D_RW (pline->arr )[i])->lock );
3390- return ;
3391-
3392- }// end found the right log block
3393-
3394- pmemobj_rwlock_unlock (pop, &D_RW (D_RW (pline->arr )[i])->lock );
3395- n_try--;
3396- i = (i + 1 ) % k;
3397- // next log block
3398- }// end find the log blcok by hashing key
3399-
3400- // if you reach here, then this flushed page doesn't have any transaction modified
3401- // it may be the DBMS's metadata page
3402-
3403- // Now we skip this case
34043424 return ;
34053425}
34063426
0 commit comments