forked from asonawalla/gazette
-
Notifications
You must be signed in to change notification settings - Fork 3
/
shard.go
102 lines (81 loc) · 2.56 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package consumertest
import (
"io/ioutil"
"os"
rocks "github.com/tecbot/gorocksdb"
"github.com/LiveRamp/gazette/pkg/consumer"
"github.com/LiveRamp/gazette/pkg/topic"
)
// Test type which conforms to consumer.Shard, and manages setup & teardown
// of a test RocksDB instance.
type Shard struct {
IDFixture consumer.ShardID
PartitionFixture topic.Partition
tmpdir string
env *rocks.Env
opts *rocks.Options
ro *rocks.ReadOptions
wo *rocks.WriteOptions
db *rocks.DB
tx *rocks.WriteBatch
cache interface{}
}
// consumer.Shard implementation.
func (s *Shard) ID() consumer.ShardID { return s.IDFixture }
func (s *Shard) Partition() topic.Partition { return s.PartitionFixture }
func (s *Shard) Cache() interface{} { return s.cache }
func (s *Shard) SetCache(c interface{}) { s.cache = c }
func (s *Shard) Database() *rocks.DB { return s.db }
func (s *Shard) Transaction() *rocks.WriteBatch { return s.tx }
func (s *Shard) ReadOptions() *rocks.ReadOptions { return s.ro }
func (s *Shard) WriteOptions() *rocks.WriteOptions { return s.wo }
// Initializes a Shard & database backed by a temporary directory.
// TODO(johnny): Since this is test support, panic on error (rather than returning it).
func NewShard(prefix string) (*Shard, error) {
var s = new(Shard)
var err error
s.tmpdir, err = ioutil.TempDir("", prefix)
if err != nil {
return nil, err
}
s.env = rocks.NewDefaultEnv()
s.opts = rocks.NewDefaultOptions()
s.opts.SetCreateIfMissing(true)
s.ro = rocks.NewDefaultReadOptions()
s.wo = rocks.NewDefaultWriteOptions()
s.db, err = rocks.OpenDb(s.opts, s.tmpdir)
if err != nil {
return nil, err
}
s.tx = rocks.NewWriteBatch()
return s, nil
}
// Flushes the current Shard transaction WriteBatch to the database.
func (s *Shard) FlushTransaction() error {
var err = s.db.Write(s.wo, s.tx)
s.tx.Clear()
return err
}
// DatabaseContents enumerates and returns keys and values from the database
// as a map. As this is a test support method, it panics on iterator error.
func (s *Shard) DatabaseContent() map[string]string {
var results = make(map[string]string)
var it = s.db.NewIterator(s.ro)
defer it.Close()
for it.SeekToFirst(); it.Valid(); it.Next() {
results[string(it.Key().Data())] = string(it.Value().Data())
}
if err := it.Err(); err != nil {
panic(err.Error())
}
return results
}
// Closes and removes the Shard database.
func (s *Shard) Close() error {
s.opts.Destroy()
s.ro.Destroy()
s.wo.Destroy()
s.db.Close()
s.env.Destroy()
return os.RemoveAll(s.tmpdir)
}