Skip to content

Commit e129555

Browse files
committed
MDEV-20372 thread_pool_info fails randomly in 10.5
Rework stats a bit, so we're not missing any queue_get() now. Don't do stats_reset_table(), if generic threadpool is off.
1 parent f991c41 commit e129555

File tree

3 files changed

+32
-20
lines changed

3 files changed

+32
-20
lines changed

sql/thread_pool_info.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
194194
table->field[4]->store(counters->wakes_due_to_stall, true);
195195
table->field[5]->store(counters->throttles, true);
196196
table->field[6]->store(counters->stalls, true);
197-
table->field[7]->store(counters->polls_by_listener, true);
198-
table->field[8]->store(counters->polls_by_worker, true);
199-
table->field[9]->store(counters->dequeues_by_listener, true);
200-
table->field[10]->store(counters->dequeues_by_worker, true);
197+
table->field[7]->store(counters->polls[(int)operation_origin::LISTENER], true);
198+
table->field[8]->store(counters->polls[(int)operation_origin::WORKER], true);
199+
table->field[9]->store(counters->dequeues[(int)operation_origin::LISTENER], true);
200+
table->field[10]->store(counters->dequeues[(int)operation_origin::WORKER], true);
201201
mysql_mutex_unlock(&group->mutex);
202202
if (schema_table_store_record(thd, table))
203203
return 1;
@@ -207,6 +207,9 @@ static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
207207

208208
static int stats_reset_table()
209209
{
210+
if (!all_groups)
211+
return 0;
212+
210213
for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
211214
{
212215
thread_group_t* group = &all_groups[i];

sql/threadpool_generic.cc

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ static void io_poll_close(TP_file_handle fd)
4545
#endif
4646
}
4747

48-
4948
/** Maximum number of native events a listener can read in one go */
5049
#define MAX_EVENTS 1024
5150

@@ -435,6 +434,16 @@ static TP_connection_generic *queue_get(thread_group_t *thread_group)
435434
DBUG_RETURN(0);
436435
}
437436

437+
static TP_connection_generic* queue_get(thread_group_t* group, operation_origin origin)
438+
{
439+
auto ret = queue_get(group);
440+
if (ret)
441+
{
442+
TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]);
443+
}
444+
return ret;
445+
}
446+
438447
static bool is_queue_empty(thread_group_t *thread_group)
439448
{
440449
for (int i=0; i < NQUEUES; i++)
@@ -684,7 +693,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
684693
break;
685694

686695
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
687-
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener);
696+
TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::LISTENER]);
688697
if (cnt <=0)
689698
{
690699
DBUG_ASSERT(thread_group->shutdown);
@@ -750,7 +759,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
750759
if (listener_picks_event)
751760
{
752761
/* Handle the first event. */
753-
retval= queue_get(thread_group);
762+
retval= queue_get(thread_group, operation_origin::LISTENER);
754763
mysql_mutex_unlock(&thread_group->mutex);
755764
break;
756765
}
@@ -1130,10 +1139,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
11301139
/* Check if queue is not empty */
11311140
if (!oversubscribed)
11321141
{
1133-
connection = queue_get(thread_group);
1142+
connection = queue_get(thread_group, operation_origin::WORKER);
11341143
if(connection)
11351144
{
1136-
TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker);
11371145
break;
11381146
}
11391147
}
@@ -1146,10 +1154,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
11461154
mysql_mutex_unlock(&thread_group->mutex);
11471155

11481156
connection = listener(current_thread, thread_group);
1149-
if (connection)
1150-
{
1151-
TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener);
1152-
}
1157+
11531158
mysql_mutex_lock(&thread_group->mutex);
11541159
thread_group->active_thread_count++;
11551160
/* There is no listener anymore, it just returned. */
@@ -1167,11 +1172,11 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
11671172
{
11681173
native_event ev[MAX_EVENTS];
11691174
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
1170-
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker);
1175+
TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::WORKER]);
11711176
if (cnt > 0)
11721177
{
11731178
queue_put(thread_group, ev, cnt);
1174-
connection= queue_get(thread_group);
1179+
connection= queue_get(thread_group,operation_origin::WORKER);
11751180
break;
11761181
}
11771182
}

sql/threadpool_generic.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ typedef I_P_List<TP_connection_generic,
108108

109109
const int NQUEUES = 2; /* We have high and low priority queues*/
110110

111+
enum class operation_origin
112+
{
113+
WORKER,
114+
LISTENER
115+
};
116+
111117
struct thread_group_counters_t
112118
{
113119
ulonglong thread_creations;
@@ -116,10 +122,8 @@ struct thread_group_counters_t
116122
ulonglong wakes_due_to_stall;
117123
ulonglong throttles;
118124
ulonglong stalls;
119-
ulonglong dequeues_by_worker;
120-
ulonglong dequeues_by_listener;
121-
ulonglong polls_by_listener;
122-
ulonglong polls_by_worker;
125+
ulonglong dequeues[2];
126+
ulonglong polls[2];
123127
};
124128

125129
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
@@ -143,7 +147,7 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
143147
thread_group_counters_t counters;
144148
};
145149

146-
#define TP_INCREMENT_GROUP_COUNTER(group,var) group->counters.var++;
150+
#define TP_INCREMENT_GROUP_COUNTER(group,var) do {group->counters.var++;}while(0)
147151

148152
extern thread_group_t* all_groups;
149153
#endif

0 commit comments

Comments
 (0)