@@ -72,6 +72,7 @@ Created 10/8/1995 Heikki Tuuri
72
72
#include " fil0fil.h"
73
73
#include " fil0crypt.h"
74
74
#include " fil0pagecompress.h"
75
+ #include " trx0types.h"
75
76
76
77
77
78
#include < my_service_manager.h>
@@ -2060,6 +2061,15 @@ static bool srv_task_execute()
2060
2061
return false ;
2061
2062
}
2062
2063
2064
+ std::mutex purge_thread_count_mtx;
2065
+ void srv_update_purge_thread_count (uint n)
2066
+ {
2067
+ std::lock_guard<std::mutex> lk (purge_thread_count_mtx);
2068
+ srv_n_purge_threads = n;
2069
+ srv_purge_thread_count_changed = 1 ;
2070
+ }
2071
+
2072
+ Atomic_counter<int > srv_purge_thread_count_changed;
2063
2073
2064
2074
/* * Do the actual purge operation.
2065
2075
@param[in,out] n_total_purged total number of purged pages
@@ -2072,7 +2082,7 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
2072
2082
static ulint n_use_threads = 0 ;
2073
2083
static uint32_t rseg_history_len = 0 ;
2074
2084
ulint old_activity_count = srv_get_activity_count ();
2075
- const ulint n_threads = srv_n_purge_threads;
2085
+ static ulint n_threads = srv_n_purge_threads;
2076
2086
2077
2087
ut_a (n_threads > 0 );
2078
2088
ut_ad (!srv_read_only_mode);
@@ -2088,7 +2098,20 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
2088
2098
}
2089
2099
2090
2100
do {
2091
- if (trx_sys.rseg_history_len > rseg_history_len
2101
+ if (UNIV_UNLIKELY (srv_purge_thread_count_changed)) {
2102
+ /* Read the fresh value of srv_n_purge_threads, reset
2103
+ the changed flag. Both variables are protected by
2104
+ purge_thread_count_mtx.
2105
+
2106
+ This code does not run concurrently, it is executed
2107
+ by a single purge_coordinator thread, and no races
2108
+ involving srv_purge_thread_count_changed are possible.
2109
+ */
2110
+
2111
+ std::lock_guard<std::mutex> lk (purge_thread_count_mtx);
2112
+ n_threads = n_use_threads = srv_n_purge_threads;
2113
+ srv_purge_thread_count_changed = 0 ;
2114
+ } else if (trx_sys.rseg_history_len > rseg_history_len
2092
2115
|| (srv_max_purge_lag > 0
2093
2116
&& rseg_history_len > srv_max_purge_lag)) {
2094
2117
@@ -2136,23 +2159,17 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
2136
2159
2137
2160
static std::queue<THD*> purge_thds;
2138
2161
static std::mutex purge_thd_mutex;
2139
-
2140
- static void purge_create_background_thds (int n)
2141
- {
2142
- THD *thd= current_thd;
2143
- std::unique_lock<std::mutex> lk (purge_thd_mutex);
2144
- while (n--)
2145
- purge_thds.push (innobase_create_background_thd (" InnoDB purge worker" ));
2146
- set_current_thd (thd);
2147
- }
2148
-
2149
2162
extern void * thd_attach_thd (THD*);
2150
2163
extern void thd_detach_thd (void *);
2151
2164
2152
2165
THD* acquire_thd (void **ctx)
2153
2166
{
2154
2167
std::unique_lock<std::mutex> lk (purge_thd_mutex);
2155
- ut_a (!purge_thds.empty ());
2168
+ if (purge_thds.empty ()) {
2169
+ THD* thd = current_thd;
2170
+ purge_thds.push (innobase_create_background_thd (" InnoDB purge worker" ));
2171
+ set_current_thd (thd);
2172
+ }
2156
2173
THD* thd = purge_thds.front ();
2157
2174
purge_thds.pop ();
2158
2175
lk.unlock ();
@@ -2251,10 +2268,8 @@ static void purge_coordinator_callback(void*)
2251
2268
purge_state.m_running = 0 ;
2252
2269
}
2253
2270
2254
- void srv_init_purge_tasks (uint n_tasks )
2271
+ void srv_init_purge_tasks ()
2255
2272
{
2256
- purge_task_group.set_max_tasks (n_tasks - 1 );
2257
- purge_create_background_thds (n_tasks);
2258
2273
purge_coordinator_timer= srv_thread_pool->create_timer
2259
2274
(purge_coordinator_timer_callback, nullptr );
2260
2275
}
@@ -2310,6 +2325,7 @@ ulint srv_get_task_queue_length()
2310
2325
void srv_purge_shutdown ()
2311
2326
{
2312
2327
if (purge_sys.enabled ()) {
2328
+ srv_update_purge_thread_count (innodb_purge_threads_MAX);
2313
2329
while (!srv_purge_should_exit ()) {
2314
2330
ut_a (!purge_sys.paused ());
2315
2331
srv_wake_purge_thread_if_not_active ();
0 commit comments