Skip to content

Commit

Permalink
MDEV-32216 option --parallel/-j for mariadb-dump to increase parallelism
Browse files Browse the repository at this point in the history
At the moment, it only works with --tab, to execute "SELECT INTO OUTFILE"
queries concurrently.

Uses connection_pool for concurrent execution.
  • Loading branch information
vaintroub committed Jan 29, 2024
1 parent ec5db64 commit 4532dae
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 10 deletions.
2 changes: 1 addition & 1 deletion client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ SET_TARGET_PROPERTIES(mariadb-test PROPERTIES ENABLE_EXPORTS TRUE)
MYSQL_ADD_EXECUTABLE(mariadb-check mysqlcheck.c)
TARGET_LINK_LIBRARIES(mariadb-check ${CLIENT_LIB})

MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c)
MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connection_pool.cc)
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})

MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.c)
Expand Down
96 changes: 87 additions & 9 deletions client/mysqldump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
#include "mysqld_error.h"

#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */

#include "connection_pool.h"
/* Exit codes */

#define EX_USAGE 1
Expand Down Expand Up @@ -194,7 +194,7 @@ FILE *stderror_file=0;

static uint opt_protocol= 0;
static char *opt_plugin_dir= 0, *opt_default_auth= 0;

static uint opt_parallel= 0;
/*
Dynamic_string wrapper functions. In this file use these
wrappers, they will terminate the process if there is
Expand Down Expand Up @@ -246,6 +246,8 @@ static HASH ignore_table, ignore_data;

static HASH ignore_database;

static async_pool::connection_pool connection_pool;

static struct my_option my_long_options[] =
{
{"all-databases", 'A',
Expand Down Expand Up @@ -526,6 +528,8 @@ static struct my_option my_long_options[] =
{"password", 'p',
"Password to use when connecting to server. If password is not given it's solicited on the tty.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"parallel", 'j', "Number of dump table jobs executed in parallel (only with --tab option)",
&opt_parallel, &opt_parallel, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifdef _WIN32
{"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG,
NO_ARG, 0, 0, 0, 0, 0, 0},
Expand Down Expand Up @@ -1920,6 +1924,13 @@ static void free_resources()
else
fflush(md_result_file);
}
if (first_error && mysql)
{
connection_pool.for_each_connection(
[](MYSQL *c) { mysql_kill(mysql,c->thread_id);});
}
connection_pool.close();

if (get_table_name_result)
mysql_free_result(get_table_name_result);
if (routine_res)
Expand Down Expand Up @@ -4061,6 +4072,21 @@ static void vers_append_system_time(DYNAMIC_STRING* query_string)
}
}

/**
Completion handler for async queries in the pool.
Dies in case query produced an error.
@param mysql The connection that executed the query.
@param query The query that was executed.
@param success Whether the query was successful.
*/
static void send_query_completion_func(MYSQL* mysql, const char* query,
bool success, void*)
{
if (!success)
maybe_die(EX_MYSQLERR, "Couldn't execute async query '%s' (%s)", query,
mysql_error(mysql));
}

/*
Expand Down Expand Up @@ -4216,6 +4242,10 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
dynstr_append_checked(&query_string, select_field_names.str);
}
dynstr_append_checked(&query_string, " FROM ");
char quoted_db_buf[NAME_LEN * 2 + 3];
char *qdatabase= quote_name(db, quoted_db_buf, opt_quoted);
dynstr_append_checked(&query_string, qdatabase);
dynstr_append_checked(&query_string, ".");
dynstr_append_checked(&query_string, result_table);

if (versioned)
Expand All @@ -4239,8 +4269,16 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
my_free(order_by);
order_by= 0;
}

if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length))
if (opt_parallel)
{
if (connection_pool.execute_async(query_string.str,send_query_completion_func,nullptr,true))
{
dynstr_free(&query_string);
DB_error(mysql, "when executing send_query 'SELECT INTO OUTFILE'");
DBUG_VOID_RETURN;
}
}
else if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length))
{
dynstr_free(&query_string);
DB_error(mysql, "when executing 'SELECT INTO OUTFILE'");
Expand Down Expand Up @@ -7082,6 +7120,27 @@ static void dynstr_realloc_checked(DYNAMIC_STRING *str, ulong additional_size)
die(EX_MYSQLERR, DYNAMIC_STR_ERROR_MSG);
}

#define MAX_POOL_CONNECTIONS 256
static void init_connection_pool(uint n_connections)
{
MYSQL *conn[MAX_POOL_CONNECTIONS];

if (n_connections > array_elements(conn))
die(EX_USAGE, "Too many connections");

for (uint i= 0; i < n_connections; i++)
{
MYSQL *c= connect_to_db(current_host, current_user, opt_password);
if (!c)
{
for (uint j= 0; j < i; j++)
mysql_close(conn[j]);
die(EX_MYSQLERR, "Error during connection to DB");
}
conn[i]= c;
}
connection_pool.init(conn, n_connections);
}

int main(int argc, char **argv)
{
Expand Down Expand Up @@ -7118,15 +7177,24 @@ int main(int argc, char **argv)
}
}

if (connect_to_db(current_host, current_user, opt_password))
mysql= connect_to_db(current_host, current_user, opt_password);
if (!mysql)
{
free_resources();
exit(EX_MYSQLERR);
}
if (!path)
{
write_header(md_result_file, *argv);


if (opt_parallel)
{
verbose_msg("-- Warning: ignoring --parallel setting, it currently only "
"works together with --tab\n");
opt_parallel= 0;
}
}
else if (opt_parallel)
init_connection_pool(opt_parallel);

/* Check if the server support multi source */
if (mysql_get_server_version(mysql) >= 100000)
Expand Down Expand Up @@ -7179,8 +7247,15 @@ int main(int argc, char **argv)
goto err;
}

if (opt_single_transaction && start_transaction(mysql))
goto err;
if (opt_single_transaction)
{
if (start_transaction(mysql))
goto err;
connection_pool.for_each_connection([](MYSQL *c) {
if (start_transaction(c))
maybe_die(EX_MYSQLERR, "Failed to start transaction on connection ID %u", mysql->thread_id);
});
}

/* Add 'STOP SLAVE to beginning of dump */
if (opt_slave_apply && add_stop_slave())
Expand Down Expand Up @@ -7273,6 +7348,9 @@ int main(int argc, char **argv)
if (opt_delete_master_logs && purge_bin_logs_to(mysql, bin_log_name))
goto err;

/* wait for outstanding asynchronous queries */
connection_pool.wait_all();

/*
No reason to explicitly COMMIT the transaction, neither to explicitly
UNLOCK TABLES: these will be automatically be done by the server when we
Expand Down

0 comments on commit 4532dae

Please sign in to comment.