-
Notifications
You must be signed in to change notification settings - Fork 4
/
store_epoch.go
117 lines (100 loc) · 2.43 KB
/
store_epoch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package gossip
/*
In LRU cache data stored like pointer
*/
import (
"errors"
"fmt"
"sync/atomic"
"github.com/TechPay-io/sirius-base/inter/idx"
"github.com/TechPay-io/sirius-base/kvdb"
"github.com/TechPay-io/sirius-base/kvdb/skiperrors"
"github.com/TechPay-io/sirius-base/kvdb/table"
"github.com/TechPay-io/go-photon/logger"
)
var (
errDBClosed = errors.New("database closed")
)
type (
epochStore struct {
epoch idx.Epoch
db kvdb.DropableStore
table struct {
LastEvents kvdb.Store `table:"t"`
Heads kvdb.Store `table:"H"`
DagIndex kvdb.Store `table:"v"`
}
cache struct {
Heads atomic.Value
LastEvents atomic.Value
}
logger.Instance
}
)
func newEpochStore(epoch idx.Epoch, db kvdb.DropableStore) *epochStore {
es := &epochStore{
epoch: epoch,
db: db,
Instance: logger.MakeInstance(),
}
table.MigrateTables(&es.table, db)
// wrap with skiperrors to skip errors on reading from a dropped DB
es.table.LastEvents = skiperrors.Wrap(es.table.LastEvents, errDBClosed)
es.table.Heads = skiperrors.Wrap(es.table.Heads, errDBClosed)
// load the cache to avoid a race condition
es.GetHeads()
es.GetLastEvents()
return es
}
func (s *Store) getAnyEpochStore() *epochStore {
_es := s.epochStore.Load()
if _es == nil {
return nil
}
return _es.(*epochStore)
}
// getEpochStore is safe for concurrent use.
func (s *Store) getEpochStore(epoch idx.Epoch) *epochStore {
es := s.getAnyEpochStore()
if es.epoch != epoch {
return nil
}
return es
}
func (s *Store) resetEpochStore(newEpoch idx.Epoch) {
oldEs := s.epochStore.Load()
// create new DB
s.createEpochStore(newEpoch)
// drop previous DB
// there may be race condition with threads which hold this DB, so wrap tables with skiperrors
if oldEs != nil {
err := oldEs.(*epochStore).db.Close()
if err != nil {
s.Log.Error("Failed to close epoch DB", "err", err)
return
}
oldEs.(*epochStore).db.Drop()
}
}
func (s *Store) loadEpochStore(epoch idx.Epoch) {
if s.epochStore.Load() != nil {
return
}
s.createEpochStore(epoch)
}
func (s *Store) closeEpochStore() error {
es := s.getAnyEpochStore()
if es == nil {
return nil
}
return es.db.Close()
}
func (s *Store) createEpochStore(epoch idx.Epoch) {
// create new DB
name := fmt.Sprintf("gossip-%d", epoch)
db, err := s.dbs.OpenDB(name)
if err != nil {
s.Log.Crit("Filed to open DB", "name", name, "err", err)
}
s.epochStore.Store(newEpochStore(epoch, db))
}