-
Notifications
You must be signed in to change notification settings - Fork 3
/
log.go
108 lines (88 loc) · 2.53 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package sharded
import (
"context"
"errors"
"fmt"
"github.com/benbjohnson/clock"
"github.com/embano1/memlog"
)
// config holds the settings collected from options before the shards are
// created in New.
type config struct {
	// shards is the number of memlog.Log instances New creates. It is also
	// passed to the Sharder on every Write and Read.
	shards uint
	// memlog.Log settings, applied identically to every shard
	// startOffset is forwarded to memlog.WithStartOffset.
	startOffset memlog.Offset
	// segmentSize is forwarded to memlog.WithMaxSegmentSize.
	segmentSize int // offsets per segment
	// maxRecordSize is forwarded to memlog.WithMaxRecordDataSize.
	maxRecordSize int // bytes
}
// Log is a sharded log implementation on top of memlog.Log. It uses a
// configurable sharding strategy (see Sharder interface) during reads and
// writes.
type Log struct {
	// sharder maps a key to a shard index; it is consulted on every Write
	// and Read.
	sharder Sharder
	// clock is handed to each shard via memlog.WithClock when the shards are
	// created.
	clock clock.Clock
	// conf carries the shard count and the per-shard memlog settings.
	conf config
	// shards holds one memlog.Log per shard and is indexed by the value the
	// sharder returns.
	shards []*memlog.Log
}
// New creates a new sharded log which can be customized with options. If not
// specified, the default sharding strategy uses fnv.New32a for key hashing.
//
// The default options are applied first, followed by the supplied options in
// the order given. Afterwards one memlog.Log is created per configured shard;
// every shard receives the same clock, start offset, segment size and maximum
// record size, and ctx is passed on to memlog.New.
//
// If an option or the creation of a shard fails, New returns a nil Log and an
// error that wraps the underlying cause, so callers can inspect it with
// errors.Is and errors.As.
func New(ctx context.Context, options ...Option) (*Log, error) {
	var l Log

	// apply defaults
	for _, opt := range defaultOptions {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log default option: %w", err)
		}
	}

	// apply custom settings
	for _, opt := range options {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log custom option: %w", err)
		}
	}

	l.shards = make([]*memlog.Log, l.conf.shards)

	// All shards share the same memlog configuration.
	opts := []memlog.Option{
		memlog.WithClock(l.clock),
		memlog.WithMaxRecordDataSize(l.conf.maxRecordSize),
		memlog.WithStartOffset(l.conf.startOffset),
		memlog.WithMaxSegmentSize(l.conf.segmentSize),
	}

	for i := range l.shards {
		ml, err := memlog.New(ctx, opts...)
		if err != nil {
			return nil, fmt.Errorf("create shard: %w", err)
		}
		l.shards[i] = ml
	}

	return &l, nil
}
// Write writes data to the log using the specified key for sharding.
//
// The key is passed to the configured Sharder together with the number of
// shards; data is then written to the shard the sharder selects and the
// offset reported by that shard is returned.
//
// A nil key is rejected with an error. Errors from the sharder and from the
// underlying shard are wrapped. If the sharder returns an index that does not
// refer to an existing shard, an error is returned instead of panicking.
func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) {
	if key == nil {
		return -1, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return -1, fmt.Errorf("get shard: %w", err)
	}

	// The sharder is pluggable, so its result is validated before it is used
	// as an index: an out-of-range value would otherwise panic. The idx < 0
	// test also catches values that overflow the conversion to int.
	idx := int(shard)
	if idx < 0 || idx >= len(l.shards) {
		return -1, fmt.Errorf("get shard: index %v out of range for %d shards", shard, len(l.shards))
	}

	offset, err := l.shards[idx].Write(ctx, data)
	if err != nil {
		// Pass the shard's offset through unchanged alongside the error.
		return offset, fmt.Errorf("write to shard: %w", err)
	}

	return offset, nil
}
// Read reads a record from the log at offset using the specified key for shard
// lookup.
//
// The key is passed to the configured Sharder together with the number of
// shards, and offset is then read from the shard the sharder selects, i.e.
// offset is interpreted relative to that shard.
//
// A nil key is rejected with an error. Errors from the sharder and from the
// underlying shard are wrapped, and an empty memlog.Record is returned
// alongside any error. If the sharder returns an index that does not refer to
// an existing shard, an error is returned instead of panicking.
func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) {
	if key == nil {
		return memlog.Record{}, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("get shard: %w", err)
	}

	// The sharder is pluggable, so its result is validated before it is used
	// as an index: an out-of-range value would otherwise panic. The idx < 0
	// test also catches values that overflow the conversion to int.
	idx := int(shard)
	if idx < 0 || idx >= len(l.shards) {
		return memlog.Record{}, fmt.Errorf("get shard: index %v out of range for %d shards", shard, len(l.shards))
	}

	r, err := l.shards[idx].Read(ctx, offset)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("read from shard: %w", err)
	}

	return r, nil
}