@@ -3922,6 +3922,8 @@ FlushObserver::flush()
39223922#if defined (UNIV_PMEMOBJ_PART_PL)
39233923// Defined in my_pmemobj.h
39243924
3925+ // ///////// FLUSHER /////////////////////
3926+
39253927PMEM_LOG_FLUSHER*
39263928pm_log_flusher_init (
39273929 const size_t size) {
@@ -4136,8 +4138,230 @@ DECLARE_THREAD(pm_log_flusher_worker)(
41364138 OS_THREAD_DUMMY_RETURN;
41374139}
41384140
4141+ // ///////// END FLUSHER /////////////////////
4142+
4143+ // ///////// REDOER /////////////////////
4144+
4145+ /*
4146+ * Init the REDOER
4147+ * size: The input size, should equal to the number of hashed line
4148+ * */
4149+ PMEM_LOG_REDOER*
4150+ pm_log_redoer_init (
4151+ const size_t size) {
4152+ PMEM_LOG_REDOER* redoer;
4153+ ulint i;
4154+
4155+ redoer = static_cast <PMEM_LOG_REDOER*> (
4156+ ut_zalloc_nokey (sizeof (PMEM_LOG_REDOER)));
4157+
4158+ mutex_create (LATCH_ID_PM_LOG_REDOER, &redoer->mutex );
4159+
4160+ redoer->is_log_req_not_empty = os_event_create (" redoer_is_log_req_not_empty" );
4161+ redoer->is_log_req_full = os_event_create (" redoer_is_log_req_full" );
4162+ redoer->is_log_all_finished = os_event_create (" redoer_is_log_all_finished" );
4163+ redoer->is_log_all_closed = os_event_create (" redoer_is_log_all_closed" );
4164+ redoer->size = size;
4165+ redoer->tail = 0 ;
4166+ redoer->n_requested = 0 ;
4167+ redoer->is_running = false ;
4168+
4169+ redoer->hashed_line_arr = static_cast <PMEM_PAGE_LOG_HASHED_LINE**> ( calloc (size, sizeof (PMEM_PAGE_LOG_HASHED_LINE*)));
4170+ for (i = 0 ; i < size; i++) {
4171+ redoer->hashed_line_arr [i] = NULL ;
4172+ }
4173+
4174+ return redoer;
4175+ }
4176+
4177+ void
4178+ pm_log_redoer_close (
4179+ PMEM_LOG_REDOER* redoer) {
4180+ ulint i;
4181+
4182+ // wait for all workers finish their work
4183+ while (redoer->n_workers > 0 ) {
4184+ os_thread_sleep (10000 );
4185+ }
4186+
4187+ for (i = 0 ; i < redoer->size ; i++) {
4188+ if (redoer->hashed_line_arr [i]){
4189+ // free(buf->flusher->flush_list_arr[i]);
4190+ redoer->hashed_line_arr [i] = NULL ;
4191+ }
4192+
4193+ }
4194+
4195+ if (redoer->hashed_line_arr ){
4196+ free (redoer->hashed_line_arr );
4197+ redoer->hashed_line_arr = NULL ;
4198+ }
4199+
4200+ mutex_destroy (&redoer->mutex );
4201+
4202+ os_event_destroy (redoer->is_log_req_not_empty );
4203+ os_event_destroy (redoer->is_log_req_full );
4204+
4205+ os_event_destroy (redoer->is_log_all_finished );
4206+ os_event_destroy (redoer->is_log_all_closed );
4207+
4208+ if (redoer){
4209+ redoer = NULL ;
4210+ free (redoer);
4211+ }
4212+ // printf("free flusher ok\n");
4213+ }
4214+ /*
4215+ *The coordinator
4216+ Handle start/stop all workers
4217+ * */
4218+ extern " C"
4219+ os_thread_ret_t
4220+ DECLARE_THREAD (pm_log_redoer_coordinator)(
4221+ /* ===============================================*/
4222+ void * arg MY_ATTRIBUTE ((unused)))
4223+ /* !< in: a dummy parameter required by
4224+ os_thread_create */
4225+ {
4226+
4227+
4228+ my_thread_init ();
4229+
4230+ #ifdef UNIV_PFS_THREAD
4231+ pfs_register_thread (pm_log_redoer_thread_key);
4232+ #endif /* UNIV_PFS_THREAD */
4233+
4234+ #ifdef UNIV_DEBUG_THREAD_CREATION
4235+ ib::info () << " coordinator pm_log_flusher thread running, id "
4236+ << os_thread_pf (os_thread_get_curr_id ());
4237+ #endif /* UNIV_DEBUG_THREAD_CREATION */
4238+
4239+ PMEM_LOG_REDOER* redoer = gb_pmw->ppl ->redoer ;
4240+
4241+ redoer->is_running = true ;
4242+
4243+ while (!gb_pmw->ppl ->is_redoing_done ) {
4244+ os_event_wait (redoer->is_log_all_finished );
4245+
4246+ if (srv_shutdown_state != SRV_SHUTDOWN_NONE) {
4247+ break ;
4248+ }
4249+ // the workers are idle and the server is running, keep waiting
4250+ os_event_reset (redoer->is_log_all_finished );
4251+ } // end while thread
4252+
4253+ redoer->is_running = false ;
4254+ // trigger waiting workers to stop
4255+ os_event_set (redoer->is_log_req_not_empty );
4256+ // wait for all workers closed
4257+ printf (" wait all redoers close...\n " );
4258+ os_event_wait (redoer->is_log_all_closed );
4259+
4260+ printf (" all redoers closed\n " );
4261+ my_thread_end ();
4262+
4263+ os_thread_exit ();
4264+
4265+ OS_THREAD_DUMMY_RETURN;
4266+ }
4267+
4268+ /* Worker thread of log redoer.
4269+ * Managed by the coordinator thread
4270+ * number of threads are defined in header file
4271+ @return a dummy parameter */
4272+ extern " C"
4273+ os_thread_ret_t
4274+ DECLARE_THREAD (pm_log_redoer_worker)(
4275+ /* ==========================================*/
4276+ void * arg MY_ATTRIBUTE ((unused)))
4277+ /* !< in: a dummy parameter required by
4278+ os_thread_create */
4279+ {
4280+ ulint i;
4281+
4282+ PMEM_LOG_REDOER* redoer = gb_pmw->ppl ->redoer ;
4283+
4284+ PMEM_PAGE_LOG_HASHED_LINE* pline = NULL ;
4285+
4286+ my_thread_init ();
4287+
4288+ mutex_enter (&redoer->mutex );
4289+ redoer->n_workers ++;
4290+ os_event_reset (redoer->is_log_all_closed );
4291+ mutex_exit (&redoer->mutex );
4292+
4293+ while (true ) {
4294+ // worker thread wait until there is is_requested signal
4295+ retry:
4296+ os_event_wait (redoer->is_log_req_not_empty );
4297+
4298+ // waked up, looking for a hashed line and REDO it
4299+
4300+ if (redoer->n_remains == 0 ){
4301+ // do nothing
4302+ break ;
4303+ }
4304+
4305+ for (i = 0 ; i < redoer->size ; i++) {
4306+ mutex_enter (&redoer->mutex );
4307+
4308+ pline = redoer->hashed_line_arr [i];
4309+
4310+ if (pline != NULL && !pline->is_redoing )
4311+ {
4312+ pline->is_redoing = true ;
4313+ // do not hold the mutex during REDOing
4314+ mutex_exit (&redoer->mutex );
4315+
4316+ /* **this call REDOing for a line ***/
4317+ printf (" PMEM_REDO: start redoing line %zu ...\n " , pline->hashed_id );
4318+ bool is_err = pm_ppl_redo_line (gb_pmw->pop , gb_pmw->ppl , pline);
4319+ if (is_err){
4320+ printf (" PMEM_REDO: error redoing line %zu \n " , pline->hashed_id );
4321+ assert (0 );
4322+ }
4323+ printf (" PMEM_REDO: end redoing line %zu\n " , pline->hashed_id );
4324+ // test
4325+ // os_thread_sleep(10000);
4326+
4327+ mutex_enter (&redoer->mutex );
4328+ redoer->hashed_line_arr [i] = NULL ;
4329+ // redoer->n_requested--;
4330+ redoer->n_remains --;
4331+
4332+ if (redoer->n_remains == 0 ){
4333+ // this is the last REDO
4334+ mutex_exit (&redoer->mutex );
4335+ break ;
4336+ }
4337+ }
4338+ mutex_exit (&redoer->mutex );
4339+ } // end for
4340+
4341+ // after this for loop, all lines are either done REDO or REDOing by other threads, this thread has nothing to do
4342+ break ;
4343+ } // end while thread
4344+
4345+ mutex_enter (&redoer->mutex );
4346+ redoer->n_workers --;
4347+ if (redoer->n_workers == 0 ) {
4348+ printf (" The last log redoer is closing\n " );
4349+ // trigger the coordinator (the pm_ppl_redo) to wakeup
4350+ os_event_set (redoer->is_log_all_finished );
4351+ }
4352+ mutex_exit (&redoer->mutex );
4353+
4354+ my_thread_end ();
4355+
4356+ os_thread_exit ();
4357+
4358+ OS_THREAD_DUMMY_RETURN;
4359+ }
4360+ // ///////// END REDOER /////////////////////
4361+
41394362#ifdef UNIV_PFS_THREAD
41404363mysql_pfs_key_t pm_log_flusher_thread_key;
4364+ mysql_pfs_key_t pm_log_redoer_thread_key;
41414365#endif /* UNIV_PFS_THREAD */
41424366
41434367#endif // UNIV_PMEMOBJ_PART_PL
0 commit comments