Skip to content

Commit

Permalink
MDEV-33627 refactor threading in mariadb-import
Browse files Browse the repository at this point in the history
Use threadpool, instead of one-thread-and-connection-per-table
  • Loading branch information
vaintroub committed Jul 16, 2024
1 parent c483c5c commit 04988d8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 126 deletions.
3 changes: 2 additions & 1 deletion client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connectio
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})

MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.cc)
TARGET_LINK_LIBRARIES(mariadb-import ${CLIENT_LIB})
target_include_directories(mariadb-import PRIVATE ${CMAKE_SOURCE_DIR}/tpool)
target_link_libraries(mariadb-import PRIVATE tpool ${CLIENT_LIB})

MYSQL_ADD_EXECUTABLE(mariadb-upgrade mysql_upgrade.c COMPONENT Server)
TARGET_LINK_LIBRARIES(mariadb-upgrade ${CLIENT_LIB})
Expand Down
165 changes: 40 additions & 125 deletions client/mysqlimport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,8 @@

#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */


/* Global Thread counter */
uint counter= 0;
pthread_mutex_t init_mutex;
pthread_mutex_t counter_mutex;
pthread_cond_t count_threshhold;
#include <tpool.h>
tpool::thread_pool *thread_pool;

static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql);
Expand Down Expand Up @@ -445,13 +441,10 @@ static MYSQL *db_connect(char *host, char *database,
fprintf(stdout, "Connecting to %s\n", host ? host : "localhost");
if (opt_use_threads && !lock_tables)
{
pthread_mutex_lock(&init_mutex);
if (!(mysql= mysql_init(NULL)))
{
pthread_mutex_unlock(&init_mutex);
return 0;
}
pthread_mutex_unlock(&init_mutex);
}
else
if (!(mysql= mysql_init(NULL)))
Expand Down Expand Up @@ -496,6 +489,8 @@ static MYSQL *db_connect(char *host, char *database,
if (ignore_foreign_keys)
mysql_query(mysql, "set foreign_key_checks= 0;");

if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
db_error(mysql);
return mysql;
}

Expand All @@ -514,14 +509,10 @@ static void safe_exit(int error, MYSQL *mysql)
if (error && ignore_errors)
return;

/* in multi-threaded mode protect from concurrent safe_exit's */
if (counter)
pthread_mutex_lock(&counter_mutex);

if (mysql)
mysql_close(mysql);

if (counter)
if (thread_pool)
{
/* dirty exit. some threads are running,
memory is not freed, openssl not deinitialized */
Expand Down Expand Up @@ -603,49 +594,42 @@ static char *field_escape(char *to,const char *from,uint length)
return to;
}

int exitcode= 0;

pthread_handler_t worker_thread(void *arg)
std::atomic<int> exitcode;
void set_exitcode(int code)
{
int error;
char *raw_table_name= (char *)arg;
MYSQL *mysql= 0;
int expected= 0;
exitcode.compare_exchange_strong(expected,code);
}

if (mysql_thread_init())
goto error;

if (!(mysql= db_connect(current_host,current_db,current_user,opt_password)))
{
goto error;
}
thread_local MYSQL *thread_local_mysql;

if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
{
db_error(mysql); /* We shall continue here, if --force was given */
goto error;
}

void load_single_table(void *arg)
{
int error;
char *raw_table_name= (char *)arg;
MYSQL *mysql= thread_local_mysql;
/*
We are not currently catching the error here.
*/
if((error= write_to_table(raw_table_name, mysql)))
if (exitcode == 0)
exitcode= error;

error:
if (mysql)
db_disconnect(current_host, mysql);
set_exitcode(error);
}

pthread_mutex_lock(&counter_mutex);
counter--;
pthread_cond_signal(&count_threshhold);
pthread_mutex_unlock(&counter_mutex);
static void tpool_thread_init(void)
{
mysql_thread_init();
thread_local_mysql= db_connect(current_host,current_db,current_user,opt_password);
}
static void tpool_thread_exit(void)
{
if (thread_local_mysql)
db_disconnect(current_host,thread_local_mysql);
mysql_thread_end();
pthread_exit(0);
return 0;
}


#include <vector>
int main(int argc, char **argv)
{
int error=0;
Expand All @@ -668,102 +652,33 @@ int main(int argc, char **argv)

if (opt_use_threads && !lock_tables)
{
char **save_argv;
uint worker_thread_count= 0, table_count= 0, i= 0;
pthread_t *worker_threads; /* Thread descriptor */
pthread_attr_t attr; /* Thread attributes */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_JOINABLE);

pthread_mutex_init(&init_mutex, NULL);
pthread_mutex_init(&counter_mutex, NULL);
pthread_cond_init(&count_threshhold, NULL);

/* Count the number of tables. This number denotes the total number
of threads spawn.
*/
save_argv= argv;
for (table_count= 0; *argv != NULL; argv++)
table_count++;
argv= save_argv;

if (!(worker_threads= (pthread_t*) my_malloc(PSI_NOT_INSTRUMENTED,
table_count * sizeof(*worker_threads), MYF(0))))
return -2;

for (; *argv != NULL; argv++) /* Loop through tables */
{
pthread_mutex_lock(&counter_mutex);
while (counter == opt_use_threads)
{
struct timespec abstime;

set_timespec(abstime, 3);
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
}
/* Before exiting the lock we set ourselves up for the next thread */
counter++;
pthread_mutex_unlock(&counter_mutex);
/* now create the thread */
if (pthread_create(&worker_threads[worker_thread_count], &attr,
worker_thread, (void *)*argv) != 0)
{
pthread_mutex_lock(&counter_mutex);
counter--;
pthread_mutex_unlock(&counter_mutex);
fprintf(stderr,"%s: Could not create thread\n", my_progname);
continue;
}
worker_thread_count++;
}
thread_pool= tpool::create_thread_pool_generic(opt_use_threads,opt_use_threads);
thread_pool->set_thread_callbacks(tpool_thread_init,tpool_thread_exit);

/*
We loop until we know that all children have cleaned up.
*/
pthread_mutex_lock(&counter_mutex);
while (counter)
{
struct timespec abstime;

set_timespec(abstime, 3);
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
}
pthread_mutex_unlock(&counter_mutex);
pthread_mutex_destroy(&init_mutex);
pthread_mutex_destroy(&counter_mutex);
pthread_cond_destroy(&count_threshhold);
pthread_attr_destroy(&attr);
std::vector<tpool::task> all_tasks;
for (int i=0; argv[i]; i++)
all_tasks.push_back(tpool::task(load_single_table, argv[i]));

for(i= 0; i < worker_thread_count; i++)
{
if (pthread_join(worker_threads[i], NULL))
fprintf(stderr,"%s: Could not join worker thread.\n", my_progname);
}
for (auto &t: all_tasks)
thread_pool->submit_task(&t);

my_free(worker_threads);
delete thread_pool;
thread_pool= nullptr;
}
else
{
MYSQL *mysql= 0;
if (!(mysql= db_connect(current_host,current_db,current_user,opt_password)))
MYSQL *mysql= db_connect(current_host,current_db,current_user,opt_password);
if (!mysql)
{
free_defaults(argv_to_free);
return(1); /* purecov: dead code */
}

if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
{
db_error(mysql); /* We shall continue here, if --force was given */
return(1);
}

if (lock_tables)
lock_table(mysql, argc, argv);
for (; *argv != NULL; argv++)
if ((error= write_to_table(*argv, mysql)))
if (exitcode == 0)
exitcode= error;
set_exitcode(error);
db_disconnect(current_host, mysql);
}
safe_exit(0, 0);
Expand Down

0 comments on commit 04988d8

Please sign in to comment.