From 187f811567d19c5e21f0787f2b1d14e90f72f6fb Mon Sep 17 00:00:00 2001
From: Eagle Chen
Date: Thu, 23 Jul 2015 11:45:37 +0800
Subject: [PATCH] add support for parent relationship

---
 README.md                 |  27 +++++-----
 elastic/client.go         |  31 +++++++++++
 elastic/client_test.go    |  33 ++++++++++++
 river/river.go            |   3 +-
 river/river_extra_test.go | 110 ++++++++++++++++++++++++++++++++++++++
 river/river_test.go       |   5 +-
 river/rule.go             |   1 +
 river/sync.go             |  25 ++++++++-
 8 files changed, 220 insertions(+), 15 deletions(-)
 create mode 100644 river/river_extra_test.go

diff --git a/README.md b/README.md
index 20c02584..ce187a2e 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,8 @@
-go-mysql-elasticsearch is a service syncing your MySQL data into Elasticsearch automatically. 
+go-mysql-elasticsearch is a service syncing your MySQL data into Elasticsearch automatically.
 
 It uses `mysqldump` to fetch the origin data at first, then syncs data incrementally with binlog.
 
-## Install 
+## Install
 
 + Install Go and set your [GOPATH](https://golang.org/doc/code.html#GOPATH)
 + Install godep `go get github.com/tools/godep`
@@ -22,9 +22,9 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment
 ## Notice
 
 + binlog format must be **row**.
-+ binlog row image must be **full** for MySQL, you may lost some field data if you update PK data in MySQL with minimal or noblob binlog row image. MariaDB only supports full row image. 
++ binlog row image must be **full** for MySQL; you may lose some field data if you update PK data in MySQL with a minimal or noblob binlog row image. MariaDB only supports the full row image.
 + Can not alter table format at runtime.
-+ MySQL table which will be synced must have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch. 
++ A MySQL table which will be synced must have a PK (primary key); a multi-column PK is allowed, e.g., if the PK is (a, b), "a:b" will be used as the key. The PK data will be used as "id" in Elasticsearch.
 + You should create the associated mappings in Elasticsearch first, I don't think using the default mapping is a wise decision, you must know how to search accurately.
 + `mysqldump` must exist in the same node with go-mysql-elasticsearch, if not, go-mysql-elasticsearch will try to sync binlog only.
 + Don't change too many rows at same time in one SQL.
@@ -45,14 +45,16 @@ schema = "test_1"
 tables = ["t3", "t4"]
 ```
 
-`schema` is the database name, and `tables` includes the table need to be synced. 
+`schema` is the database name, and `tables` lists the tables to be synced.
 
 ## Rule
 
-By default, go-mysql-elasticsearch will use MySQL table name as the Elasticserach's index and type name, use MySQL table field name as the Elasticserach's field name. 
-e.g, if a table named blog, the default index and type in Elasticserach are both named blog, if the table field named title, 
+By default, go-mysql-elasticsearch will use the MySQL table name as the Elasticsearch index and type name, and the MySQL table field name as the Elasticsearch field name.
+e.g., for a table named blog, the default index and type in Elasticsearch are both named blog; for a table field named title,
 the default field name is also named title.
 
+In addition, a one-to-many join ( [parent-child relationship](https://www.elastic.co/guide/en/elasticsearch/guide/current/parent-child.html) in Elasticsearch ) is supported. Simply specify the field name in the `parent` property.
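+
+Note that Elasticsearch requires the `_parent` mapping to exist on the child type before any child document is indexed, so create it before starting the sync. Below is a minimal sketch using the `CreateMapping` helper in `elastic/client.go` (added by this patch); the `blog` index, `comment` type, parent type `post`, and the `esClient` variable are made-up names for illustration:
+
+```
+// Sketch: register a _parent mapping for the child type before syncing.
+// esClient is assumed to be an *elastic.Client from this repository;
+// "blog", "comment", and "post" are hypothetical names.
+mapping := map[string]interface{}{
+	"comment": map[string]interface{}{
+		"_parent": map[string]string{"type": "post"},
+	},
+}
+if err := esClient.CreateMapping("blog", "comment", mapping); err != nil {
+	// handle the error, e.g. abort startup
+}
+```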
+
 Rule can let you change this name mapping. Rule format in config file is below:
 
 ```
@@ -61,6 +63,7 @@ schema = "test"
 table = "t1"
 index = "t"
 type = "t"
+parent = "parent_id"
 
     [rule.field]
     title = "my_title"
@@ -70,7 +73,7 @@ In the example above, we will use a new index and type both named "t" instead of
 
 ## Wildcard table
 
-go-mysql-elasticsearch only allows you determind which table to be synced, but sometimes, if you split a big table into multi sub tables, like 1024, table_0000, table_0001, ... table_1023, it is very hard to write rules for every table. 
+go-mysql-elasticsearch only allows you to determine which tables to sync, but sometimes, if you split a big table into many sub-tables, like 1024 of them, table_0000, table_0001, ... table_1023, it is very hard to write a rule for every table.
 
 go-mysql-elasticsearch supports using wildcard tables, e.g.:
 
@@ -86,18 +89,18 @@ index = "river"
 type = "river"
 ```
 
-"test_river_[0-9]{4}" is a wildcard table definition, which represents "test_river_0000" to "test_river_9999", at the same time, the table in the rule must be same as it. 
+"test_river_[0-9]{4}" is a wildcard table definition which represents "test_river_0000" through "test_river_9999"; the table in the rule must use this same wildcard definition.
 
-At the above example, if you have 1024 sub tables, all tables will be synced into Elasticsearch with index "river" and type "river". 
+In the above example, if you have 1024 sub-tables, all of them will be synced into Elasticsearch with index "river" and type "river".
 
 ## Why not other rivers?
 
 Although there are some other MySQL rivers for Elasticsearch, like [elasticsearch-river-jdbc](https://github.com/jprante/elasticsearch-river-jdbc), [elasticsearch-river-mysql](https://github.com/scharron/elasticsearch-river-mysql), I still want to build a new one with Go, why?
 
 + Customization, I want to decide which table to be synced, the associated index and type name, or even the field name in Elasticsearch.
-+ Incremental update with binlog, and can resume from the last sync position when the service starts again. 
++ Incremental updates with binlog, able to resume from the last sync position when the service starts again.
 + A common sync framework not only for Elasticsearch but also for others, like memcached, redis, etc...
-+ Wildcard tables support, we have many sub tables like table_0000 - table_1023, but want use a unique Elasticsearch index and type. 
++ Wildcard table support: we may have many sub-tables like table_0000 - table_1023 but want to use a single Elasticsearch index and type (see the sketch below).
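+
+To make the wildcard matching concrete: the river resolves a wildcard table definition on the MySQL side, with `table_name RLIKE "<pattern>"` against `information_schema.tables` (see `river/river.go` below). The following standalone Go sketch only approximates that behavior with the `regexp` package; the explicit `^...$` anchoring is an assumption here, since `RLIKE` matches anywhere in the string unless the pattern itself is anchored:
+
+```
+package main
+
+import (
+	"fmt"
+	"regexp"
+)
+
+func main() {
+	// a wildcard table definition, as used in the rule above
+	pattern := regexp.MustCompile("^test_river_[0-9]{4}$")
+
+	for _, name := range []string{"test_river_0000", "test_river_1023", "test_river_x"} {
+		fmt.Printf("%-18s matched: %v\n", name, pattern.MatchString(name))
+	}
+}
+```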
## Todo

diff --git a/elastic/client.go b/elastic/client.go
index 2b9b64de..4aa1ef4b 100644
--- a/elastic/client.go
+++ b/elastic/client.go
@@ -53,6 +53,7 @@ type BulkRequest struct {
 	Index  string
 	Type   string
 	ID     string
+	Parent string
 
 	Data map[string]interface{}
 }
@@ -70,6 +71,9 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error {
 	if len(r.ID) > 0 {
 		metaData["_id"] = r.ID
 	}
+	if len(r.Parent) > 0 {
+		metaData["_parent"] = r.Parent
+	}
 
 	meta[r.Action] = metaData
 
@@ -190,6 +194,32 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
 	return ret, err
 }
 
+func (c *Client) CreateMapping(index string, docType string, mapping map[string]interface{}) error {
+	reqUrl := fmt.Sprintf("http://%s/%s", c.Addr,
+		url.QueryEscape(index))
+
+	r, err := c.Do("HEAD", reqUrl, nil)
+	if err != nil {
+		return err
+	}
+
+	// if the index doesn't exist yet, create it first
+	if r.Code != http.StatusOK {
+		_, err = c.Do("POST", reqUrl, nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	reqUrl = fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr,
+		url.QueryEscape(index),
+		url.QueryEscape(docType))
+
+	_, err = c.Do("POST", reqUrl, mapping)
+	return err
+}
+
 func (c *Client) DeleteIndex(index string) error {
 	reqUrl := fmt.Sprintf("http://%s/%s", c.Addr,
 		url.QueryEscape(index))
@@ -266,6 +296,7 @@ func (c *Client) Delete(index string, docType string, id string) error {
 	}
 }
 
+// Parent is only supported by the Bulk-related APIs.
 func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) {
 	reqUrl := fmt.Sprintf("http://%s/_bulk", c.Addr)
 
diff --git a/elastic/client_test.go b/elastic/client_test.go
index 97fc1756..74214317 100644
--- a/elastic/client_test.go
+++ b/elastic/client_test.go
@@ -92,3 +92,36 @@ func (s *elasticTestSuite) TestSimple(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(resp.Errors, Equals, false)
 }
+
+// This test requires a _parent setting in the type's mapping; see CreateMapping.
+func (s *elasticTestSuite) TestParent(c *C) {
+	index := "dummy"
+	docType := "comment"
+
+	items := make([]*BulkRequest, 10)
+
+	for i := 0; i < 10; i++ {
+		id := fmt.Sprintf("%d", i)
+		req := new(BulkRequest)
+		req.Action = ActionIndex
+		req.ID = id
+		req.Data = makeTestData(fmt.Sprintf("abc %d", i), fmt.Sprintf("hello world %d", i))
+		req.Parent = "1"
+		items[i] = req
+	}
+
+	resp, err := s.c.IndexTypeBulk(index, docType, items)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Errors, Equals, false)
+
+	for i := 0; i < 10; i++ {
+		id := fmt.Sprintf("%d", i)
+		req := new(BulkRequest)
+		req.Action = ActionDelete
+		req.ID = id
+		// the parent routes the delete to the same shard the document was indexed to
+		req.Parent = "1"
+		items[i] = req
+	}
+	resp, err = s.c.IndexTypeBulk(index, docType, items)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Errors, Equals, false)
+}
diff --git a/river/river.go b/river/river.go
index 629e0ad5..3484b5f1 100644
--- a/river/river.go
+++ b/river/river.go
@@ -137,7 +137,7 @@ func (r *River) parseSource() (map[string][]string, error) {
 
 		tables := []string{}
 
-		sql := fmt.Sprintf(`SELECT table_name FROM information_schema.tables WHERE 
+		sql := fmt.Sprintf(`SELECT table_name FROM information_schema.tables WHERE
 			table_name RLIKE "%s" AND table_schema = "%s";`, table, s.Schema)
 
 		res, err := r.canal.Execute(sql)
@@ -202,6 +202,7 @@ func (r *River) prepareRule() error {
 				rr := r.rules[ruleKey(rule.Schema, table)]
 				rr.Index = rule.Index
 				rr.Type = rule.Type
+				rr.Parent = rule.Parent
 				rr.FieldMapping = rule.FieldMapping
 			}
 		} else {
diff --git a/river/river_extra_test.go b/river/river_extra_test.go
new file mode 100644
index 00000000..96cd3de2
--- /dev/null
+++ b/river/river_extra_test.go
@@ -0,0 +1,110 @@
+package river
+
+import (
+	"fmt"
"net/http" + "net/url" + "os" + + . "gopkg.in/check.v1" +) + +func (s *riverTestSuite) setupExtra(c *C) (r *River) { + var err error + + schema := ` + CREATE TABLE IF NOT EXISTS %s ( + id INT, + title VARCHAR(256), + pid INT, + PRIMARY KEY(id)) ENGINE=INNODB; + ` + + s.testExecute(c, "DROP TABLE IF EXISTS test_river_extra") + s.testExecute(c, fmt.Sprintf(schema, "test_river_extra")) + + cfg := new(Config) + cfg.MyAddr = *my_addr + cfg.MyUser = "root" + cfg.MyPassword = "" + cfg.ESAddr = *es_addr + + cfg.ServerID = 1001 + cfg.Flavor = "mysql" + + cfg.DataDir = "/tmp/test_river_extra" + cfg.DumpExec = "mysqldump" + + cfg.StatAddr = "127.0.0.1:12800" + + os.RemoveAll(cfg.DataDir) + + cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river_extra"}}} + + cfg.Rules = []*Rule{ + &Rule{Schema: "test", + Table: "test_river_extra", + Index: "river", + Type: "river_extra", + Parent: "pid"}} + + r, err = NewRiver(cfg) + c.Assert(err, IsNil) + + mapping := map[string]interface{}{ + "river_extra": map[string]interface{}{ + "_parent": map[string]string{"type": "river_extra"}, + }, + } + + r.es.CreateMapping("river", "river_extra", mapping) + + return r +} + +func (s *riverTestSuite) testPrepareExtraData(c *C) { + s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 1, "first", 1) + s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 2, "second", 1) + s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 3, "third", 1) + s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", 4, "fourth", 1) +} + +func (s *riverTestSuite) testElasticExtraExists(c *C, id string, parent string, exist bool) { + index := "river" + docType := "river_extra" + + reqUrl := fmt.Sprintf("http://%s/%s/%s/%s?parent=%s", s.r.es.Addr, + url.QueryEscape(index), + url.QueryEscape(docType), + url.QueryEscape(id), + url.QueryEscape(parent)) + + r, err := s.r.es.Do("HEAD", reqUrl, nil) + c.Assert(err, IsNil) + + if exist { + c.Assert(r.Code, Equals, http.StatusOK) + } else { + c.Assert(r.Code, Equals, http.StatusNotFound) + } +} + +func (s *riverTestSuite) TestRiverWithParent(c *C) { + river := s.setupExtra(c) + + defer river.Close() + + s.testPrepareExtraData(c) + + go river.Run() + + <-river.canal.WaitDumpDone() + + s.testElasticExtraExists(c, "1", "1", true) + + s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1) + err := river.canal.CatchMasterPos(1) + c.Assert(err, IsNil) + + s.testElasticExtraExists(c, "1", "1", false) +} diff --git a/river/river_test.go b/river/river_test.go index 18337388..083378de 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -34,7 +34,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { schema := ` CREATE TABLE IF NOT EXISTS %s ( - id INT, + id INT, title VARCHAR(256), content VARCHAR(256), tenum ENUM("e1", "e2", "e3"), @@ -119,6 +119,8 @@ schema = "test" table = "test_river" index = "river" type = "river" +parent = "pid" + [rule.field] title = "es_title" @@ -127,6 +129,7 @@ schema = "test" table = "test_river_[0-9]{4}" index = "river" type = "river" + [rule.field] title = "es_title" diff --git a/river/rule.go b/river/rule.go index c78af435..ccf26701 100644 --- a/river/rule.go +++ b/river/rule.go @@ -12,6 +12,7 @@ type Rule struct { Table string `toml:"table"` Index string `toml:"index"` Type string `toml:"type"` + Parent string `toml:"parent"` // Default, a MySQL table field name is mapped to Elasticsearch field name. 
	// Sometimes you want to use a different name; e.g., the MySQL field name is title,
diff --git a/river/sync.go b/river/sync.go
index 2e906d56..c7482ac6 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -110,9 +110,19 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
 			return nil, err
 		}
 
+		beforeParentID, afterParentID := "", ""
+		if len(rule.Parent) > 0 {
+			if beforeParentID, err = r.getParentID(rule, rows[i], rule.Parent); err != nil {
+				return nil, err
+			}
+			if afterParentID, err = r.getParentID(rule, rows[i+1], rule.Parent); err != nil {
+				return nil, err
+			}
+		}
+
 		req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: beforeID}
 
-		if beforeID != afterID {
+		if beforeID != afterID || beforeParentID != afterParentID {
 			req.Action = elastic.ActionDelete
+			// route the delete with the old parent so it reaches the shard
+			// the child document was originally indexed to
+			req.Parent = beforeParentID
 			reqs = append(reqs, req)
 
@@ -182,6 +192,10 @@ func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values [
 			req.Data[c.Name] = r.makeReqColumnData(&c, values[i])
 		}
 	}
+
+	if len(rule.Parent) > 0 {
+		req.Parent = fmt.Sprint(req.Data[rule.Parent])
+	}
 }
 
 func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule,
@@ -228,6 +242,15 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
 	return buf.String(), nil
 }
 
+func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (string, error) {
+	index := rule.TableInfo.FindColumn(columnName)
+	if index < 0 {
+		return "", fmt.Errorf("parent id not found: table %s, column %s", rule.TableInfo.Name, columnName)
+	}
+
+	return fmt.Sprint(row[index]), nil
+}
+
 func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
 	if len(reqs) == 0 {
 		return nil
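
For review, here is a minimal, self-contained sketch of what the new `Parent` field produces on the wire. It mirrors the action-metadata encoding added to `BulkRequest.bulk` in `elastic/client.go` above; the index, type, and id values are made up for illustration:

```
package main

import (
	"encoding/json"
	"fmt"
)

// metaLine mirrors the bulk action metadata built in BulkRequest.bulk:
// when Parent is non-empty, a _parent key is emitted alongside _index,
// _type, and _id.
func metaLine(action, index, docType, id, parent string) ([]byte, error) {
	metaData := map[string]string{"_index": index, "_type": docType}
	if len(id) > 0 {
		metaData["_id"] = id
	}
	if len(parent) > 0 {
		metaData["_parent"] = parent
	}
	return json.Marshal(map[string]map[string]string{action: metaData})
}

func main() {
	// a child document "2" indexed under parent "1" (hypothetical values)
	line, err := metaLine("index", "river", "river_extra", "2", "1")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(line))
	// prints: {"index":{"_id":"2","_index":"river","_parent":"1","_type":"river_extra"}}
}
```

Elasticsearch uses `_parent` both to route the child document to its parent's shard and to validate the relation against the `_parent` mapping, which is why the mapping has to be created before the first sync.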