forked from travisjeffery/jocko
/
replicator.go
132 lines (120 loc) · 3.28 KB
/
replicator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package jocko
import (
"time"
"github.com/cenkalti/backoff"
"github.com/travisjeffery/jocko/log"
"github.com/travisjeffery/jocko/protocol"
)
// client is the set of requests the replicator's owner can send to other
// brokers. Within this file only Fetch is called (by Replicator.fetchMessages).
type client interface {
// Fetch sends fetchRequest and returns the broker's response, or an error.
Fetch(fetchRequest *protocol.FetchRequest) (*protocol.FetchResponse, error)
// CreateTopics sends createRequest and returns the broker's response, or an error.
CreateTopics(createRequest *protocol.CreateTopicRequests) (*protocol.CreateTopicsResponse, error)
// LeaderAndISR sends request and returns the broker's response, or an error.
LeaderAndISR(request *protocol.LeaderAndISRRequest) (*protocol.LeaderAndISRResponse, error)
// others
}
// Replicator fetches from the partition's leader and appends what it receives
// to its own (follower) replica's log, thereby replicating the partition.
type Replicator struct {
// config holds the MinBytes and MaxWaitTime copied into every fetch request.
config ReplicatorConfig
// replica identifies the local broker and partition being replicated and
// owns the log that fetched record sets are appended to.
replica *Replica
// highwaterMarkOffset is the HighWatermark reported by the leader alongside
// the most recently accepted record set.
highwaterMarkOffset int64
// offset is sent as FetchOffset in the next fetch request; it is updated from
// the first 8 bytes of each accepted record set.
offset int64
// msgs carries record sets from fetchMessages to appendMessages.
msgs chan []byte
// done is closed by Close to tell both goroutines to stop.
done chan struct{}
// leader is the client used to send fetch requests.
leader client
// backoff yields the delay before retrying after a failed or empty fetch.
backoff *backoff.ExponentialBackOff
}
// ReplicatorConfig holds the tunables a Replicator copies into each fetch
// request it sends to the leader.
type ReplicatorConfig struct {
// MinBytes is sent as the fetch request's MinBytes. NewReplicator replaces
// a zero value with 1.
MinBytes int32
// MaxWaitTime is sent as the fetch request's MaxWaitTime.
MaxWaitTime time.Duration
}
// NewReplicator returns a new replicator instance that will fetch from leader
// on behalf of replica. A zero config.MinBytes is replaced with 1. The returned
// Replicator is idle until Replicate is called.
func NewReplicator(config ReplicatorConfig, replica *Replica, leader client) *Replicator {
	if config.MinBytes == 0 {
		config.MinBytes = 1
	}

	bo := backoff.NewExponentialBackOff()
	// The replicator retries for as long as it runs, so the policy must never
	// report backoff.Stop: that value is a negative duration and would turn the
	// retry delay into a busy loop. Zero disables the elapsed-time limit that
	// NewExponentialBackOff enables by default.
	bo.MaxElapsedTime = 0

	return &Replicator{
		config:  config,
		replica: replica,
		leader:  leader,
		done:    make(chan struct{}, 2),
		msgs:    make(chan []byte, 2),
		backoff: bo,
	}
}
// Replicate starts the two background goroutines that make up the replicator:
// one fetching record sets from the leader and one appending them to the local
// commit log. It returns immediately.
func (r *Replicator) Replicate() {
	workers := []func(){
		r.fetchMessages,
		r.appendMessages,
	}
	for _, run := range workers {
		go run()
	}
}
// fetchMessages repeatedly fetches from the leader and hands every new record
// set to appendMessages through r.msgs. It returns once r.done is closed.
//
// A round that fails (request error, partition error code, missing or
// truncated record set) is retried after the next backoff delay; a round that
// completes resets the backoff. Both the delay and the hand-off on r.msgs are
// abandoned as soon as r.done is closed, so Close never leaves this goroutine
// sleeping or blocked on a channel nobody reads.
func (r *Replicator) fetchMessages() {
	for {
		select {
		case <-r.done:
			return
		default:
		}

		ok, closed := r.fetchOnce()
		if closed {
			return
		}
		if ok {
			r.backoff.Reset()
			continue
		}

		wait := r.backoff.NextBackOff()
		if wait == backoff.Stop {
			// The policy has given up. Stop is a negative duration, so waiting
			// on it would not wait at all; keep retrying at the slowest rate.
			wait = r.backoff.MaxInterval
		}
		select {
		case <-r.done:
			return
		case <-time.After(wait):
		}
	}
}

// fetchOnce performs one fetch round trip and forwards the accepted record
// sets. ok is false when the caller should back off before the next round;
// closed is true when r.done was closed while a record set was being handed
// off, in which case the caller must stop.
func (r *Replicator) fetchOnce() (ok, closed bool) {
	req := &protocol.FetchRequest{
		ReplicaID:   r.replica.BrokerID,
		MaxWaitTime: r.config.MaxWaitTime,
		MinBytes:    r.config.MinBytes,
		Topics: []*protocol.FetchTopic{{
			Topic: r.replica.Partition.Topic,
			Partitions: []*protocol.FetchPartition{{
				Partition:   r.replica.Partition.ID,
				FetchOffset: r.offset,
			}},
		}},
	}

	resp, err := r.leader.Fetch(req)
	// TODO: persistent failures should let this replica fall out of the ISR.
	if err != nil {
		log.Error.Printf("replicator: fetch messages error: %s", err)
		return false, false
	}
	if resp == nil {
		return false, false
	}

	for _, topic := range resp.Responses {
		for _, p := range topic.PartitionResponses {
			if p.ErrorCode != protocol.ErrNone.Code() {
				log.Error.Printf("replicator: partition response error: %d", p.ErrorCode)
				return false, false
			}
			// The first 8 bytes of a record set carry its offset; anything
			// shorter (including nil) has nothing usable and must not be sliced.
			if len(p.RecordSet) < 8 {
				return false, false
			}
			offset := int64(protocol.Encoding.Uint64(p.RecordSet[:8]))
			// NOTE(review): with r.offset starting at 0, a record set whose
			// offset is 0 is never forwarded — confirm this is intended.
			if offset <= r.offset {
				continue
			}
			select {
			case r.msgs <- p.RecordSet:
			case <-r.done:
				return false, true
			}
			r.highwaterMarkOffset = p.HighWatermark
			r.offset = offset
		}
	}
	return true, false
}
// appendMessages drains r.msgs, appending each record set to the replica's
// log, until r.done is closed. An append error is treated as fatal and panics.
func (r *Replicator) appendMessages() {
	for {
		select {
		case records := <-r.msgs:
			if _, err := r.replica.Log.Append(records); err != nil {
				panic(err)
			}
		case <-r.done:
			return
		}
	}
}
// Close stops the replicator when we are no longer following the leader. It
// signals the fetch and append goroutines to exit by closing r.done and
// returns without waiting for them. It always returns nil.
//
// Calling Close again after a previous call has returned is a no-op. Close is
// not synchronized: it must not be called concurrently from several goroutines.
func (r *Replicator) Close() error {
	select {
	case <-r.done:
		// Nothing in this file sends on done, so a successful receive means
		// it is already closed; closing it again would panic.
	default:
		close(r.done)
	}
	return nil
}