-
Notifications
You must be signed in to change notification settings - Fork 441
/
write_controller.go
130 lines (114 loc) · 3.85 KB
/
write_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package collector
import (
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
module "github.com/alibaba/MongoShake/v2/modules"
"github.com/alibaba/MongoShake/v2/oplog"
"github.com/alibaba/MongoShake/v2/tunnel"
)
// WriteController sends batches of oplogs for one worker: Send runs each
// message through the installed modules and then writes it to the backend
// tunnel.
type WriteController struct {
// worker this controller sends on behalf of (referenced, not owned)
worker *Worker
// modules that reported themselves registered and installed successfully,
// kept in orderedModuleList order; Send applies them in this order
moduleList []Module
// backend tunnel writer, created in NewWriteController from conf.Options.Tunnel
tunnel tunnel.Writer
// LatestLsnAck is the ack value most recently recorded by Send: the
// tunnel's feedback when the tunnel requires acks, otherwise the timestamp
// of the last log of the most recent non-probe, non-empty message
LatestLsnAck int64
}
// Module is one processing stage that WriteController applies to every
// outgoing tunnel message before the message is written to the tunnel.
type Module interface {
// IsRegistered reports whether the module should be used. installModules
// skips (neither installs nor keeps) a module that returns false.
IsRegistered() bool
// Install installs and initializes the module and returns false on
// failure. It is only invoked while a WriteController is being prepared
// (from installModules); a false result aborts creation of the controller.
Install() bool
// Handle processes one outgoing message. Messages are passed one at a
// time, and every module receives the same *tunnel.WMessage, so any change
// a module makes to the message is seen by the modules after it and by
// the tunnel.
//
// A negative return value is treated as an error code: Send stops and
// returns it to its caller. Send does not use a non-negative return value
// beyond treating it as success.
Handle(message *tunnel.WMessage) int64
}
// orderedModuleList lists every module a WriteController may install.
// The declaration order is significant and must not be changed:
// installModules walks this slice front to back, and Send applies the
// installed modules in that same order.
var orderedModuleList = []Module{
&module.Compressor{},
&module.ChecksumCalculator{},
}
// NewWriteController builds the WriteController for worker.
//
// It installs the registered modules, creates the tunnel writer selected by
// conf.Options.Tunnel (addressed by conf.Options.TunnelAddress and the
// worker id) and prepares that writer.
//
// It returns nil when module installation fails, when the factory yields no
// writer, or when the writer's Prepare() reports failure.
func NewWriteController(worker *Worker) *WriteController {
	controller := &WriteController{worker: worker}
	if !controller.installModules() {
		return nil
	}

	// Build the tunnel writer chosen by the configured options.
	factory := tunnel.WriterFactory{Name: conf.Options.Tunnel}
	controller.tunnel = factory.Create(conf.Options.TunnelAddress, worker.id)
	if controller.tunnel == nil {
		return nil
	}
	if !controller.tunnel.Prepare() {
		return nil
	}
	return controller
}
// SetInitSyncFinishTs records fullSyncFinishPosition as the FullFinishTs of
// the direct writer's batch executor. It does nothing unless the tunnel's
// Name() is "direct".
//
// NOTE: the type assertion is unchecked, so a tunnel that reports the name
// "direct" but is not a *tunnel.DirectWriter makes this method panic.
func (controller *WriteController) SetInitSyncFinishTs(fullSyncFinishPosition int64) {
	if controller.tunnel.Name() != "direct" {
		return
	}
	directWriter := controller.tunnel.(*tunnel.DirectWriter)
	directWriter.BatchExecutor.FullFinishTs = fullSyncFinishPosition
}
// installModules walks orderedModuleList in order, installs every module
// that reports itself registered and appends it to controller.moduleList.
//
// It returns false as soon as one Install() call fails; modules installed
// before the failure stay in controller.moduleList. It returns true when
// every registered module was installed.
func (controller *WriteController) installModules() bool {
	for _, candidate := range orderedModuleList {
		if !candidate.IsRegistered() {
			// unregistered modules are simply skipped
			continue
		}
		if !candidate.Install() {
			return false
		}
		controller.moduleList = append(controller.moduleList, candidate)
	}
	return true
}
// Send packs logs into a tunnel message, runs the message through every
// installed module in order and writes it to the backend tunnel.
//
// Parameters:
//   - logs: the oplogs to ship. A message without logs is treated as a probe
//     (a real probe asking the remote side for an ack, or a log-less message
//     submitted by retransmission).
//   - tag: tunnel message flags, e.g. tunnel.MsgProbe.
//
// Returns:
//   - a negative code as soon as a module's Handle() or the tunnel's Send()
//     reports one; LatestLsnAck is left untouched in that case;
//   - otherwise controller.LatestLsnAck, updated as described below.
//
// On success, if the tunnel requires acks, LatestLsnAck becomes the tunnel's
// feedback. If it does not (the direct tunnel also takes this path),
// LatestLsnAck becomes the timestamp of the last log of a non-probe,
// non-empty message; for any other message it is left unchanged, which tells
// the caller that nothing new should be acked.
func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64 {
	// A probe is only useful to fetch an ack. When the tunnel does not
	// require acks the probe is dropped here; such a message is not expected
	// to carry the MsgRetransmission flag.
	if !controller.tunnel.AckRequired() && tag&tunnel.MsgProbe != 0 {
		return controller.LatestLsnAck
	}

	message := &tunnel.WMessage{
		TMessage: &tunnel.TMessage{
			Tag:     tag,
			Shard:   controller.worker.id,
			RawLogs: oplog.LogEntryEncode(logs),
		},
		ParsedLogs: oplog.LogParsed(logs),
	}

	// Every module sees (and may modify) the same message; the first
	// negative code aborts the send.
	for _, m := range controller.moduleList {
		if internalCode := m.Handle(message); internalCode < 0 {
			return internalCode
		}
	}

	// Negative feedback means the tunnel failed to send; zero or greater
	// means the message was sent.
	feedback := controller.tunnel.Send(message)
	if feedback < 0 {
		return feedback
	}

	switch {
	case controller.tunnel.AckRequired():
		// the tunnel's feedback is the ack value to report upwards
		controller.LatestLsnAck = feedback
	case message.Tag&tunnel.MsgProbe == 0 && len(message.RawLogs) != 0 && len(logs) != 0:
		// No ack from the tunnel (the direct tunnel also lands here): report
		// the timestamp of the last log sent. len(logs) is checked in
		// addition to RawLogs because logs is the slice being indexed, while
		// RawLogs belongs to the message and may have been rewritten by a
		// module.
		controller.LatestLsnAck = utils.TimeStampToInt64(logs[len(logs)-1].Parsed.Timestamp)
	}

	// accumulate the overall size of logs pushed through the tunnel
	controller.worker.syncer.replMetric.AddTunnelTraffic(message.ApproximateSize())
	return controller.LatestLsnAck
}