Skip to content

Commit

Permalink
[raft] bring in Raft_repl_observer
Browse files Browse the repository at this point in the history
Summary:
Almost a direct port of the feature in 5.6 except for minor visual changes and
some minor changes related to 8.0 thread handling

Also brings in the basic raft_listener_queue without the implemented functions

Differential Revision: D21670710

Use DBUG_TRACE instead of DBUG_ENTER

Summary:
several testcase failed in asan debug,

==106552==ERROR: AddressSanitizer: stack-buffer-underflow on address 0x7f38f3590078 at pc 0x0000093aa495 bp 0x7f38f358ffc0 sp 0x7f38f358ffb8
READ of size 4 at 0x7f38f3590078 thread T203
    #0 0x93aa494 in _db_enter_(char const*, int, char const*, unsigned int, _db_stack_frame_*)
    facebook#1 0x8d0b9bb in Relay_log_info::remove_logged_gtids
    facebook#2 0x8b1ec3f in trim_logged_gtid
    facebook#3 0x8c767cf in process_raft_queue

If Use DBUG_ENTER, then you need to use DBUG_RETURN to pop current frame in CODE_STATE.
If use DBUG_TRACE, it will pop current frame during .dtor

ps. small refactor changes for sql/rpl_handler.cc

Reviewed By: bhatvinay

Differential Revision: D28809418
  • Loading branch information
bhatvinay authored and Herman Lee committed Oct 3, 2023
1 parent 3123eba commit f8ff573
Show file tree
Hide file tree
Showing 6 changed files with 560 additions and 2 deletions.
112 changes: 112 additions & 0 deletions sql/raft_listener_queue_if.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2004-present Facebook. All Rights Reserved.

#pragma once

#include <future>
#include <map>
#include <vector>

/* Type of callback that raft plugin wants to invoke in the server */
enum class RaftListenerCallbackType {
SET_READ_ONLY = 1,
UNSET_READ_ONLY = 2,
TRIM_LOGGED_GTIDS = 3,
ROTATE_BINLOG = 4,
ROTATE_RELAYLOG = 5,
RAFT_LISTENER_THREADS_EXIT = 6,
RLI_RELAY_LOG_RESET = 7,
RESET_SLAVE = 8,
BINLOG_CHANGE_TO_APPLY = 9,
BINLOG_CHANGE_TO_BINLOG = 10,
STOP_SQL_THREAD = 11,
START_SQL_THREAD = 12,
STOP_IO_THREAD = 13,
CHANGE_MASTER = 14,
GET_COMMITTED_GTIDS = 15,
GET_EXECUTED_GTIDS = 16,
SET_BINLOG_DURABILITY = 17,
};

/* Callback argument, each type would just populate the fields needed for its
* callback */
class RaftListenerCallbackArg {
public:
explicit RaftListenerCallbackArg() {}

std::vector<std::string> trim_gtids = {};
std::pair<std::string, unsigned long long> log_file_pos = {};
bool val_bool;
uint32_t val_uint;
std::pair<std::string, unsigned int> master_instance;
std::string val_str;
std::map<std::string, unsigned int> val_sys_var_uint;
};

/* Result of the callback execution in the server. This will be set in the
* future's promise (in the QueueElement) and the invoker can get()/wait() for
* the result. Add more fields as needed */
class RaftListenerCallbackResult {
public:
explicit RaftListenerCallbackResult() {}

// Indicates if the callback was able to execute successfully
int error = 0;
std::vector<std::string> gtids;
std::string val_str;
};

class RaftListenerQueueIf {
public:
static const int RAFT_FLAGS_POSTAPPEND = 1;
static const int RAFT_FLAGS_NOOP = 2;

virtual ~RaftListenerQueueIf() {}

/* Defines the element of the queue. It consists of the callback type to be
* invoked and the argument (optional) for the callback */
struct QueueElement {
// Type of the callback to invoke in the server
RaftListenerCallbackType type;

// Argument to the callback
RaftListenerCallbackArg arg;

/* result of the callback will be fulfilled through this promise. If this
* is set, then the invoker should ensure tht he eventually calls
* get()/wait() to retrieve the result. Example:
*
* std::promise<RaftListenerCallbackResult> promise;
* std::future<RaftListenerCallbackResult> fut = promise.get_future();
*
* QueueElement e;
* e.type = RaftListenerCallbackType::SET_READ_ONLY;
* e.result = &promise;
* listener_queue.add(std::move(e));
* ....
* ....
* ....
* // Get the result when we want it. This wll block until the promise is
* // fullfilled by the raft listener thread after executing the callback
* RaftListenerCallbackResult result = fut.get();
*/
std::promise<RaftListenerCallbackResult> *result = nullptr;
};

/* Add an element to the queue. This will signal any listening threads
* after adding the element to the queue
*
* @param element QueueElement to add to queue
*
* @return 0 on success, 1 on error
*/
virtual int add(QueueElement element) = 0;

/* Get an element from the queue. This will block if there are no elements
* in the queue to be processed
*
* @return QueueElement to be processed next
*/
virtual QueueElement get() = 0;

virtual void deinit() = 0;
};
116 changes: 116 additions & 0 deletions sql/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
#include "rpl_context.h"
#include "sql/handler.h" // enum_tx_isolation

