shepik/mysqlslave

 
 


The CLogParser class lets you add MySQL replication support to any daemon. The daemon must use a MySQL database, version 5.1 or later, with row-based replication configured. Your daemon class must derive from CLogParser and define three virtual functions:

	int on_insert(const mysql::CTable& tbl, const mysql::CTable::TRows& rows);
	int on_update(const mysql::CTable& tbl, const mysql::CTable::TRows& rows, const mysql::CTable::TRows& old_rows);
	int on_delete(const mysql::CTable& tbl, const mysql::CTable::TRows& rows);

	tbl -- the table that changed
	rows -- the new rows of the table (in on_delete, "rows" holds the deleted rows)
	old_rows -- the previous contents of the rows changed by the UPDATE query

The type mysql::CTable::TRows is std::list<mysql::CRow>. The following operators are defined for CRow, giving access to a column value by name or by index:

	const CValue& operator[](const std::string& name) const;
	const CValue& operator[](size_t i) const;

To read a CValue in the appropriate format, the following types and member functions are defined:

	typedef uint16_t TYear;
	typedef struct
	{
		TYear y;
		uint8_t m;
		uint8_t d;
	} TDate;
	typedef struct
	{
		uint8_t h;
		uint8_t m;
		uint8_t s;
	} TTime;
	typedef struct
	{
		TDate date;
		TTime time;
	} TDateTime;

	int8_t as_int8() const;
	int16_t as_int16() const;
	int32_t as_int32() const;
	int64_t as_int64() const;
	uint8_t as_uint8() const;
	uint16_t as_uint16() const;
	uint32_t as_uint32() const;
	uint64_t as_uint64() const; 

	double as_double() const;
	float as_float() const;

	bool as_boolean() const;

	time_t as_timestamp() const;
	TDateTime as_datetime() const;
	TDate as_date() const;
	TTime as_time() const;
	TYear as_year() const;

	std::string as_string() const;
	const char* as_string(size_t* length) const;
	void as_string(std::string& dst) const;

	uint64_t as_enum() const;
	uint64_t as_set() const;
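
As an illustration of the date and time structures above, here is a minimal sketch that formats a TDateTime as MySQL-style text and expands a SET bitmask into member names. The typedefs are copied from the listing above so that the block compiles on its own. format_datetime and decode_set are hypothetical helpers, not part of the library, and the assumption that as_set() returns one bit per SET member (bit 0 for the first member) follows MySQL's storage format rather than anything stated in this README.

```cpp
#include <cstddef>
#include <cstdio>
#include <stdint.h>
#include <string>
#include <vector>

// Typedefs copied from the listing above so this sketch compiles standalone.
typedef uint16_t TYear;
typedef struct { TYear y; uint8_t m; uint8_t d; } TDate;
typedef struct { uint8_t h; uint8_t m; uint8_t s; } TTime;
typedef struct { TDate date; TTime time; } TDateTime;

// Hypothetical helper: render a TDateTime as "YYYY-MM-DD HH:MM:SS".
std::string format_datetime(const TDateTime& dt)
{
	char buf[32];
	// Cast each field so the argument type matches %u exactly.
	std::snprintf(buf, sizeof(buf), "%04u-%02u-%02u %02u:%02u:%02u",
		(unsigned)dt.date.y, (unsigned)dt.date.m, (unsigned)dt.date.d,
		(unsigned)dt.time.h, (unsigned)dt.time.m, (unsigned)dt.time.s);
	return buf;
}

// Hypothetical helper: list the SET members whose bits are set in mask.
// Assumes bit i of the mask corresponds to members[i].
std::vector<std::string> decode_set(uint64_t mask, const std::vector<std::string>& members)
{
	std::vector<std::string> out;
	for (size_t i = 0; i < members.size() && i < 64; ++i)
	{
		if (mask & (uint64_t(1) << i))
			out.push_back(members[i]);
	}
	return out;
}
```

Inside a callback this could be used as format_datetime(row["created"].as_datetime()), where "created" is a hypothetical column name.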

To select the tables whose changes you want to receive, call this function once per table:

	void watch(const std::string& db_name, const std::string& table_name);

Replication needs a dedicated thread, which must run the following loop:

	while (dispatch())
	{
		try
		{
			dispatch_events();
		}
		catch (const std::exception& e)
		{
			sleep(1);
		}
	}

To interrupt the replication loop, call stop_event_loop().
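
To make the control flow of this loop concrete, the following sketch runs it against a stand-in class. FakeParser is hypothetical and only imitates the three calls named above: it assumes that dispatch() returns false once stop_event_loop() has been called, and that dispatch_events() reports a transient failure by throwing. The real CLogParser may differ in detail.

```cpp
#include <stdexcept>

// Hypothetical stand-in for mysql::CLogParser; not the real class.
class FakeParser
{
public:
	FakeParser() : running_(true), calls_(0), errors_(0) {}

	// Assumed semantics: true until stop_event_loop() is called.
	bool dispatch() const { return running_; }
	void stop_event_loop() { running_ = false; }

	// Simulates event processing: the 2nd call fails, the 4th requests a stop.
	void dispatch_events()
	{
		++calls_;
		if (calls_ == 2)
			throw std::runtime_error("simulated connection loss");
		if (calls_ == 4)
			stop_event_loop();
	}

	// Same loop shape as above, without the sleep so the sketch finishes at once.
	void run()
	{
		while (dispatch())
		{
			try
			{
				dispatch_events();
			}
			catch (const std::exception&)
			{
				++errors_; // a real daemon would log and back off here
			}
		}
	}

	int calls() const { return calls_; }
	int errors() const { return errors_; }

private:
	bool running_;
	int calls_;
	int errors_;
};
```

The point of the try/catch inside the loop is that one failed dispatch_events() call does not end replication: the loop simply tries again and exits only when dispatch() reports that a stop was requested.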

Code example:

class my_class : public mysql::CLogParser
{
	...

	pthread_t th_repl;

	...

	~my_class()
	{
		...

		stop_event_loop();
		::pthread_kill(th_repl, SIGTERM);

		...
	}

	...

	void replication_thread_proc()
	{
		while (dispatch())
		{
			try
			{
				dispatch_events();
			}
			catch (const std::exception& e)
			{
				fprintf(stderr, "catch exception: %s\n", e.what());
				sleep(1);
			}
		}
	}

	void connect_mysql_repl(const char* host, const char* user, const char* passwd, int port, int slave_id)
	{
		set_connection_params(host, slave_id, user, passwd, port);
		watch("db1", "db1_table1");
		watch("db1", "db1_table2");
		watch("db2", "db2_table1");
		prepare();
	}

	int on_insert(const mysql::CTable& tbl, const mysql::CTable::TRows& rows)
	{
		if (strcasecmp(tbl.get_table_name(), "db1_table1") == 0)
		{
			for (mysql::CTable::TRows::const_iterator it = rows.begin(); it != rows.end(); ++it)
			{
				fprintf(stdout, "%u\n", (*it)["id"].as_uint32());
			}
		}
		return 0;
	}
	int on_update(const mysql::CTable& tbl, const mysql::CTable::TRows& rows, const mysql::CTable::TRows& old_rows)
	{
		return 0;
	}
	int on_delete(const mysql::CTable& tbl, const mysql::CTable::TRows& rows)
	{
		return 0;
	}

	...
};
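
The example above declares th_repl but elides how the thread is started. One common pattern, sketched here under assumptions, is a static trampoline passed to pthread_create that forwards to the member function, with the destructor joining the thread after asking the loop to stop. Worker is a hypothetical stand-in for my_class; its loop only checks a flag instead of calling the real dispatch() and dispatch_events().

```cpp
#include <pthread.h>
#include <unistd.h>
#include <atomic>

// Hypothetical stand-in for a class derived from mysql::CLogParser.
class Worker
{
public:
	Worker() : running_(true), iterations_(0), started_(false) {}

	~Worker() { stop_and_join(); }

	// Start the replication thread; returns true on success.
	bool start()
	{
		started_ = (::pthread_create(&th_repl_, NULL, &Worker::thread_entry, this) == 0);
		return started_;
	}

	// Ask the loop to finish, then wait for the thread to exit.
	void stop_and_join()
	{
		running_ = false; // plays the role of stop_event_loop()
		if (started_)
		{
			::pthread_join(th_repl_, NULL);
			started_ = false;
		}
	}

	long iterations() const { return iterations_; }

private:
	// pthread_create needs a plain function pointer, so forward through a static member.
	static void* thread_entry(void* arg)
	{
		static_cast<Worker*>(arg)->replication_thread_proc();
		return NULL;
	}

	// Placeholder loop: runs at least once, then until a stop is requested.
	void replication_thread_proc()
	{
		do
		{
			++iterations_;
			::usleep(1000);
		} while (running_);
	}

	pthread_t th_repl_;
	std::atomic<bool> running_;
	std::atomic<long> iterations_;
	bool started_;
};
```

Unlike the destructor in the example above, this sketch joins the thread rather than signalling it. Whether that is enough with the real library depends on whether dispatch_events() can block indefinitely, which this README does not say.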
