Golang MySql binary log replication listener
Go Shell
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
example readme, license Feb 4, 2015
tests fixes Feb 4, 2015
LICENSE readme, license Feb 4, 2015
README.md readme Feb 4, 2015
binlog_dump.go fixes Feb 4, 2015
binlog_dump_test.go fixes Feb 4, 2015
capability.go fixes Feb 4, 2015
connect_db.go fixes Feb 4, 2015
connect_db_test.go fixes Feb 4, 2015
connection.go fixes Feb 4, 2015
const.go fixes Feb 4, 2015
event_log.go fixes Feb 4, 2015
event_log_test.go fixes Feb 4, 2015
field_list.go fixes Feb 4, 2015
field_list_test.go fixes Feb 4, 2015
handshake.go fixes Feb 4, 2015
handshake_test.go fixes Feb 4, 2015
init_db.go fixes Feb 4, 2015
init_db_test.go fixes Feb 4, 2015
ok_packet.go fixes Feb 4, 2015
pack.go fixes Feb 4, 2015
pack_test.go fixes Feb 4, 2015
passwd.go fixes Feb 4, 2015
protocol.go fixes Feb 4, 2015
protocol_test.go fixes Feb 4, 2015
query.go fixes Feb 4, 2015
query_test.go fixes Feb 4, 2015
register_slave.go fixes Feb 4, 2015
register_slave_test.go fixes Feb 4, 2015
result_set.go fixes Feb 4, 2015
result_set_test.go fixes Feb 4, 2015

README.md

Go MySql binary log replication listener

Pure Go Implementation of MySQL replication protocol. This allow you to receive event like insert, update, delete with their datas and raw SQL queries. This code has been developed and maintained by Ven at January 2015.

Installation

go get github.com/2tvenom/myreplication

Test

The project is test with:

  • Go 1.3.3
  • MySQL 5.5, 5.6 and 5.7 (beta)
  • Docker 1.4.1 build 5bc2ff8 (functional tests)

It's not tested in real production situation.

Unit tests

go test

Docker tests

Functonal tests with Docker. Test statement based and row based replication. MySql versions 5.5, 5.6, 5.7.

cd tests
sudo ./test.sh

MySQL server settings

In your MySQL server configuration file you need to enable replication:

[mysqld]
server-id		 = 1
log_bin			 = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size  = 100M
binlog-format    = row #Row based replication

Example

package main

import (
	"fmt"
	"myreplication"
)

var (
	host     = "localhost"
	port     = 3307
	username = "admin"
	password = "admin"
)

func main() {
	newConnection := myreplication.NewConnection()
	serverId := uint32(2)
	err := newConnection.ConnectAndAuth(host, port, username, password)

	if err != nil {
		panic("Client not connected and not autentificate to master server with error:" + err.Error())
	}
	//Get position and file name
	pos, filename, err := newConnection.GetMasterStatus()

	if err != nil {
		panic("Master status fail: " + err.Error())
	}

	el, err := newConnection.StartBinlogDump(pos, filename, serverId)

	if err != nil {
		panic("Cant start bin log: " + err.Error())
	}
	events := el.GetEventChan()
	go func() {
		for {
			event := <-events

			switch e := event.(type) {
			case *myreplication.QueryEvent:
				//Output query event
				println(e.GetQuery())
			case *myreplication.IntVarEvent:
				//Output last insert_id  if statement based replication
				println(e.GetValue())
			case *myreplication.WriteEvent:
				//Output Write (insert) event
				println("Write", e.GetTable())
				//Rows loop
				for i, row := range e.GetRows() {
					//Columns loop
					for j, col := range row {
						//Output row number, column number, column type and column value
						println(fmt.Sprintf("%d %d %d %v", i, j, col.GetType(), col.GetValue()))
					}
				}
			case *myreplication.DeleteEvent:
				//Output delete event
				println("Delete", e.GetTable())
				for i, row := range e.GetRows() {
					for j, col := range row {
						println(fmt.Sprintf("%d %d %d %v", i, j, col.GetType(), col.GetValue()))
					}
				}
			case *myreplication.UpdateEvent:
				//Output update event
				println("Update", e.GetTable())
				//Output old data before update
				for i, row := range e.GetRows() {
					for j, col := range row {
						println(fmt.Sprintf("%d %d %d %v", i, j, col.GetType(), col.GetValue()))
					}
				}
				//Output new
				for i, row := range e.GetNewRows() {
					for j, col := range row {
						println(fmt.Sprintf("%d %d %d %v", i, j, col.GetType(), col.GetValue()))
					}
				}
			default:
			}
		}
	}()
	err = el.Start()
	println(err.Error())
}

Links

Licence

WTFPL