@@ -116,18 +116,13 @@ typedef struct wrk_itm
116
116
/* Thread syncronization data */
117
117
typedef struct thread_sync
118
118
{
119
+ ulint n_threads; /* !< Number of threads */
119
120
os_thread_id_t wthread_id; /* !< Identifier */
120
121
os_thread_t wthread; /* !< Thread id */
121
122
ib_wqueue_t *wq; /* !< Work Queue */
122
123
ib_wqueue_t *wr_cq; /* !< Write Completion Queue */
123
124
ib_wqueue_t *rd_cq; /* !< Read Completion Queue */
124
125
wthr_status_t wt_status; /* !< Worker thread status */
125
- ulint stat_universal_num_processed;
126
- /* !< Total number of pages
127
- processed by this thread */
128
- ulint stat_cycle_num_processed;
129
- /* !< Number of pages processed
130
- on this cycle */
131
126
mem_heap_t * wheap; /* !< Work heap where memory
132
127
is allocated */
133
128
wrk_t * work_item; /* !< Work items to be processed */
@@ -231,6 +226,7 @@ buf_mtflu_flush_pool_instance(
231
226
work_item->wr .min ,
232
227
work_item->wr .lsn_limit );
233
228
229
+
234
230
buf_flush_end (work_item->wr .buf_pool , work_item->wr .flush_type );
235
231
buf_flush_common (work_item->wr .flush_type , work_item->n_flushed );
236
232
@@ -239,28 +235,29 @@ buf_mtflu_flush_pool_instance(
239
235
240
236
#ifdef UNIV_DEBUG
241
237
/* *****************************************************************/ /* *
242
- Output work item list status,
238
+ Print flush statistics of work items.
243
239
*/
244
240
static
245
241
void
246
- mtflu_print_work_list (
247
- /* ==================*/
248
- wrk_t * wi_list ) /* !< in: Work item list */
242
+ mtflu_print_thread_stat (
243
+ /* ==================== */
244
+ wrk_t * work_item ) /* !< in: Work items */
249
245
{
250
- wrk_t * wi = wi_list ;
246
+ ulint stat_tot= 0 ;
251
247
ulint i=0 ;
252
248
253
- if (!wi_list) {
254
- fprintf (stderr, " list NULL\n " );
255
- }
249
+ for (i=0 ; i< MTFLUSH_MAX_WORKER; i++) {
250
+ stat_tot+=work_item[i].n_flushed ;
256
251
257
- while (wi) {
258
- fprintf (stderr, " -\t [%p]\t [%s]\t [%lu] > %p\n " ,
259
- wi, (wi->id_usr == -1 )?" free" :" Busy" , wi->n_flushed , wi->next );
260
- wi = wi->next ;
261
- i++;
262
- }
263
- fprintf (stderr, " list len: %d\n " , i);
252
+ fprintf (stderr, " MTFLUSH: Thread[%lu] stat [%lu]\n " ,
253
+ work_item[i].id_usr ,
254
+ work_item[i].n_flushed );
255
+
256
+ if (work_item[i].next == NULL ) {
257
+ break ; /* No more filled work items */
258
+ }
259
+ }
260
+ fprintf (stderr, " MTFLUSH: Stat-Total:%lu\n " , stat_tot);
264
261
}
265
262
#endif /* UNIV_DEBUG */
266
263
@@ -282,10 +279,6 @@ mtflush_service_io(
282
279
mtflush_io->wt_status = WTHR_SIG_WAITING;
283
280
work_item = (wrk_t *)ib_wqueue_timedwait (mtflush_io->wq , max_wait_usecs);
284
281
285
- #ifdef UNIV_DEBUG
286
- mtflu_print_work_list (mtflush_io->work_item );
287
- #endif
288
-
289
282
if (work_item) {
290
283
mtflush_io->wt_status = WTHR_RUNNING;
291
284
} else {
@@ -345,10 +338,28 @@ DECLARE_THREAD(mtflush_io_thread)(
345
338
void * arg)
346
339
{
347
340
thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
341
+ #ifdef UNIV_DEBUG
342
+ ib_uint64_t stat_universal_num_processed = 0 ;
343
+ ib_uint64_t stat_cycle_num_processed = 0 ;
344
+ wrk_t * work_item = mtflush_io[0 ].work_item ;
345
+ ulint i;
346
+ #endif
348
347
349
348
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
350
349
mtflush_service_io (mtflush_io);
351
- mtflush_io->stat_cycle_num_processed = 0 ;
350
+
351
+ #ifdef UNIV_DEBUG
352
+ for (i=0 ; i < MTFLUSH_MAX_WORKER; i++) {
353
+ stat_cycle_num_processed+= work_item[i].n_flushed ;
354
+ }
355
+
356
+ stat_universal_num_processed+=stat_cycle_num_processed;
357
+ stat_cycle_num_processed = 0 ;
358
+ fprintf (stderr, " MTFLUSH_IO_THREAD: total %lu cycle %lu\n " ,
359
+ stat_universal_num_processed,
360
+ stat_cycle_num_processed);
361
+ mtflu_print_thread_stat (work_item);
362
+ #endif
352
363
}
353
364
354
365
/* This should make sure that all current work items are
@@ -458,13 +469,16 @@ buf_mtflu_handler_init(
458
469
work_items = (wrk_t *)mem_heap_alloc (mtflush_heap,
459
470
MTFLUSH_MAX_WORKER * sizeof (wrk_t ));
460
471
ut_a (work_items != NULL );
472
+ memset (work_items, 0 , sizeof (wrk_t ) * MTFLUSH_MAX_WORKER);
473
+ memset (mtflush_ctx, 0 , sizeof (thread_sync_t ) * MTFLUSH_MAX_WORKER);
461
474
462
475
/* Initialize work items */
463
- mtflu_setup_work_items (work_items, MTFLUSH_MAX_WORKER );
476
+ mtflu_setup_work_items (work_items, n_threads );
464
477
465
478
/* Create threads for page-compression-flush */
466
479
for (i=0 ; i < n_threads; i++) {
467
480
os_thread_id_t new_thread_id;
481
+ mtflush_ctx[i].n_threads = n_threads;
468
482
mtflush_ctx[i].wq = mtflush_work_queue;
469
483
mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
470
484
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
@@ -531,67 +545,23 @@ buf_mtflu_flush_work_items(
531
545
per_pool_pages_flushed[i] = done_wi->n_flushed ;
532
546
}
533
547
534
- if (done_wi->id_usr == -1 &&
548
+ if (( int ) done_wi->id_usr == -1 &&
535
549
done_wi->wi_status == WRK_ITEM_SET ) {
536
550
fprintf (stderr,
537
- " **Set/Unused work_item[%d ] flush_type=%lu\n " ,
551
+ " **Set/Unused work_item[%lu ] flush_type=%lu\n " ,
538
552
i,
539
553
done_wi->wr .flush_type );
540
554
ut_a (0 );
541
555
}
542
556
543
557
n_flushed+= done_wi->n_flushed ;
544
- /* Reset for next round*/
545
- mtflush_ctx->work_item [i].id_usr = -1 ;
546
-
547
558
i++;
548
559
}
549
560
}
550
561
551
562
return (n_flushed);
552
563
}
553
564
554
- /* ******************************************************************/ /* *
555
- Flushes dirty blocks from the end of the LRU list and also
556
- puts replaceable clean pages from the end of the LRU list to the free
557
- list.
558
- NOTE: The calling thread is not allowed to own any latches on pages!
559
- @return true if a batch was queued successfully. false if another batch
560
- of same type was already running. */
561
- bool
562
- buf_mtflu_flush_LRU (
563
- /* ================*/
564
- buf_pool_t * buf_pool, /* !< in/out: buffer pool instance */
565
- ulint min_n, /* !< in: wished minimum mumber of blocks
566
- flushed (it is not guaranteed that the
567
- actual number is that big, though) */
568
- ulint* n_processed) /* !< out: the number of pages
569
- which were processed is passed
570
- back to caller. Ignored if NULL */
571
- {
572
- ulint page_count;
573
-
574
- if (n_processed) {
575
- *n_processed = 0 ;
576
- }
577
-
578
- if (!buf_flush_start (buf_pool, BUF_FLUSH_LRU)) {
579
- return (false );
580
- }
581
-
582
- page_count = buf_flush_batch (buf_pool, BUF_FLUSH_LRU, min_n, 0 );
583
-
584
- buf_flush_end (buf_pool, BUF_FLUSH_LRU);
585
-
586
- buf_flush_common (BUF_FLUSH_LRU, page_count);
587
-
588
- if (n_processed) {
589
- *n_processed = page_count;
590
- }
591
-
592
- return (true );
593
- }
594
-
595
565
/* ******************************************************************/ /* *
596
566
Multi-threaded version of buf_flush_list
597
567
*/
0 commit comments