-
Notifications
You must be signed in to change notification settings - Fork 0
/
partition.go
106 lines (86 loc) · 1.91 KB
/
partition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package turing
type PartitionHandler func (partition *Partition, original EncodedKV, message DecodedKV)
type PartitionCommitHandler func (partition *Partition, message MessageEvent)
const (
OffsetEarliest = -1
OffsetLatest = -2
OffsetStored = -3
OffsetNone = -4
)
type Partition struct {
offset int64
offsetChan chan int64
closeChan chan struct{}
handler PartitionHandler
commitHandler PartitionCommitHandler
codec Codec
Topic string
Id int64
Messages chan MessageEvent
}
func (p *Partition) handleMessageEvent(msg MessageEvent) {
p.offset = msg.Offset
decoded, err := p.codec.Decode(msg.Key, msg.Value)
if err == nil {
p.handler(p, EncodedKV{
Key: msg.Key,
Value: msg.Value,
}, decoded)
if p.commitHandler != nil {
p.commitHandler(p, msg)
}
}
}
func (p *Partition) Close() {
close(p.closeChan)
}
func (p *Partition) SetCodec(codec Codec) {
p.codec = codec
}
func (p *Partition) SetHandler(handler PartitionHandler) {
p.handler = handler
}
func (p *Partition) SetCommitBehavior(handler PartitionCommitHandler) {
p.commitHandler = handler
}
func (p *Partition) SetOffset(offset int64) {
p.offset = offset
}
func (p *Partition) GetOffset() int64 {
return p.offset
}
func (p *Partition) Run() error {
if p.codec == nil {
return NoCodecError
}
if p.handler == nil {
return NoHandlerError
}
if p.offset == OffsetNone {
p.offset = OffsetStored
}
p.offsetChan <- p.offset
for {
select {
case <- p.closeChan:
return nil
case msg := <- p.Messages:
p.handleMessageEvent(msg)
}
}
}
func NewPartition(topic string, partitionId int64) *Partition {
return &Partition{
closeChan: make(chan struct{}),
offset: OffsetNone,
offsetChan: make(chan int64, 1),
commitHandler: nil,
Topic: topic,
Id: partitionId,
Messages: make(chan MessageEvent),
}
}
func DefaultCommitBehavior() PartitionCommitHandler {
return func (partition *Partition, message MessageEvent) {
}
}