diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 9ced64afd..2530f99c4 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -38,8 +38,10 @@ type BinlogSyncer struct { nextPos Position - running bool + running bool semiSyncEnabled bool + + stopCh chan struct{} } func NewBinlogSyncer(serverID uint32, flavor string) *BinlogSyncer { @@ -55,6 +57,8 @@ func NewBinlogSyncer(serverID uint32, flavor string) *BinlogSyncer { b.running = false b.semiSyncEnabled = false + b.stopCh = make(chan struct{}, 1) + return b } @@ -70,6 +74,11 @@ func (b *BinlogSyncer) close() { b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) } + select { + case b.stopCh <- struct{}{}: + default: + } + b.wg.Wait() if b.c != nil { @@ -213,16 +222,17 @@ func (b *BinlogSyncer) EnableSemiSync() error { } _, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`) - + if err != nil { b.semiSyncEnabled = true } - + return err } func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true + b.stopCh = make(chan struct{}, 1) s := newBinlogStreamer() @@ -508,7 +518,12 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { b.nextPos.Pos = uint32(re.Position) } - s.ch <- e + needStop := false + select { + case s.ch <- e: + case <-b.stopCh: + needStop = true + } if needACK { err := b.replySemiSyncACK(b.nextPos) @@ -517,5 +532,9 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } } + if needStop { + return errors.New("sync is been closing...") + } + return nil }