forked from vitessio/vitess
/
keyrange_filter.go
173 lines (161 loc) · 6.01 KB
/
keyrange_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package binlog
import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/sqlannotation"
"errors"
"fmt"
"github.com/youtube/vitess/go/vt/sqlparser"
binlogdatapb "github.com/youtube/vitess/go/vt/proto/binlogdata"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// KeyRangeFilterFunc wraps callback so that each binlog transaction is
// forwarded with only the statements relevant to keyrange. The resulting
// function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) ->
// bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))
//
// Handling per statement category:
//   - BL_SET is always kept, but by itself does not mark the transaction as
//     matched.
//   - BL_DDL is logged as a warning and dropped.
//   - BL_INSERT / BL_UPDATE / BL_DELETE are kept when their keyspace id lies
//     in keyrange. Row-based events carry the id in KeyspaceID; for
//     statement-based events it is extracted from the SQL annotation, and
//     multi-id statements are rewritten via getValidRangeQuery to keep only
//     the in-range rows.
//   - BL_UNRECOGNIZED increments updateStreamErrors and is dropped.
//   - Any other category is dropped without logging.
//
// The returned function calls callback once per invocation with a
// transaction carrying eventToken. Its Statements field stays nil unless at
// least one DML statement matched, in which case it holds every kept
// statement (SET statements included). callback's error is returned as-is.
func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc {
	return func(eventToken *querypb.EventToken, statements []FullBinlogStatement) error {
		kept := make([]*binlogdatapb.BinlogTransaction_Statement, 0, len(statements))
		anyMatch := false
		// keepMatch retains a DML statement that belongs to keyrange and marks
		// the transaction as matched.
		keepMatch := func(s *binlogdatapb.BinlogTransaction_Statement) {
			kept = append(kept, s)
			anyMatch = true
		}

		for _, fbs := range statements {
			stmt := fbs.Statement
			switch stmt.Category {
			case binlogdatapb.BinlogTransaction_Statement_BL_SET:
				kept = append(kept, stmt)

			case binlogdatapb.BinlogTransaction_Statement_BL_DDL:
				log.Warningf("Not forwarding DDL: %s", stmt.Sql)

			case binlogdatapb.BinlogTransaction_Statement_BL_INSERT,
				binlogdatapb.BinlogTransaction_Statement_BL_UPDATE,
				binlogdatapb.BinlogTransaction_Statement_BL_DELETE:
				if fbs.KeyspaceID != nil {
					// RBR: the event already carries its keyspace id.
					if key.KeyRangeContains(keyrange, fbs.KeyspaceID) {
						keepMatch(stmt)
					}
					continue
				}

				// SBR: recover the keyspace id(s) from the SQL annotation.
				ids, extractErr := sqlannotation.ExtractKeyspaceIDS(string(stmt.Sql))
				switch {
				case extractErr != nil && stmt.Category == binlogdatapb.BinlogTransaction_Statement_BL_INSERT:
					// TODO(erez): Stop filtered-replication here, and alert.
					logExtractKeySpaceIDError(extractErr)
				case extractErr != nil:
					// Without keyspace ids an UPDATE/DELETE is replicated to all
					// targets. This is safe because vttablet rewrites such queries
					// to include the primary key, so they only affect the shards
					// that actually have the rows.
					keepMatch(stmt)
				case len(ids) == 1:
					if key.KeyRangeContains(keyrange, ids[0]) {
						keepMatch(stmt)
					}
				default:
					// Several (or zero) ids: rewrite the statement so it only
					// touches rows inside keyrange.
					rewritten, rangeErr := getValidRangeQuery(string(stmt.Sql), ids, keyrange)
					switch {
					case rangeErr != nil:
						log.Errorf("Error parsing statement (%s). Got %v", string(stmt.Sql), rangeErr)
					case rewritten != "":
						keepMatch(&binlogdatapb.BinlogTransaction_Statement{
							Category: stmt.Category,
							Charset:  stmt.Charset,
							Sql:      []byte(rewritten),
						})
					}
				}

			case binlogdatapb.BinlogTransaction_Statement_BL_UNRECOGNIZED:
				updateStreamErrors.Add("KeyRangeStream", 1)
				log.Errorf("Error parsing keyspace id: %s", stmt.Sql)
			}
		}

		tx := &binlogdatapb.BinlogTransaction{EventToken: eventToken}
		if anyMatch {
			tx.Statements = kept
		}
		return callback(tx)
	}
}
// getValidRangeQuery parses sql and, when it is an INSERT, returns it
// rewritten by generateSingleInsertQuery so that only rows whose keyspace id
// lies in keyrange remain. keyspaceIDs[i] is the keyspace id of the i-th
// VALUES tuple. sql's trailing comments are carried over to the rewritten
// query.
//
// Parse errors are returned unchanged. Any statement that is not an INSERT
// yields an "unsupported construct" error. An empty query with a nil error
// means no row belonged to keyrange.
func getValidRangeQuery(sql string, keyspaceIDs [][]byte, keyrange *topodatapb.KeyRange) (query string, err error) {
	parsed, parseErr := sqlparser.Parse(sql)
	if parseErr != nil {
		return "", parseErr
	}

	ins, isInsert := parsed.(*sqlparser.Insert)
	if !isInsert {
		return "", errors.New("unsupported construct ")
	}

	// Trailing comments are handed to the generator so they can be
	// re-appended after the re-formatted statement.
	_, trailingComments := sqlparser.SplitTrailingComments(sql)

	rewritten, genErr := generateSingleInsertQuery(ins, keyspaceIDs, trailingComments, keyrange)
	if genErr != nil {
		return "", genErr
	}
	return rewritten, nil
}
// generateSingleInsertQuery rewrites a multi-row INSERT so that it keeps only
// the VALUES tuples whose keyspace id falls inside keyrange.
//
// keyspaceIDs must be parallel to the VALUES tuples of ins: keyspaceIDs[i] is
// the keyspace id of tuple i. trailingComments is appended verbatim to the
// regenerated SQL.
//
// Returns:
//   - the regenerated INSERT text when at least one tuple is in keyrange;
//   - ("", nil) when no tuple is in keyrange;
//   - ("", err) for INSERT ... SELECT/UNION, a tuple/id count mismatch, or any
//     other row construct.
//
// On success ins.Rows is overwritten with the filtered tuples, so ins is
// mutated in place.
func generateSingleInsertQuery(ins *sqlparser.Insert, keyspaceIDs [][]byte, trailingComments string, keyrange *topodatapb.KeyRange) (query string, err error) {
	switch rows := ins.Rows.(type) {
	case *sqlparser.Select, *sqlparser.Union:
		return "", errors.New("unsupported: insert into select")
	case sqlparser.Values:
		if len(rows) != len(keyspaceIDs) {
			// Report the number of input tuples (rows); the filtered slice has
			// not been built yet at this point.
			return "", fmt.Errorf("length of values tuples %v doesn't match with length of keyspaceids %v", len(rows), len(keyspaceIDs))
		}
		values := make(sqlparser.Values, 0, len(rows))
		for rowNum, val := range rows {
			if key.KeyRangeContains(keyrange, keyspaceIDs[rowNum]) {
				values = append(values, val)
			}
		}
		if len(values) == 0 {
			// Nothing for this keyrange; the caller drops the statement.
			return "", nil
		}
		ins.Rows = values
		queryBuf := sqlparser.NewTrackedBuffer(nil)
		ins.Format(queryBuf)
		queryBuf.WriteString(trailingComments)
		return queryBuf.String(), nil
	default:
		return "", errors.New("unexpected construct in insert")
	}
}
// logExtractKeySpaceIDError logs an error from sqlannotation.ExtractKeyspaceIDS
// and increments the matching updateStreamErrors counter.
//
// err is expected to be a *sqlannotation.ExtractKeySpaceIDError. Any other
// error type, or an unknown Kind, is treated as a programming error and
// reported with log.Fatalf (glog's Fatalf logs and then exits the process).
func logExtractKeySpaceIDError(err error) {
	extractErr, ok := err.(*sqlannotation.ExtractKeySpaceIDError)
	if !ok {
		log.Fatalf("Expected sqlannotation.ExtractKeySpaceIDError. Got: %v", err)
		// Not reached with glog, but prevents a nil dereference of extractErr
		// below if a non-exiting logger is ever substituted.
		return
	}
	switch extractErr.Kind {
	case sqlannotation.ExtractKeySpaceIDParseError:
		log.Errorf(
			"Error parsing keyspace id annotation. Skipping statement. (%s)", extractErr.Message)
		updateStreamErrors.Add("ExtractKeySpaceIDParseError", 1)
	case sqlannotation.ExtractKeySpaceIDReplicationUnfriendlyError:
		log.Errorf(
			"Found replication unfriendly statement. (%s). "+
				"Filtered replication should abort, but we're currently just skipping the statement.",
			extractErr.Message)
		updateStreamErrors.Add("ExtractKeySpaceIDReplicationUnfriendlyError", 1)
	default:
		log.Fatalf("Unexpected extractErr.Kind. (%v)", extractErr)
	}
}