@@ -871,6 +871,7 @@ pm_ppl_init_in_mem(
871871{
872872 uint64_t n, i;
873873 PMEM_PAGE_LOG_HASHED_LINE* pline;
874+ char sbuf[256 ];
874875
875876 ppl->ckpt_lsn = 0 ;
876877 ppl->max_oldest_lsn = 0 ;
@@ -882,8 +883,9 @@ pm_ppl_init_in_mem(
882883 pline = D_RW (D_RW (ppl->buckets )[i]);
883884
884885 pline->is_flushing = false ;
885- /* os events*/
886- pline->log_flush_event = os_event_create (" pm_line_log_flush_event" );
886+ /* Note that each pline must have distinct os event*/
887+ sprintf (sbuf," pm_line_log_flush_event%zu" , i);
888+ pline->log_flush_event = os_event_create (sbuf);
887889 /* the map*/
888890 pline->offset_map = new std::map<uint64_t , uint32_t >();
889891 }
@@ -1294,7 +1296,7 @@ pm_ppl_hash_check_and_add(
12941296
12951297 if (i >= 0 && i < pline->max_blocks ){
12961298 // pmemobj_rwlock_wrlock(pop, &D_RW(D_RW(pline->arr)[i])->lock);
1297- pmemobj_rwlock_wrlock (pop, &pline->meta_lock );
1299+ // pmemobj_rwlock_wrlock(pop, &pline->meta_lock);
12981300 plog_block = D_RW (D_RW (pline->arr )[i]);
12991301 assert (plog_block->is_free );
13001302
@@ -1308,7 +1310,7 @@ pm_ppl_hash_check_and_add(
13081310 // (2) Insert
13091311 HASH_INSERT (plog_hash_t , addr_hash, pline->addr_hash , key, item);
13101312
1311- pmemobj_rwlock_unlock (pop, &pline->meta_lock );
1313+ // pmemobj_rwlock_unlock(pop, &pline->meta_lock);
13121314 return item;
13131315 }
13141316
@@ -1611,7 +1613,7 @@ pm_ppl_parse_entry_id(
16111613 * */
16121614// void
16131615uint64_t
1614- pm_ppl_write_rec_v1 (
1616+ pm_ppl_write_rec (
16151617 PMEMobjpool* pop,
16161618 PMEM_PAGE_PART_LOG* ppl,
16171619 uint64_t key,
@@ -1621,7 +1623,10 @@ pm_ppl_write_rec_v1(
16211623 uint32_t n, n2;
16221624 PMEM_PAGE_LOG_HASHED_LINE* pline;
16231625 PMEM_PAGE_LOG_FREE_POOL* pfreepool;
1626+
1627+ TOID (PMEM_PAGE_LOG_BUF) logbuf;
16241628 PMEM_PAGE_LOG_BUF* plogbuf;
1629+
16251630 PMEM_PAGE_LOG_BLOCK* plog_block;
16261631 plog_hash_t * item;
16271632
@@ -1658,13 +1663,16 @@ pm_ppl_write_rec_v1(
16581663
16591664retry:
16601665 pline = D_RW (D_RW (ppl->buckets )[hashed]);
1666+ TOID_ASSIGN (logbuf, (pline->logbuf ).oid );
16611667 plogbuf = D_RW (pline->logbuf );
16621668
16631669 /* WARNING this lock may become bottle neck*/
16641670 pmemobj_rwlock_wrlock (pop, &pline->lock );
16651671
16661672 // if (plogbuf->state == PMEM_LOG_BUF_IN_FLUSH)
1667- if (pline->is_flushing )
1673+ // if (pline->is_flushing)
1674+ if (pline->is_flushing ||
1675+ plogbuf->state == PMEM_LOG_BUF_IN_FLUSH)
16681676 {
16691677 pmemobj_rwlock_unlock (pop, &pline->lock );
16701678
@@ -1703,19 +1711,23 @@ pm_ppl_write_rec_v1(
17031711 assert (D_RW (free_buf)->cur_off == PMEM_LOG_BUF_HEADER_SIZE);
17041712 assert (D_RW (free_buf)->n_recs == 0 );
17051713
1706- /* (1.2) insert free logbuf into the head*/
1707- TOID_ASSIGN (D_RW (free_buf)->prev , pline->logbuf .oid );
1708- TOID_ASSIGN (D_RW (pline->logbuf )->next , free_buf.oid );
1709- TOID_ASSIGN (pline->logbuf , free_buf.oid );
1710-
17111714 // test
17121715 if ( D_RW (plogbuf->prev ) != NULL ){
17131716 printf (" PMEM_WARN: there is another in-flushing logbuf id %zu before this full logbuf %zu pline %zu\n " , D_RW (plogbuf->prev )->id , plogbuf->id , plogbuf->hashed_id );
17141717 }
1718+
1719+ /* (1.2) insert free logbuf into the head*/
1720+ // TOID_ASSIGN(D_RW(free_buf)->prev, pline->logbuf.oid);
1721+ // TOID_ASSIGN(D_RW(pline->logbuf)->next, free_buf.oid);
1722+ // TOID_ASSIGN(pline->logbuf, free_buf.oid);
1723+
1724+ TOID_ASSIGN (D_RW (free_buf)->prev , logbuf.oid );
1725+ TOID_ASSIGN (plogbuf->next , free_buf.oid );
1726+ TOID_ASSIGN (pline->logbuf , free_buf.oid );
1727+
17151728 // move the diskaddr on the line ahead, the written size should be aligned with 512B for DIRECT_IO works
17161729 pline->diskaddr += plogbuf->size ;
17171730
1718-
17191731 // (1.4) write log rec on new buf
17201732 D_RW (free_buf)->hashed_id = pline->hashed_id ;
17211733
@@ -1738,6 +1750,20 @@ pm_ppl_write_rec_v1(
17381750 old_off = D_RW (free_buf)->cur_off ;
17391751 D_RW (free_buf)->cur_off += rec_size;
17401752
1753+ // (1.5) write the header. This header is needed when recovery,
1754+ byte* header = ppl->p_align + plogbuf->pmemaddr + 0 ;
1755+ byte* ptr = header;
1756+ mach_write_to_4 (ptr, plogbuf->cur_off );
1757+ ptr += 4 ;
1758+
1759+ mach_write_to_4 (ptr, plogbuf->n_recs );
1760+ ptr += 4 ;
1761+
1762+ // fill zero the un-used len
1763+ uint64_t dif_len = plogbuf->size - plogbuf->cur_off ;
1764+ if (dif_len > 0 ) {
1765+ memset (header + plogbuf->cur_off , 0 , dif_len);
1766+ }
17411767 /* wakeup who is waiting and early release the general lock*/
17421768 pline->is_flushing = false ;
17431769 os_event_set (pline->log_flush_event );
@@ -1778,20 +1804,6 @@ pm_ppl_write_rec_v1(
17781804 }
17791805 plog_block->lastLSN = rec_lsn;
17801806
1781- // (1.5) write the header. This header is needed when recovery,
1782- byte* header = ppl->p_align + plogbuf->pmemaddr + 0 ;
1783- byte* ptr = header;
1784- mach_write_to_4 (ptr, plogbuf->cur_off );
1785- ptr += 4 ;
1786-
1787- mach_write_to_4 (ptr, plogbuf->n_recs );
1788- ptr += 4 ;
1789-
1790- // fill zero the un-used len
1791- uint64_t dif_len = plogbuf->size - plogbuf->cur_off ;
1792- if (dif_len > 0 ) {
1793- memset (header + plogbuf->cur_off , 0 , dif_len);
1794- }
17951807
17961808 // (1.6) assign a pointer in the flusher to the full log buf, this function return immediately
17971809 pm_log_buf_assign_flusher (ppl, plogbuf);
@@ -1821,8 +1833,12 @@ pm_ppl_write_rec_v1(
18211833
18221834 old_off = plogbuf->cur_off ;
18231835 plogbuf->cur_off += rec_size;
1836+
1837+ if (!pline->is_req_checkpoint ){
1838+ pm_ppl_check_for_ckpt (pop, ppl, pline, plogbuf, rec_lsn);
1839+ }
18241840 /* early realease the general lock*/
1825- // pmemobj_rwlock_unlock(pop, &pline->lock);
1841+ pmemobj_rwlock_unlock (pop, &pline->lock );
18261842
18271843 /* (2) Get the plogblock*/
18281844 pmemobj_rwlock_wrlock (pop, &pline->meta_lock );
@@ -1836,6 +1852,7 @@ pm_ppl_write_rec_v1(
18361852 plog_block = D_RW (D_RW (pline->arr )[item->block_off ]);
18371853 assert (plog_block);
18381854
1855+ // pmemobj_rwlock_wrlock(pop, &plog_block->lock);
18391856 // update plog_block
18401857 if (plog_block->firstLSN == 0 ){
18411858 // first write
@@ -1854,19 +1871,22 @@ pm_ppl_write_rec_v1(
18541871 // test
18551872 /* insert the pair (offset, bid) into the set*/
18561873 write_off = plog_block->start_diskaddr + plog_block->start_off ;
1874+ pmemobj_rwlock_wrlock (pop, &pline->meta_lock );
18571875 pline->offset_map ->insert ( std::make_pair (write_off, item->block_off ));
1876+ pmemobj_rwlock_unlock (pop, &pline->meta_lock );
18581877 }
18591878
18601879 plog_block->lastLSN = rec_lsn;
18611880
1862- pmemobj_rwlock_unlock (pop, &pline->lock );
1881+ // pmemobj_rwlock_unlock(pop, &plog_block->lock);
1882+ // pmemobj_rwlock_unlock(pop, &pline->lock);
18631883 return rec_lsn;
18641884 } // end handle regular logbuf
18651885 // pmemobj_rwlock_unlock(pop, &pline->lock);
18661886}
18671887
18681888uint64_t
1869- pm_ppl_write_rec (
1889+ pm_ppl_write_rec_v2 (
18701890 PMEMobjpool* pop,
18711891 PMEM_PAGE_PART_LOG* ppl,
18721892 uint64_t key,
@@ -3923,7 +3943,7 @@ pm_ppl_flush_page(
39233943 // plog_block->firstLSN may a little greater than bpage->oldest_modification, we don't assert here
39243944 // assert(plog_block->firstLSN == bpage->oldest_modification);
39253945
3926- pmemobj_rwlock_wrlock (pop, &pline->lock );
3946+ pmemobj_rwlock_wrlock (pop, &pline->meta_lock );
39273947
39283948 pmemobj_rwlock_wrlock (pop, &plog_block->lock );
39293949
@@ -3982,7 +4002,7 @@ pm_ppl_flush_page(
39824002
39834003 HASH_DELETE (plog_hash_t , addr_hash, pline->addr_hash , key, item);
39844004
3985- pmemobj_rwlock_unlock (pop, &pline->lock );
4005+ pmemobj_rwlock_unlock (pop, &pline->meta_lock );
39864006
39874007 // printf("pm_ppl_flush space %zu page %zu pageLSN %zu pline %zu oldest_block_off %zu\n",
39884008 // space, page_no, pageLSN, pline->hashed_id, pline->oldest_block_off);
0 commit comments