-
Notifications
You must be signed in to change notification settings - Fork 211
/
registry.go
149 lines (132 loc) · 5.15 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2017 Canonical Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"bytes"
"fmt"
"sync"
"testing"
"github.com/CanonicalLtd/dqlite/internal/trace"
"github.com/CanonicalLtd/dqlite/internal/transaction"
"github.com/CanonicalLtd/go-sqlite3"
)
// Registry is a dqlite node-level data structure that tracks:
//
// - The directory where dqlite data for this node lives.
//
// - All SQLite connections opened on the node, either in leader replication
// mode or follower replication mode.
//
// - All inflight WAL write transactions, either for leader or follower
// connections.
//
// - All tracers used to emit trace messages.
//
// - Last log index applied by the FSM.
//
// A single Registry instance is shared by a single replication.FSM instance, a
// single replication.Methods instance and a single dqlite.Driver instance.
//
// Methods that access or mutate the registry are not thread-safe and must be
// performed after acquiring the lock. See Lock() and Unlock().
type Registry struct {
mu sync.Mutex // Serialize access to internal state; see Lock() and Unlock().
dir string // Node data directory, as passed to New().
leaders map[*sqlite3.SQLiteConn]string // Map leader connections to database filenames.
followers map[string]*sqlite3.SQLiteConn // Map database filenames to follower connections.
txns map[uint64]*transaction.Txn // Transactions by ID.
tracers *trace.Set // Tracers used by this dqlite instance.
index uint64 // Last log index applied by the dqlite FSM.
frames uint64 // Number of frames written to the WAL so far.
hookSync *hookSync // Used for synchronizing Methods and FSM.
// Map a connection to its serial number. Serial numbers are guaranteed
// to be unique inside the same process.
serial map[*sqlite3.SQLiteConn]uint64
// Circular buffer holding the IDs of the last N transactions that
// were successfully committed. It is used to recover a transaction
// that errored because of lost leadership but that might actually get
// completed because a quorum was reached for the lost commit frames
// command log. New() allocates it with committedBufferSize slots.
committed []uint64
// Cursor into the committed circular buffer.
// NOTE(review): presumably the position of the next write; confirm
// against the code that updates it.
committedCursor int
// Map a leader connection to the ID of the last transaction executed
// on it. Used by the driver's Tx implementation to know its ID in case
// a client asks for it for recovering a lost commit.
lastTxnIDs map[*sqlite3.SQLiteConn]uint64
// Flag controlling whether transaction state transitions should
// actually call back the relevant SQLite APIs. Some tests need to set
// this flag to true because there's no public API to acquire the WAL
// read lock in leader connections.
// NOTE(review): given the name, true presumably means the SQLite APIs
// are NOT invoked (dry run); confirm against the code that reads it.
txnDryRun bool
}
// New returns an empty Registry for the node whose SQLite database files are
// kept under dir.
//
// The returned registry has no connections or transactions registered, a
// committed-transactions buffer of committedBufferSize entries, and a tracer
// set that already contains the "fsm" tracer.
func New(dir string) *Registry {
	// The FSM associated with this registry emits its trace messages
	// through the "fsm" tracer, so register it up front.
	set := trace.NewSet(250)
	set.Add("fsm")

	registry := &Registry{dir: dir, tracers: set}

	// Connection and transaction tables all start out empty.
	registry.leaders = make(map[*sqlite3.SQLiteConn]string)
	registry.followers = make(map[string]*sqlite3.SQLiteConn)
	registry.txns = make(map[uint64]*transaction.Txn)
	registry.serial = make(map[*sqlite3.SQLiteConn]uint64)
	registry.lastTxnIDs = make(map[*sqlite3.SQLiteConn]uint64)

	// Fixed-size buffer for the IDs of recently committed transactions.
	registry.committed = make([]uint64, committedBufferSize)

	return registry
}
// Lock acquires the registry mutex.
//
// Methods that access or mutate the registry are not thread-safe, so callers
// must hold this lock while invoking them and release it with Unlock().
func (r *Registry) Lock() {
r.mu.Lock()
}
// Unlock releases the registry mutex previously acquired with Lock().
func (r *Registry) Unlock() {
r.mu.Unlock()
}
// Dir returns the directory where replicated SQLite files are stored, i.e.
// the node data directory held by this registry.
func (r *Registry) Dir() string {
return r.dir
}
// Testing sets up this registry for unit-testing.
//
// It delegates to the registry's tracer set, passing along the given testing
// handle and node number. The tracers will forward all entries to the testing
// logger, using the given node prefix.
func (r *Registry) Testing(t *testing.T, node int) {
r.tracers.Testing(t, node)
}
// Dump renders the content of the registry as human-readable text, useful for
// debugging.
//
// The output has three sections: "leaders" and "followers" list each
// connection's serial number next to its database filename, and
// "transactions" lists every tracked transaction. Entries within a section
// follow Go's map iteration order, so their order is not stable across calls.
func (r *Registry) Dump() string {
	var out bytes.Buffer

	out.WriteString("leaders:\n")
	for conn, filename := range r.leaders {
		fmt.Fprintf(&out, "-> %d: %s\n", r.ConnSerial(conn), filename)
	}

	out.WriteString("followers:\n")
	for filename, conn := range r.followers {
		fmt.Fprintf(&out, "-> %d: %s\n", r.ConnSerial(conn), filename)
	}

	out.WriteString("transactions:\n")
	for _, txn := range r.txns {
		fmt.Fprintf(&out, "-> %s\n", txn)
	}

	return out.String()
}
// committedBufferSize is the maximum number of committed transactions that
// are kept track of; it is the length New() gives to the Registry.committed
// buffer. This number should be large enough for any real-world situation,
// where it's unlikely that a client tries to recover a transaction that is
// so old.
const committedBufferSize = 10000