@@ -65,6 +65,8 @@ static const int buf_flush_page_cleaner_priority = -20;
6565extern ulint gb_flush_time;
6666#endif
6767#if defined(UNIV_PMEMOBJ_BUF) || defined (UNIV_PMEMOBJ_PART_PL)
68+ #include < sys/syscall.h>
69+ #include < sys/types.h> // for gettid()
6870#include " my_pmemobj.h"
6971#include < libpmemobj.h>
7072extern PMEM_WRAPPER* gb_pmw;
@@ -4337,6 +4339,13 @@ DECLARE_THREAD(pm_log_redoer_worker)(
43374339 os_thread_create */
43384340{
43394341 ulint i;
4342+ pid_t thread_id;
4343+ ulint idx;
4344+ ulint lines_per_thread;
4345+ // int dist_mode = 2;
4346+ int dist_mode = 1 ;
4347+
4348+ ulint start_time, end_time, e_time;
43404349
43414350 PMEM_LOG_REDOER* redoer = gb_pmw->ppl ->redoer ;
43424351
@@ -4346,9 +4355,18 @@ DECLARE_THREAD(pm_log_redoer_worker)(
43464355 my_thread_init ();
43474356
43484357 mutex_enter (&redoer->mutex );
4358+ idx = redoer->n_workers ;
43494359 redoer->n_workers ++;
43504360 os_event_reset (redoer->is_log_all_closed );
43514361 mutex_exit (&redoer->mutex );
4362+
4363+ // thread_id = os_thread_pf(os_thread_get_curr_id());
4364+ lines_per_thread = redoer->size / (srv_ppl_n_redoer_threads - 1 );
4365+
4366+ // thread_id = syscall(SYS_gettid);
4367+ // idx = thread_id % srv_ppl_n_redoer_threads;
4368+
4369+ printf (" Redoers thread %zu idx %zu created\n " ,thread_id, idx);
43524370
43534371 while (true ) {
43544372 // worker thread wait until there is is_requested signal
@@ -4361,23 +4379,44 @@ DECLARE_THREAD(pm_log_redoer_worker)(
43614379 // do nothing
43624380 break ;
43634381 }
4364-
4365- for (i = 0 ; i < redoer->size ; i++) {
4366- mutex_enter (&redoer->mutex );
4382+ /* Method 1: sequential distribute*/
4383+ // for (i = 0; i < redoer->size; i++)
4384+ /* Method 2: segment distribute*/
4385+ for (i = idx * lines_per_thread;
4386+ i < (idx + 1 ) * lines_per_thread &&
4387+ i < redoer->size
4388+ ; i++)
4389+ /* Method 3: evently distribute*/
4390+ // for (i = idx ;
4391+ // i < redoer->size
4392+ // ; i+= srv_ppl_n_redoer_threads)
4393+ {
4394+ if (dist_mode ==1 )
4395+ mutex_enter (&redoer->mutex );
43674396
43684397 pline = redoer->hashed_line_arr [i];
43694398
43704399 if (pline != NULL && !pline->is_redoing )
43714400 {
43724401 pline->is_redoing = true ;
4402+ recv_line = pline->recv_line ;
43734403 // do not hold the mutex during REDOing
4374- mutex_exit (&redoer->mutex );
4404+ if (dist_mode ==1 )
4405+ mutex_exit (&redoer->mutex );
43754406
43764407 /* **this call REDOing for a line ***/
43774408 if (redoer->phase == PMEM_REDO_PHASE1){
43784409 // printf("PMEM_REDO: start REDO_PHASE1 (scan and parse) line %zu ...\n", pline->hashed_id);
43794410
4411+
4412+ // start_time = ut_time_us(NULL);
43804413 bool is_err = pm_ppl_redo_line (gb_pmw->pop , gb_pmw->ppl , pline);
4414+ // end_time = ut_time_us(NULL);
4415+
4416+ // recv_line->redo1_thread_id = idx;
4417+ // recv_line->redo1_start_time = start_time;
4418+ // recv_line->redo1_end_time = end_time;
4419+ // recv_line->redo1_elapse_time = (end_time - start_time);
43814420
43824421 if (is_err){
43834422 printf (" PMEM_REDO: error redoing line %zu \n " , pline->hashed_id );
@@ -4389,27 +4428,37 @@ DECLARE_THREAD(pm_log_redoer_worker)(
43894428#if defined (UNIV_PMEMOBJ_PART_PL_DEBUG)
43904429 printf (" PMEM_REDO: start REDO_PHASE2 (applying) line %zu ...\n " , pline->hashed_id );
43914430#endif
4431+ // start_time = ut_time_us(NULL);
43924432 pm_ppl_recv_apply_hashed_line (
43934433 gb_pmw->pop , gb_pmw->ppl ,
43944434 pline, pline->recv_line ->is_ibuf_avail );
4435+ // end_time = ut_time_us(NULL);
43954436
4437+ // recv_line->redo2_thread_id = idx;
4438+ // recv_line->redo2_start_time = start_time;
4439+ // recv_line->redo2_end_time = end_time;
4440+ // recv_line->redo2_elapse_time = (end_time - start_time);
43964441#if defined (UNIV_PMEMOBJ_PART_PL_DEBUG)
43974442 printf (" PMEM_REDO: end REDO_PHASE2 (applying) line %zu\n " , pline->hashed_id );
43984443#endif
43994444 }
44004445
4401- mutex_enter (&redoer->mutex );
4446+ if (dist_mode ==1 )
4447+ mutex_enter (&redoer->mutex );
4448+
44024449 redoer->hashed_line_arr [i] = NULL ;
44034450 // redoer->n_requested--;
44044451 redoer->n_remains --;
44054452
44064453 if (redoer->n_remains == 0 ){
44074454 // this is the last REDO
4408- mutex_exit (&redoer->mutex );
4455+ if (dist_mode ==1 )
4456+ mutex_exit (&redoer->mutex );
44094457 break ;
44104458 }
44114459 }
4412- mutex_exit (&redoer->mutex );
4460+ if (dist_mode ==1 )
4461+ mutex_exit (&redoer->mutex );
44134462 } // end for
44144463
44154464 // after this for loop, all lines are either done REDO or REDOing by other threads, this thread has nothing to do
@@ -4419,7 +4468,8 @@ DECLARE_THREAD(pm_log_redoer_worker)(
44194468 mutex_enter (&redoer->mutex );
44204469 redoer->n_workers --;
44214470 if (redoer->n_workers == 0 ) {
4422- printf (" The last log redoer is closing\n " );
4471+ printf (" The last log redoer is closing. Redo phase %zu redoer->n_remains %zu ppl->n_redoing_lines %zu\n " ,
4472+ redoer->phase , redoer->n_remains , gb_pmw->ppl ->n_redoing_lines );
44234473 // trigger the coordinator (the pm_ppl_redo) to wakeup
44244474 os_event_set (redoer->is_log_all_finished );
44254475 }
0 commit comments