Merged
27 changes: 15 additions & 12 deletions README.md
@@ -1,8 +1,8 @@
go-mysql-elasticsearch is a service that automatically syncs your MySQL data into Elasticsearch.

It uses `mysqldump` to fetch the original data first, then syncs data incrementally with the binlog.

## Install

+ Install Go and set your [GOPATH](https://golang.org/doc/code.html#GOPATH)
+ Install godep `go get github.com/tools/godep`
@@ -22,9 +22,9 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment
## Notice

+ binlog format must be **row**.
+ binlog row image must be **full** for MySQL; you may lose some field data if you update PK data in MySQL with the minimal or noblob binlog row image. MariaDB only supports the full row image.
+ Cannot alter the table format at runtime.
+ A MySQL table to be synced must have a PK (primary key); multi-column PKs are supported now, e.g., if the PK is (a, b), we will use "a:b" as the key. The PK data will be used as the "id" in Elasticsearch.
+ You should create the associated mappings in Elasticsearch first; relying on the default mapping is rarely wise, since you must know how to search accurately.
+ `mysqldump` must exist on the same node as go-mysql-elasticsearch; otherwise, go-mysql-elasticsearch will try to sync the binlog only.
+ Don't change too many rows at the same time in one SQL statement.
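The composite-PK behaviour from the notice above can be sketched as follows; `joinPK` is a hypothetical helper for illustration, not this project's exact code:

```go
package main

import (
	"fmt"
	"strings"
)

// joinPK sketches how a multi-column primary key such as (a, b) becomes
// the Elasticsearch document id "a:b", as described in the notice above.
func joinPK(values []interface{}) string {
	parts := make([]string, len(values))
	for i, v := range values {
		parts[i] = fmt.Sprint(v)
	}
	return strings.Join(parts, ":")
}

func main() {
	fmt.Println(joinPK([]interface{}{1, "x"})) // prints "1:x"
}
```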
@@ -45,14 +45,16 @@ schema = "test_1"
tables = ["t3", "t4"]
```

`schema` is the database name, and `tables` lists the tables that need to be synced.

## Rule

By default, go-mysql-elasticsearch uses the MySQL table name as the Elasticsearch index and type name, and the MySQL table field name as the Elasticsearch field name.
E.g., for a table named blog, the default index and type in Elasticsearch are both named blog; if the table has a field named title,
the default field name is also named title.

In addition, one-to-many joins ([parent-child relationships](https://www.elastic.co/guide/en/elasticsearch/guide/current/parent-child.html) in Elasticsearch) are supported. Simply specify the field name in the `parent` property.
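The parent support rides on the bulk API's metadata line. The sketch below (hypothetical helper, assumed field set — only the `_id`/`_parent` handling is visible in this PR's diff) shows how the action metadata gains a `_parent` field when a parent id is present:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkMeta sketches the bulk action metadata line: when a parent id is
// set, a "_parent" key is added, mirroring the BulkRequest change in
// elastic/client.go in this PR.
func bulkMeta(action, index, docType, id, parent string) string {
	data := map[string]string{"_index": index, "_type": docType}
	if id != "" {
		data["_id"] = id
	}
	if parent != "" {
		data["_parent"] = parent
	}
	meta := map[string]interface{}{action: data}
	b, _ := json.Marshal(meta)
	return string(b)
}

func main() {
	fmt.Println(bulkMeta("index", "river", "river_extra", "1", "2"))
}
```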

Rules let you change this name mapping. The rule format in the config file is shown below:

```
@@ -61,6 +63,7 @@ schema = "test"
table = "t1"
index = "t"
type = "t"
parent = "parent_id"

[rule.field]
title = "my_title"
@@ -70,7 +73,7 @@ In the example above, we will use a new index and type both named "t" instead of

## Wildcard table

go-mysql-elasticsearch only lets you determine which tables to sync, but sometimes, if you split a big table into many sub tables, like 1024 tables table_0000, table_0001, ..., table_1023, it is very hard to write a rule for every table.

go-mysql-elasticsearch supports wildcard tables, e.g.:

@@ -86,18 +89,18 @@ index = "river"
type = "river"
```

"test_river_[0-9]{4}" is a wildcard table definition, which represents "test_river_0000" to "test_river_9999"; the table in the rule must match it exactly.

In the example above, if you have 1024 sub tables, all of them will be synced into Elasticsearch with index "river" and type "river".
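A wildcard table definition is just a regular expression over table names; the service pushes the match into MySQL via `RLIKE` (see `river/river.go`), but the idea can be sketched in plain Go (hypothetical helper for illustration):

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesWildcard sketches checking a wildcard table definition such as
// "test_river_[0-9]{4}" against a concrete table name; anchors are added
// so the pattern must match the whole name.
func matchesWildcard(pattern, table string) bool {
	re := regexp.MustCompile("^" + pattern + "$")
	return re.MatchString(table)
}

func main() {
	fmt.Println(matchesWildcard("test_river_[0-9]{4}", "test_river_0001")) // true
	fmt.Println(matchesWildcard("test_river_[0-9]{4}", "test_river_1"))    // false
}
```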

## Why not other rivers?

Although there are some other MySQL rivers for Elasticsearch, like [elasticsearch-river-jdbc](https://github.com/jprante/elasticsearch-river-jdbc) and [elasticsearch-river-mysql](https://github.com/scharron/elasticsearch-river-mysql), I still want to build a new one in Go. Why?

+ Customization: I want to decide which tables to sync, the associated index and type names, and even the field names in Elasticsearch.
+ Incremental update with binlog, with the ability to resume from the last sync position when the service restarts.
+ A common sync framework not only for Elasticsearch but also for others, like memcached, redis, etc.
+ Wildcard table support: we have many sub tables like table_0000 - table_1023 but want to use a single Elasticsearch index and type.

## Todo

31 changes: 31 additions & 0 deletions elastic/client.go
@@ -53,6 +53,7 @@ type BulkRequest struct {
Index string
Type string
ID string
Parent string

Data map[string]interface{}
}
@@ -70,6 +71,9 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error {
if len(r.ID) > 0 {
metaData["_id"] = r.ID
}
if len(r.Parent) > 0 {
metaData["_parent"] = r.Parent
}

meta[r.Action] = metaData

@@ -190,6 +194,32 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
return ret, err
}

func (c *Client) CreateMapping(index string, docType string, mapping map[string]interface{}) error {
reqUrl := fmt.Sprintf("http://%s/%s", c.Addr,
url.QueryEscape(index))

r, err := c.Do("HEAD", reqUrl, nil)
if err != nil {
return err
}

// index doesn't exist, create index first
if r.Code != http.StatusOK {
_, err = c.Do("POST", reqUrl, nil)

if err != nil {
return err
}
}

reqUrl = fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType))

_, err = c.Do("POST", reqUrl, mapping)
return err
}

func (c *Client) DeleteIndex(index string) error {
reqUrl := fmt.Sprintf("http://%s/%s", c.Addr,
url.QueryEscape(index))
@@ -266,6 +296,7 @@ func (c *Client) Delete(index string, docType string, id string) error {
}
}

// Parent is only supported in the Bulk-related APIs.
func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) {
reqUrl := fmt.Sprintf("http://%s/_bulk", c.Addr)

33 changes: 33 additions & 0 deletions elastic/client_test.go
@@ -92,3 +92,36 @@ func (s *elasticTestSuite) TestSimple(c *C) {
c.Assert(err, IsNil)
c.Assert(resp.Errors, Equals, false)
}

// This test requires a _parent setting in the index mapping.
func (s *elasticTestSuite) TestParent(c *C) {
index := "dummy"
docType := "comment"

items := make([]*BulkRequest, 10)

for i := 0; i < 10; i++ {
id := fmt.Sprintf("%d", i)
req := new(BulkRequest)
req.Action = ActionIndex
req.ID = id
req.Data = makeTestData(fmt.Sprintf("abc %d", i), fmt.Sprintf("hello world %d", i))
req.Parent = "1"
items[i] = req
}

resp, err := s.c.IndexTypeBulk(index, docType, items)
c.Assert(err, IsNil)
c.Assert(resp.Errors, Equals, false)

for i := 0; i < 10; i++ {
id := fmt.Sprintf("%d", i)
req := new(BulkRequest)
req.Action = ActionDelete
req.ID = id
items[i] = req
}
resp, err = s.c.IndexTypeBulk(index, docType, items)
c.Assert(err, IsNil)
c.Assert(resp.Errors, Equals, false)
}
3 changes: 2 additions & 1 deletion river/river.go
@@ -137,7 +137,7 @@ func (r *River) parseSource() (map[string][]string, error) {

tables := []string{}

sql := fmt.Sprintf(`SELECT table_name FROM information_schema.tables WHERE
table_name RLIKE "%s" AND table_schema = "%s";`, table, s.Schema)

res, err := r.canal.Execute(sql)
@@ -202,6 +202,7 @@ func (r *River) prepareRule() error {
rr := r.rules[ruleKey(rule.Schema, table)]
rr.Index = rule.Index
rr.Type = rule.Type
rr.Parent = rule.Parent
rr.FieldMapping = rule.FieldMapping
}
} else {
110 changes: 110 additions & 0 deletions river/river_extra_test.go
@@ -0,0 +1,110 @@
package river

import (
"fmt"
"net/http"
"net/url"
"os"

. "gopkg.in/check.v1"
)

func (s *riverTestSuite) setupExtra(c *C) (r *River) {
var err error

schema := `
CREATE TABLE IF NOT EXISTS %s (
id INT,
title VARCHAR(256),
pid INT,
PRIMARY KEY(id)) ENGINE=INNODB;
`

s.testExecute(c, "DROP TABLE IF EXISTS test_river_extra")
s.testExecute(c, fmt.Sprintf(schema, "test_river_extra"))

cfg := new(Config)
cfg.MyAddr = *my_addr
cfg.MyUser = "root"
cfg.MyPassword = ""
cfg.ESAddr = *es_addr

cfg.ServerID = 1001
cfg.Flavor = "mysql"

cfg.DataDir = "/tmp/test_river_extra"
cfg.DumpExec = "mysqldump"

cfg.StatAddr = "127.0.0.1:12800"

os.RemoveAll(cfg.DataDir)

cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river_extra"}}}

cfg.Rules = []*Rule{
&Rule{Schema: "test",
Table: "test_river_extra",
Index: "river",
Type: "river_extra",
Parent: "pid"}}

r, err = NewRiver(cfg)
c.Assert(err, IsNil)

mapping := map[string]interface{}{
"river_extra": map[string]interface{}{
"_parent": map[string]string{"type": "river_extra"},
},
}

r.es.CreateMapping("river", "river_extra", mapping)

return r
}

func (s *riverTestSuite) testPrepareExtraData(c *C) {
s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 1, "first", 1)
s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 2, "second", 1)
s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 3, "third", 1)
s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 4, "fourth", 1)
}

func (s *riverTestSuite) testElasticExtraExists(c *C, id string, parent string, exist bool) {
index := "river"
docType := "river_extra"

reqUrl := fmt.Sprintf("http://%s/%s/%s/%s?parent=%s", s.r.es.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id),
url.QueryEscape(parent))

r, err := s.r.es.Do("HEAD", reqUrl, nil)
c.Assert(err, IsNil)

if exist {
c.Assert(r.Code, Equals, http.StatusOK)
} else {
c.Assert(r.Code, Equals, http.StatusNotFound)
}
}

func (s *riverTestSuite) TestRiverWithParent(c *C) {
river := s.setupExtra(c)

defer river.Close()

s.testPrepareExtraData(c)

go river.Run()

<-river.canal.WaitDumpDone()

s.testElasticExtraExists(c, "1", "1", true)

s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1)
err := river.canal.CatchMasterPos(1)
c.Assert(err, IsNil)

s.testElasticExtraExists(c, "1", "1", false)
}
5 changes: 4 additions & 1 deletion river/river_test.go
@@ -34,7 +34,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {

schema := `
CREATE TABLE IF NOT EXISTS %s (
id INT,
title VARCHAR(256),
content VARCHAR(256),
tenum ENUM("e1", "e2", "e3"),
@@ -119,6 +119,8 @@ schema = "test"
table = "test_river"
index = "river"
type = "river"
parent = "pid"

[rule.field]
> **Collaborator review comment:** need a blank

title = "es_title"

@@ -127,6 +129,7 @@ schema = "test"
table = "test_river_[0-9]{4}"
index = "river"
type = "river"

[rule.field]
title = "es_title"

1 change: 1 addition & 0 deletions river/rule.go
@@ -12,6 +12,7 @@ type Rule struct {
Table string `toml:"table"`
Index string `toml:"index"`
Type string `toml:"type"`
Parent string `toml:"parent"`

// Default, a MySQL table field name is mapped to Elasticsearch field name.
// Sometimes, you want to use different name, e.g, the MySQL file name is title,
25 changes: 24 additions & 1 deletion river/sync.go
@@ -110,9 +110,19 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
return nil, err
}

beforeParentID, afterParentID := "", ""
if len(rule.Parent) > 0 {
if beforeParentID, err = r.getParentID(rule, rows[i], rule.Parent); err != nil {
return nil, err
}
if afterParentID, err = r.getParentID(rule, rows[i+1], rule.Parent); err != nil {
return nil, err
}
}

req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: beforeID}

if beforeID != afterID {
if beforeID != afterID || beforeParentID != afterParentID {
req.Action = elastic.ActionDelete
reqs = append(reqs, req)

@@ -182,6 +192,10 @@ func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values [
req.Data[c.Name] = r.makeReqColumnData(&c, values[i])
}
}

if len(rule.Parent) > 0 {
req.Parent = fmt.Sprint(req.Data[rule.Parent])
}
}

func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule,
@@ -228,6 +242,15 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
return buf.String(), nil
}

func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (string, error) {
index := rule.TableInfo.FindColumn(columnName)
if index < 0 {
return "", fmt.Errorf("parent id not found %s(%s)", rule.TableInfo.Name, columnName)
}

return fmt.Sprint(row[index]), nil
}
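The update path above turns an update into a delete plus a re-index whenever the document id or the parent id changes, because Elasticsearch routes child documents by their parent. A sketch of that condition (hypothetical helper restating the `beforeID != afterID || beforeParentID != afterParentID` check in makeUpdateRequest):

```go
package main

import "fmt"

// needsReindex reports whether an update must be expressed as delete +
// index rather than an in-place update: true when either the document id
// or the parent id has changed.
func needsReindex(beforeID, afterID, beforeParent, afterParent string) bool {
	return beforeID != afterID || beforeParent != afterParent
}

func main() {
	fmt.Println(needsReindex("1", "1", "10", "11")) // parent moved: true
	fmt.Println(needsReindex("1", "1", "2", "2"))   // nothing changed: false
}
```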

func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
if len(reqs) == 0 {
return nil