#include <queue>

struct MYSQL;
#ifdef INCL_DEFINED_IN_MYSQL_SERVER
extern bool enable_raft_plugin;
#endif
class RaftListenerQueueIf;

#ifdef __cplusplus
class THD;
Expand Down Expand Up @@ -729,6 +735,116 @@ typedef struct Binlog_relay_IO_observer {
applier_log_event_t applier_log_event;
} Binlog_relay_IO_observer;

/**
Raft replication observer parameter
*/
typedef struct Raft_replication_param {
uint32 server_id = 0;
const char *host_or_ip = nullptr;
int64_t term = -1;
int64_t index = -1;
} Raft_replication_param;

/**
Observe special events for Raft replication to work
*/
typedef struct Raft_replication_observer {
uint32 len;

/**
This callback is called before transaction commit
and after binlog sync.
For both non-transactional tables and transactional
tables this is called after binlog sync.
@param param The parameter for transaction observers
@retval 0 Sucess
@retval 1 Failure
*/
int (*before_commit)(Raft_replication_param *param);

/**
This callback is called before events of a txn are written to binlog file
@param param Observer common parameter
@param cache IO_CACHE containing binlog events for the txn
@param noop Is this a Raft NOOP event being faked as a Rotate Event
@retval 0 Sucess
@retval 1 Failure
*/
int (*before_flush)(Raft_replication_param *param, IO_CACHE *cache,
bool no_op);

/**
This callback is called once upfront to setup the appropriate
binlog file, io_cache and its mutexes
@param is_relay_log whether the file being registered is relay or binlog
@param log_file_cache the IO_CACHE pointer
@param log_prefix the prefix of logs e.g. /binlogs/binary-logs-3306
@param log_name the pointer to current log name
@param lock_log the mutex that protects the current log
@param lock_index the mutex that protects the index file
@param update_cond the condvar that is fired after writing to log
@param cur_log_ext a pointer the number of the file.
@param context context of the call (0 for 1st run, 1 for next time)
@retval 0 Sucess
@retval 1 Failure
*/
int (*setup_flush)(bool is_relay_log, IO_CACHE *log_file_cache,
const char *log_prefix, const char *log_name,
mysql_mutex_t *lock_log, mysql_mutex_t *lock_index,
mysql_cond_t *update_cond, ulong *cur_log_ext,
int context);

/**
* This callback is invoked by the server to gracefully shutdown the
* Raft threads
*/
int (*before_shutdown)();

/**
* @param raft_listener_queue - the listener queue in which to add requests
* @param s_uuid - the uuid of the server to be used as the INSTANCE UUID
* in Raft
* @param wal_dir_parent - the parent directory under which raft will create
* config metadata
* @param log_dir_parent - the parent directory under which raft will create
* metric logs
* @param raft_log_path_prefix - the prefix with the dirname path which tells
* @param s_hostname - the proper hostname of server which can be used in
* SMC and logging
* @param port - the port of the server
* raft where to find raft binlogs.
*/
int (*register_paths)(RaftListenerQueueIf *raft_listener_queue,
const std::string &s_uuid,
const std::string &wal_dir_parent,
const std::string &log_dir_parent,
const std::string &raft_log_path_prefix,
const std::string &s_hostname, uint64_t port);

/**
This callback is called after transaction commit to engine
@param param The parameter for the observers
@retval 0 Sucess
@retval 1 Failure
*/
int (*after_commit)(Raft_replication_param *param);
} Raft_replication_observer;

// Finer grained error code during deregister of observer
// Observer was not present in delegate (i.e. not
// previously added to delegate ), and should be safe
// to ignore.
#define MYSQL_REPLICATION_OBSERVER_NOT_FOUND 2

/**
Register a transaction observer
Expand Down

0 comments on commit f8ff573

Please sign in to comment.