# About
This document is an updated version of the original design documents
by Spencer Kimball from early 2014.

# Overview

CockroachDB is a distributed SQL database. The primary design goals
are **scalability**, **strong consistency** and **survivability**
(hence the name). CockroachDB aims to tolerate disk, machine, rack, and
even **datacenter failures** with minimal latency disruption and **no
manual intervention**. CockroachDB nodes are symmetric; a design goal is
**homogeneous deployment** (one binary) with minimal configuration and
no required external dependencies.

The entry point for database clients is the SQL interface. Every node
in a CockroachDB cluster can act as a client SQL gateway. A SQL
gateway transforms and executes client SQL statements into key-value
(KV) operations, which the gateway distributes across the cluster as
necessary, returning results to the client. CockroachDB implements a
**single, monolithic sorted map** from key to value where both keys
and values are byte strings.

The KV map is logically composed of smaller segments of the keyspace called
ranges. Each range is backed by data stored in a local KV storage engine (we
use [RocksDB](http://rocksdb.org/), a variant of
[LevelDB](https://github.com/google/leveldb)). Range data is replicated to a
configurable number of additional CockroachDB nodes. Ranges are merged and
split to maintain a target size, by default `64M`. The relatively small size
facilitates quick repair and rebalancing to address node failures, new capacity
and even read/write load. However, the size must be balanced against the
pressure on the system from having more ranges to manage.
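
To make this abstraction concrete, the sketch below shows the shape of that
single sorted map as a caller sees it. The interface and type names are
illustrative only and are not CockroachDB's actual API.

```
// Illustrative only: a minimal view of the sorted byte-string map that
// the rest of this document builds on.
package kvsketch

// KeyValue is a single entry in the monolithic sorted map.
type KeyValue struct {
	Key, Value []byte
}

// Map models the logical, cluster-wide sorted map. Internally it is
// split into ranges (~64MB each), replicated via Raft, but callers see
// one flat keyspace.
type Map interface {
	Get(key []byte) (KeyValue, error)
	Put(key, value []byte) error
	// Scan returns entries with start <= key < end, in key order,
	// returning at most max entries.
	Scan(start, end []byte, max int64) ([]KeyValue, error)
}
```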

CockroachDB achieves horizontal scalability:
- adding more nodes increases the capacity of the cluster by the
  amount of storage on each node (divided by a configurable
  replication factor), theoretically up to 4 exabytes (4E) of logical
  data;
- client queries can be sent to any node in the cluster, and queries
  can operate independently (without conflicts), meaning that overall
  throughput is a linear factor of the number of nodes in the cluster.
- queries are distributed (ref: distributed SQL) so that the overall
  throughput of single queries can be increased by adding more nodes.

CockroachDB achieves strong consistency:
- uses a distributed consensus protocol for synchronous replication of
  data in each key value range. We’ve chosen to use the [Raft
  consensus algorithm](https://raftconsensus.github.io); all consensus
  state is stored in RocksDB.
- single or batched mutations to a single range are mediated via the
  range's Raft instance. Raft guarantees ACID semantics.
- logical mutations which affect multiple ranges employ distributed
  transactions for ACID semantics. CockroachDB uses an efficient
  **non-locking distributed commit** protocol.

CockroachDB achieves survivability:
- range replicas can be co-located within a single datacenter for low
  latency replication and survive disk or machine failures. They can
  be distributed across racks to survive some network switch failures.
- range replicas can be located in datacenters spanning increasingly
  disparate geographies to survive ever-greater failure scenarios from
  datacenter power or networking loss to regional power failures
  (e.g. `{ US-East-1a, US-East-1b, US-East-1c }`, `{ US-East, US-West,
  Japan }`, `{ Ireland, US-East, US-West }`, `{ Ireland, US-East,
  US-West, Japan, Australia }`).

CockroachDB provides [snapshot
isolation](http://en.wikipedia.org/wiki/Snapshot_isolation) (SI) and
serializable snapshot isolation (SSI) semantics, allowing **externally
consistent, lock-free reads and writes**--both from a historical snapshot
timestamp and from the current wall clock time. SI provides lock-free reads
and writes but still allows write skew. SSI eliminates write skew, but
introduces a performance hit in the case of a contentious system. SSI is the
default isolation; clients must consciously decide to trade correctness for
performance. CockroachDB implements [a limited form of linearizability
](#strict-serializability-linearizability), providing ordering for any
observer or chain of observers.

Similar to
[Spanner](http://static.googleusercontent.com/media/research.google.com/en/us/archive/spanner-osdi2012.pdf)
directories, CockroachDB allows configuration of arbitrary zones of data.
This allows replication factor, storage device type, and/or datacenter
location to be chosen to optimize performance and/or availability.
Unlike Spanner, zones are monolithic and don’t allow movement of
fine-grained data on the level of entity groups.

# Architecture

CockroachDB implements a layered architecture. The highest level of
abstraction is the [*SQL layer*](#sql) (currently unspecified in this
document), which provides familiar relational concepts
such as schemas, tables, columns, and indexes. The SQL layer
in turn depends on the [distributed key value store](#key-value-api),
which handles the details of range addressing to provide the abstraction
of a single, monolithic key value store. The distributed KV store
communicates with any number of physical cockroach nodes. Each node
contains one or more stores, one per physical device.

![Architecture](media/architecture.png)

Each store contains potentially many ranges, the lowest-level unit of
key-value data. Ranges are replicated using the Raft consensus protocol.
The diagram below is a blown-up version of stores from four of the five
nodes in the previous diagram. Each range is replicated three ways using
Raft. The color coding shows associated range replicas.

![Ranges](media/ranges.png)

Each physical node exports two RPC-based key value APIs: one for
external clients and one for internal clients (exposing sensitive
operational features). Both services accept batches of requests and
return batches of responses. Nodes are symmetric in capabilities and
exported interfaces; each has the same binary and may assume any
role.

Nodes and the ranges they provide access to can be arranged with various
physical network topologies to make trade-offs between reliability and
performance. For example, a triplicated (3-way replica) range could have
each replica located on different:

- disks within a server to tolerate disk failures.
- servers within a rack to tolerate server failures.
- servers on different racks within a datacenter to tolerate rack power/network failures.
- servers in different datacenters to tolerate large scale network or power outages.

Up to `F` failures can be tolerated, where the total number of replicas `N = 2F + 1` (e.g. with 3x replication, one failure can be tolerated; with 5x replication, two failures, and so on).

# Keys

Cockroach keys are arbitrary byte arrays. Keys come in two flavors:
system keys and table data keys. System keys are used by Cockroach for
internal data structures and metadata. Table data keys contain SQL
table data (as well as index data). System and table data keys are
prefixed in such a way that all system keys sort before any table data
keys.

System keys come in several subtypes:

- **Global** keys store cluster-wide data such as the "meta1" and
  "meta2" keys as well as various other system-wide keys such as the
  node and store ID allocators.
- **Store local** keys are used for unreplicated store metadata
  (e.g. the `StoreIdent` structure). "Unreplicated" indicates that
  these values are not replicated across multiple stores because the
  data they hold is tied to the lifetime of the store they are
  present on.
- **Range local** keys store range metadata that is associated with a
  global key. Range local keys have a special prefix followed by a
  global key and a special suffix. For example, transaction records
  are range local keys which look like:
  `\x01k<global-key>txn-<txnID>` (see the sketch after this list).
- **Replicated Range ID local** keys store range metadata that is
  present on all of the replicas for a range. These keys are updated
  via Raft operations. Examples include the range lease state and
  abort cache entries.
- **Unreplicated Range ID local** keys store range metadata that is
  local to a replica. The primary examples of such keys are the Raft
  state and Raft log.
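
As a concrete illustration of the range-local layout, the hypothetical helper
below assembles a transaction record key of the form shown in the list above;
the constants and function names are illustrative, not CockroachDB's actual
key-construction code.

```
// Hypothetical helper: builds a range-local transaction record key of the
// form \x01k<global-key>txn-<txnID>, as described above.
package keysketch

var (
	rangeLocalPrefix = []byte("\x01k") // range-local prefix from the example above
	txnSuffix        = []byte("txn-")  // suffix identifying transaction records
)

// transactionKey returns the range-local key addressing the transaction
// record anchored at the given global key.
func transactionKey(globalKey, txnID []byte) []byte {
	k := make([]byte, 0, len(rangeLocalPrefix)+len(globalKey)+len(txnSuffix)+len(txnID))
	k = append(k, rangeLocalPrefix...)
	k = append(k, globalKey...)
	k = append(k, txnSuffix...)
	k = append(k, txnID...)
	return k
}
```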

Table data keys are used to store all SQL data. Table data keys
contain internal structure as described in the section on [mapping
data between the SQL model and
KV](#data-mapping-between-the-sql-model-and-kv).

# Versioned Values

Cockroach maintains historical versions of values by storing them with
associated commit timestamps. Reads and scans can specify a snapshot
time to return the most recent writes prior to the snapshot timestamp.
Older versions of values are garbage collected by the system during
compaction according to a user-specified expiration interval. In order
to support long-running scans (e.g. for MapReduce), all versions have a
minimum expiration.

Versioned values are supported via modifications to RocksDB to record
commit timestamps and GC expirations per key.
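
The effect of versioning on reads can be sketched as follows. The types are
illustrative, and intents as well as the actual RocksDB encoding are ignored.

```
// Illustrative MVCC read: versions of a key are kept newest-first, and a
// read at snapshot time ts returns the newest version written at or
// before ts. Intents and the on-disk encoding are ignored here.
package mvccsketch

type Timestamp struct {
	WallTime int64
	Logical  int32
}

func (t Timestamp) LessEq(o Timestamp) bool {
	return t.WallTime < o.WallTime ||
		(t.WallTime == o.WallTime && t.Logical <= o.Logical)
}

type Version struct {
	Timestamp Timestamp
	Value     []byte
}

// readVersioned returns the most recent version at or below ts, or false
// if every stored version is newer than the snapshot timestamp.
func readVersioned(versions []Version, ts Timestamp) ([]byte, bool) {
	for _, v := range versions { // versions sorted by descending timestamp
		if v.Timestamp.LessEq(ts) {
			return v.Value, true
		}
	}
	return nil, false
}
```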

# Lock-Free Distributed Transactions

Cockroach provides distributed transactions without locks. Cockroach
transactions support two isolation levels:

- snapshot isolation (SI) and
- *serializable* snapshot isolation (SSI).

*SI* is simple to implement, highly performant, and correct for all but a
handful of anomalous conditions (e.g. write skew). *SSI* requires just a touch
more complexity, is still highly performant (less so with contention), and has
no anomalous conditions. Cockroach’s SSI implementation is based on ideas from
the literature and some possibly novel insights.

SSI is the default level, with SI provided for application developers
who are certain enough of their need for performance and the absence of
write skew conditions to consciously elect to use it. In a lightly
contended system, our implementation of SSI is just as performant as SI,
requiring no locking or additional writes. With contention, our
implementation of SSI still requires no locking, but will end up
aborting more transactions. Cockroach’s SI and SSI implementations
prevent starvation scenarios even for arbitrarily long transactions.

See the [Cahill paper](https://drive.google.com/file/d/0B9GCVTp_FHJIcEVyZVdDWEpYYXVVbFVDWElrYUV0NHFhU2Fv/edit?usp=sharing)
for one possible implementation of SSI; another [great paper](http://cs.yale.edu/homes/thomson/publications/calvin-sigmod12.pdf) is also worth reading.
For a discussion of SSI implemented by preventing read-write conflicts
(in contrast to detecting them, called write-snapshot isolation), see
the [Yabandeh paper](https://drive.google.com/file/d/0B9GCVTp_FHJIMjJ2U2t6aGpHLTFUVHFnMTRUbnBwc2pLa1RN/edit?usp=sharing),
which is the source of much inspiration for Cockroach’s SSI.

Both SI and SSI require that the outcome of reads must be preserved, i.e.
a write of a key at a lower timestamp than a previous read must not succeed. To
this end, each range maintains a bounded *in-memory* cache from key range to
the latest timestamp at which it was read.

Most updates to this *timestamp cache* correspond to keys being read, though
the timestamp cache also protects the outcome of some writes (notably range
deletions) which consequently must also populate the cache. The cache’s entries
are evicted oldest timestamp first, updating the low water mark of the cache
appropriately.
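
A much-simplified sketch of the timestamp cache follows. The real cache is an
interval structure keyed by key spans; this single-key map with a low water
mark only illustrates the check a write performs, and the names are not
CockroachDB's actual API.

```
// Simplified timestamp cache: tracks the latest read timestamp per key
// plus a low water mark that rises as old entries are evicted.
package tscachesketch

type Timestamp int64 // stand-in for an HLC timestamp

type TimestampCache struct {
	lowWater Timestamp
	reads    map[string]Timestamp
}

func NewTimestampCache() *TimestampCache {
	return &TimestampCache{reads: make(map[string]Timestamp)}
}

// RecordRead notes that key was read at ts.
func (c *TimestampCache) RecordRead(key string, ts Timestamp) {
	if ts > c.reads[key] {
		c.reads[key] = ts
	}
}

// MinWriteTimestamp returns the lowest timestamp a write to key may use:
// above the latest recorded read of the key, or above the low water mark
// if the key's entry has been evicted.
func (c *TimestampCache) MinWriteTimestamp(key string) Timestamp {
	maxRead := c.lowWater
	if ts, ok := c.reads[key]; ok && ts > maxRead {
		maxRead = ts
	}
	return maxRead + 1
}
```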

Each Cockroach transaction is assigned a random priority and a
"candidate timestamp" at start. The candidate timestamp is the
provisional timestamp at which the transaction will commit, and is
chosen as the current clock time of the node coordinating the
transaction. This means that a transaction without conflicts will
usually commit with a timestamp that, in absolute time, precedes the
actual work done by that transaction.

In the course of coordinating a transaction between one or more
distributed nodes, the candidate timestamp may be increased, but will
never be decreased. The core difference between the two isolation levels
SI and SSI is that the former allows the transaction's candidate
timestamp to increase and the latter does not.

**Hybrid Logical Clock**

Each cockroach node maintains a hybrid logical clock (HLC) as discussed
in the [Hybrid Logical Clock paper](http://www.cse.buffalo.edu/tech-reports/2014-04.pdf).
HLC time uses timestamps which are composed of a physical component (thought of
as, and always close to, local wall time) and a logical component (used to
distinguish between events with the same physical component). It allows us to
track causality for related events similar to vector clocks, but with less
overhead. In practice, it works much like other logical clocks: when events
are received by a node, it informs the local HLC about the timestamp supplied
with the event by the sender, and when events are sent a timestamp generated by
the local HLC is attached.

For a more in-depth description of HLC, please read the paper. Our
implementation is [here](https://github.com/cockroachdb/cockroach/blob/master/pkg/util/hlc/hlc.go).

Cockroach picks a timestamp for a transaction using HLC time. Throughout this
document, *timestamp* always refers to the HLC time, which is a singleton
on each node. The HLC is updated by every read/write event on the node, and
the HLC time >= wall time. A read/write timestamp received in a cockroach request
from another node is not only used to version the operation, but also updates
the HLC on the node. This is useful in guaranteeing that all data read/written
on a node is at a timestamp < next HLC time.
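
The send/receive behaviour can be sketched as a toy clock like the one below.
The real implementation linked above additionally enforces maximum clock-offset
checks and other details; this is only meant to show the update rules.

```
// Toy hybrid logical clock: the physical component tracks wall time and
// the logical component breaks ties between events at the same wall time.
package hlcsketch

import (
	"sync"
	"time"
)

type Timestamp struct {
	WallTime int64 // nanoseconds, always close to local wall time
	Logical  int32 // tie-breaker for events at the same WallTime
}

type Clock struct {
	mu   sync.Mutex
	last Timestamp
}

// Now returns a timestamp to attach to an outgoing event (a "send").
func (c *Clock) Now() Timestamp {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now().UnixNano()
	if now > c.last.WallTime {
		c.last = Timestamp{WallTime: now}
	} else {
		c.last.Logical++
	}
	return c.last
}

// Update folds a timestamp received with an incoming event into the clock,
// guaranteeing the local HLC is >= every timestamp it has observed.
func (c *Clock) Update(remote Timestamp) Timestamp {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now().UnixNano()
	switch {
	case now > c.last.WallTime && now > remote.WallTime:
		c.last = Timestamp{WallTime: now}
	case remote.WallTime > c.last.WallTime:
		c.last = Timestamp{WallTime: remote.WallTime, Logical: remote.Logical + 1}
	case c.last.WallTime > remote.WallTime:
		c.last.Logical++
	default: // equal wall times
		if remote.Logical > c.last.Logical {
			c.last.Logical = remote.Logical
		}
		c.last.Logical++
	}
	return c.last
}
```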

**Transaction execution flow**

Transactions are executed in two phases:

1. Start the transaction by selecting a range which is likely to be
   heavily involved in the transaction and writing a new transaction
   record to a reserved area of that range with state "PENDING". In
   parallel, write an "intent" value for each datum being written as part
   of the transaction. These are normal MVCC values, with the addition of
   a special flag (i.e. “intent”) indicating that the value may be
   committed after the transaction itself commits. In addition,
   the transaction id (unique and chosen at txn start time by the client)
   is stored with intent values. The txn id is used to refer to the
   transaction record when there are conflicts and to make
   tie-breaking decisions on ordering between identical timestamps.
   Each node returns the timestamp used for the write (which is the
   original candidate timestamp in the absence of read/write conflicts);
   the client selects the maximum from amongst all write timestamps as the
   final commit timestamp.

2. Commit the transaction by updating its transaction record. The value
   of the commit entry contains the candidate timestamp (increased as
   necessary to accommodate any latest read timestamps). Note that the
   transaction is considered fully committed at this point and control
   may be returned to the client.

   In the case of an SI transaction, a commit timestamp which was
   increased to accommodate concurrent readers is perfectly
   acceptable and the commit may continue. For SSI transactions,
   however, a gap between candidate and commit timestamps
   necessitates transaction restart (note: restart is different than
   abort--see below).

   After the transaction is committed, all written intents are upgraded
   in parallel by removing the “intent” flag. The transaction is
   considered fully committed before this step and does not wait for
   it to return control to the transaction coordinator.

In the absence of conflicts, this is the end. Nothing else is necessary
to ensure the correctness of the system.
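
The two phases can be condensed into the sketch below. The `Coordinator`
interface and its method names are hypothetical stand-ins for the real
transaction coordinator, and restarts, aborts and SSI-specific handling are
glossed over.

```
// Condensed sketch of the two-phase transaction flow described above.
package txnsketch

// Timestamp is a simplified stand-in for an HLC timestamp.
type Timestamp struct{ WallTime, Logical int64 }

func (t Timestamp) Less(o Timestamp) bool {
	return t.WallTime < o.WallTime ||
		(t.WallTime == o.WallTime && t.Logical < o.Logical)
}

// Coordinator is a hypothetical interface; the method names are illustrative.
type Coordinator interface {
	BeginRecord() (txnID string, candidate Timestamp)               // write the "PENDING" record
	WriteIntent(txnID string, key, value []byte) (Timestamp, error) // returns the timestamp used
	Commit(txnID string, commit Timestamp) error                    // flip the record to committed
	ResolveIntentsAsync(txnID string)                               // strip the "intent" flags
}

type KeyValue struct{ Key, Value []byte }

// runTxn condenses the two-phase flow: write intents, take the maximum
// returned write timestamp, then commit the transaction record.
func runTxn(c Coordinator, writes []KeyValue) error {
	txnID, commitTS := c.BeginRecord()
	for _, w := range writes {
		ts, err := c.WriteIntent(txnID, w.Key, w.Value)
		if err != nil {
			return err
		}
		if commitTS.Less(ts) { // a read/write conflict pushed this write
			commitTS = ts
		}
	}
	// For SSI, a pushed commit timestamp would force a restart here instead.
	if err := c.Commit(txnID, commitTS); err != nil {
		return err
	}
	c.ResolveIntentsAsync(txnID) // not waited on; the txn is already committed
	return nil
}
```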

**Conflict Resolution**

Things get more interesting when a reader or writer encounters an intent
record or newly-committed value in a location that it needs to read or
write. This is a conflict, usually causing either of the transactions to
abort or restart depending on the type of conflict.

***Transaction restart:***

This is the usual (and more efficient) type of behaviour and is used
except when the transaction was aborted (for instance by another
transaction).
In effect, that reduces to two cases; the first being the one outlined
above: an SSI transaction that finds upon attempting to commit that
its commit timestamp has been pushed. The second case involves a transaction
actively encountering a conflict, that is, one of its readers or writers
encounters data that necessitates conflict resolution
(see transaction interactions below).

When a transaction restarts, it changes its priority and/or moves its
timestamp forward depending on data tied to the conflict, and
begins anew reusing the same txn id. The prior run of the transaction might
have written some write intents, which need to be deleted before the
transaction commits, so as to not be included as part of the transaction.
These stale write intent deletions are done during the reexecution of the
transaction, either implicitly, through writing new intents to
the same keys as part of the reexecution of the transaction, or explicitly,
by cleaning up stale intents that are not part of the reexecution of the
transaction. Since most transactions will end up writing to the same keys,
the explicit cleanup run just before committing the transaction is usually
a NOOP.

***Transaction abort:***

This is the case in which a transaction, upon reading its transaction
record, finds that it has been aborted. In this case, the transaction
cannot reuse its intents; it returns control to the client before
cleaning them up (other readers and writers would clean up dangling
intents as they encounter them) but will make an effort to clean up
after itself. The next attempt (if applicable) then runs as a new
transaction with **a new txn id**.

***Transaction interactions:***

There are several scenarios in which transactions interact:

- **Reader encounters write intent or value with newer timestamp far
  enough in the future**: This is not a conflict. The reader is free
  to proceed; after all, it will be reading an older version of the
  value and so does not conflict. Recall that the write intent may
  be committed with a later timestamp than its candidate; it will
  never commit with an earlier one. **Side note**: if an SI transaction
  reader finds an intent with a newer timestamp which the reader’s own
  transaction has written, the reader always returns that intent's value.

- **Reader encounters write intent or value with newer timestamp in the
  near future:** In this case, we have to be careful. The newer
  intent may, in absolute terms, have happened in our read's past if
  the clock of the writer is ahead of the node serving the values.
  In that case, we would need to take this value into account, but
  we just don't know. Hence the transaction restarts, using instead
  a future timestamp (but remembering a maximum timestamp used to
  limit the uncertainty window to the maximum clock skew). In fact,
  this is optimized further; see the details under "Choosing a
  Timestamp" below.

- **Reader encounters write intent with older timestamp**: the reader
  must follow the intent’s transaction id to the transaction record.
  If the transaction has already been committed, then the reader can
  just read the value. If the write transaction has not yet been
  committed, then the reader has two options. If the write conflict
  is from an SI transaction, the reader can *push that transaction's
  commit timestamp into the future* (and consequently not have to
  read it). This is simple to do: the reader just updates the
  transaction’s commit timestamp to indicate that when/if the
  transaction does commit, it should use a timestamp *at least* as
  high. However, if the write conflict is from an SSI transaction,
  the reader must compare priorities. If the reader has the higher priority,
  it pushes the transaction’s commit timestamp (that
  transaction will then notice its timestamp has been pushed, and
  restart). If it has the lower or same priority, it retries itself using as
  a new priority `max(new random priority, conflicting txn’s
  priority - 1)`.

- **Writer encounters uncommitted write intent**:
  If the other write intent has been written by a transaction with a lower
  priority, the writer aborts the conflicting transaction. If the write
  intent has a higher or equal priority the transaction retries, using as a new
  priority *max(new random priority, conflicting txn’s priority - 1)*;
  the retry occurs after a short, randomized backoff interval (see the
  sketch after this list).

- **Writer encounters newer committed value**:
  The committed value could also be an unresolved write intent made by a
  transaction that has already committed. The transaction restarts. On restart,
  the same priority is reused, but the candidate timestamp is moved forward
  to the encountered value's timestamp.

- **Writer encounters more recently read key**:
  The *read timestamp cache* is consulted on each write at a node. If the write’s
  candidate timestamp is earlier than the low water mark on the cache itself
  (i.e. its last evicted timestamp) or if the key being written has a read
  timestamp later than the write’s candidate timestamp, this later timestamp
  value is returned with the write. A new timestamp forces a transaction
  restart only if the transaction is serializable (SSI).
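
The retry-priority rule quoted in the list above can be written down as a small
helper. The function and types are illustrative, not the actual push machinery.

```
// Illustrative helper: a transaction that loses a priority comparison
// retries with max(new random priority, conflicting txn's priority - 1).
package conflictsketch

import "math/rand"

type Txn struct {
	Priority int32
}

// retryPriority returns the priority a losing transaction adopts before
// retrying against the given conflicting transaction.
func retryPriority(conflicting *Txn) int32 {
	newPriority := rand.Int31()
	if p := conflicting.Priority - 1; p > newPriority {
		return p
	}
	return newPriority
}
```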

**Transaction management**

Transactions are managed by the client proxy (or gateway in SQL Azure
parlance). Unlike in Spanner, writes are not buffered but are sent
directly to all implicated ranges. This allows the transaction to abort
quickly if it encounters a write conflict. The client proxy keeps track
of all written keys in order to resolve write intents asynchronously upon
transaction completion. If a transaction commits successfully, all intents
are upgraded to committed. In the event a transaction is aborted, all written
intents are deleted. The client proxy doesn’t guarantee it will resolve intents.

In the event the client proxy restarts before the pending transaction is
committed, the dangling transaction would continue to "live" until
aborted by another transaction. Transactions periodically heartbeat
their transaction record to maintain liveness.
Transactions encountered by readers or writers with dangling intents
which haven’t been heartbeat within the required interval are aborted.
In the event the proxy restarts after a transaction commits but before
the asynchronous resolution is complete, the dangling intents are upgraded
when encountered by future readers and writers and the system does
not depend on their timely resolution for correctness.

An exploration of retries with contention and abort times with abandoned
transactions is
[here](https://docs.google.com/document/d/1kBCu4sdGAnvLqpT-_2vaTbomNmX3_saayWEGYu1j7mQ/edit?usp=sharing).

**Transaction Records**

Please see [pkg/roachpb/data.proto](https://github.com/cockroachdb/cockroach/blob/master/pkg/roachpb/data.proto) for the up-to-date structures, the best entry point being `message Transaction`.

**Pros**

- No requirement for reliable code execution to prevent stalled 2PC
  protocol.
- Readers never block with SI semantics; with SSI semantics, they may
  abort.
- Lower latency than traditional 2PC commit protocol (w/o contention)
  because second phase requires only a single write to the
  transaction record instead of a synchronous round to all
  transaction participants.
- Priorities avoid starvation for arbitrarily long transactions and
  always pick a winner from between contending transactions (no
  mutual aborts).
- Writes not buffered at client; writes fail fast.
- No read-locking overhead required for *serializable* SI (in contrast
  to other SSI implementations).
- Well-chosen (i.e. less random) priorities can flexibly give
  probabilistic guarantees on latency for arbitrary transactions
  (for example: make OLTP transactions 10x less likely to abort than
  low priority transactions, such as asynchronously scheduled jobs).

**Cons**

- Reads from non-lease holder replicas still require a ping to the lease holder
  to update the *read timestamp cache*.
- Abandoned transactions may block contending writers for up to the
  heartbeat interval, though average wait is likely to be
  considerably shorter (see [graph in link](https://docs.google.com/document/d/1kBCu4sdGAnvLqpT-_2vaTbomNmX3_saayWEGYu1j7mQ/edit?usp=sharing)).
  This is likely considerably more performant than detecting and
  restarting 2PC in order to release read and write locks.
- Behavior different than other SI implementations: no first writer
  wins, and shorter transactions do not always finish quickly.
  Element of surprise for OLTP systems may be a problematic factor.
- Aborts can decrease throughput in a contended system compared with
  two-phase locking. Aborts and retries increase read and write
  traffic, increase latency and decrease throughput.

**Choosing a Timestamp**

A key challenge of reading data in a distributed system with clock skew
is choosing a timestamp guaranteed to be greater than the latest
timestamp of any committed transaction (in absolute time). No system can
claim consistency and fail to read already-committed data.

Accomplishing consistency for transactions (or just single operations)
accessing a single node is easy. The timestamp is assigned by the node
itself, so it is guaranteed to be at a greater timestamp than all the
existing timestamped data on the node.

For multiple nodes, the timestamp of the node coordinating the
transaction `t` is used. In addition, a maximum timestamp `t+ε` is
supplied to provide an upper bound on timestamps for already-committed
data (`ε` is the maximum clock skew). As the transaction progresses, any
data read which have timestamps greater than `t` but less than `t+ε`
cause the transaction to abort and retry with the conflicting timestamp
t<sub>c</sub>, where t<sub>c</sub> \> t. The maximum timestamp `t+ε` remains
the same. This implies that transaction restarts due to clock uncertainty
can only happen on a time interval of length `ε`.

We apply another optimization to reduce the restarts caused
by uncertainty. Upon restarting, the transaction not only takes
into account t<sub>c</sub>, but also the timestamp of the node at the time
of the uncertain read, t<sub>node</sub>. The larger of those two timestamps,
t<sub>c</sub> and t<sub>node</sub> (likely equal to the latter), is used
to increase the read timestamp. Additionally, the conflicting node is
marked as “certain”. Then, for future reads to that node within the
transaction, we set `MaxTimestamp = Read Timestamp`, preventing further
uncertainty restarts.

Correctness follows from the fact that we know that at the time of the read,
there exists no version of any key on that node with a higher timestamp than
t<sub>node</sub>. Upon a restart caused by the node, if the transaction
encounters a key with a higher timestamp, it knows that in absolute time,
the value was written after t<sub>node</sub> was obtained, i.e. after the
uncertain read. Hence the transaction can move forward reading an older version
of the data (at the transaction's timestamp). This limits the time uncertainty
restarts attributed to a node to at most one. The tradeoff is that we might
pick a timestamp larger than the optimal one (> highest conflicting timestamp),
resulting in the possibility of a few more conflicts.
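
The uncertainty-window check described above can be sketched as follows. The
function is illustrative; `ε` enters only through the caller-provided maximum
timestamp, and the restart plumbing itself is omitted.

```
// Illustrative uncertainty check for a read at timestamp readTS with
// upper bound maxTS = t + ε: values in (readTS, maxTS] may, in absolute
// time, predate the read and therefore force a restart at a higher
// timestamp, while keeping maxTS unchanged.
package uncertaintysketch

type Timestamp int64 // stand-in for an HLC timestamp

// checkUncertainty returns (restart, newReadTS) for a value observed at
// valueTS by a transaction reading at readTS with maximum timestamp maxTS.
func checkUncertainty(readTS, maxTS, valueTS Timestamp) (bool, Timestamp) {
	if valueTS <= readTS {
		return false, readTS // value is visible to the read; no conflict
	}
	if valueTS > maxTS {
		return false, readTS // far enough in the future; safe to ignore
	}
	// Within the uncertainty window: restart with a timestamp above the
	// conflicting value; the upper bound maxTS stays the same.
	return true, valueTS + 1
}
```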

We expect retries will be rare, but this assumption may need to be
revisited if retries become problematic. Note that this problem does not
apply to historical reads. An alternate approach which does not require
retries makes a round to all node participants in advance and
chooses the highest reported node wall time as the timestamp. However,
knowing which nodes will be accessed in advance is difficult and
potentially limiting. Cockroach could also potentially use a global
clock (Google did this with [Percolator](https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Peng.pdf)),
which would be feasible for smaller, geographically-proximate clusters.

# Strict Serializability (Linearizability)

Roughly speaking, the gap between <i>strict serializability</i> (which we use
interchangeably with <i>linearizability</i>) and CockroachDB's default
isolation level (<i>serializable</i>) is that with linearizable transactions,
causality is preserved. That is, if one transaction (say, creating a posting
for a user) waits for its predecessor (creating the user in the first place)
to complete, one would hope that the logical timestamp assigned to the former
is larger than that of the latter.
In practice, this may not hold in distributed databases. The reason is
typically that clocks across a distributed system are not perfectly
synchronized and the "later" transaction touches a part of the keyspace
disjoint from that of the first transaction, so that clocks with disjoint
information decide on the commit timestamps.

In practice, in CockroachDB many transactional workloads are actually
linearizable, though the precise conditions are too involved to outline them
here.

Causality is typically not required for many transactions, and so it is
advantageous to pay for it only when it *is* needed. CockroachDB implements
this via <i>causality tokens</i>: when committing a transaction, a causality
token can be retrieved and passed to the next transaction, ensuring that these
two transactions get assigned increasing logical timestamps.

Additionally, as better synchronized clocks become a standard commodity offered
by cloud providers, CockroachDB can provide global linearizability by doing
much the same that [Google's
Spanner](http://research.google.com/archive/spanner.html) does: wait out the
maximum clock offset after committing, but before returning to the client.

See the blog post below for much more in-depth information.

https://www.cockroachlabs.com/blog/living-without-atomic-clocks/

# Logical Map Content

Logically, the map contains a series of reserved system key/value
pairs preceding the actual user data (which is managed by the SQL
subsystem).

- `\x02<key1>`: Range metadata for range ending `\x03<key1>`. This is a "meta1" key.
- ...
- `\x02<keyN>`: Range metadata for range ending `\x03<keyN>`. This is a "meta1" key.
- `\x03<key1>`: Range metadata for range ending `<key1>`. This is a "meta2" key.
- ...
- `\x03<keyN>`: Range metadata for range ending `<keyN>`. This is a "meta2" key.
- `\x04{desc,node,range,store}-idegen`: ID generation oracles for various component types.
- `\x04status-node-<varint encoded Store ID>`: Store runtime metadata.
- `\x04tsd<key>`: Time-series data key.
- `<key>`: A user key. In practice, these keys are managed by the SQL
  subsystem, which employs its own key anatomy.

# Stores and Storage

Nodes contain one or more stores. Each store should be placed on a unique disk.
Internally, each store contains a single instance of RocksDB with a block cache
shared amongst all of the stores in a node. These stores in turn hold
a collection of range replicas. More than one replica for a range will never
be placed on the same store or even the same node.

Early on, when a cluster is first initialized, the few default starting ranges
will only have a single replica, but as soon as other nodes are available they
will replicate to them until they've reached their desired replication factor,
the default being 3.

Zone configs can be used to control a range's replication factor and add
constraints as to where the range's replicas can be located. When there is a
change in a range's zone config, the range will up- or down-replicate to the
appropriate number of replicas and move its replicas to the appropriate stores
based on the zone config's constraints.

# Self Repair

If a store has not been heard from (gossiped its descriptor) in some time,
the default setting being 5 minutes, the cluster will consider this store to be
dead. When this happens, all ranges that have replicas on that store are
determined to be unavailable and removed. These ranges will then upreplicate
themselves to other available stores until their desired replication factor is
again met. If 50% or more of the replicas are unavailable at the same time,
there is no quorum and the whole range will be considered unavailable until
more than 50% of the replicas are again available.

# Rebalancing

As more data are added to the system, some stores may grow faster than others.
To combat this and to spread the overall load across the full cluster, replicas
will be moved between stores while maintaining the desired replication factor.
The heuristics used to perform this rebalancing include:

- the number of replicas per store
- the total size of the data used per store
- free space available per store

In the future, some other factors that might be considered include:

- cpu/network load per store
- ranges that are used together often in queries
- number of active ranges per store
- number of range leases held per store
# Range Metadata
625
626
The default approximate size of a range is 64M (2\^26 B). In order to
627
support 1P (2\^50 B) of logical data, metadata is needed for roughly
628
2\^(50 - 26) = 2\^24 ranges. A reasonable upper bound on range metadata
629
size is roughly 256 bytes (3\*12 bytes for the triplicated node
630
locations and 220 bytes for the range key itself). 2\^24 ranges \* 2\^8
631
B would require roughly 4G (2\^32 B) to store--too much to duplicate
632
between machines. Our conclusion is that range metadata must be
633
distributed for large installations.
634
635
To keep key lookups relatively fast in the presence of distributed metadata,
636
we store all the top-level metadata in a single range (the first range). These
637
top-level metadata keys are known as *meta1* keys, and are prefixed such that
638
they sort to the beginning of the key space. Given the metadata size of 256
639
bytes given above, a single 64M range would support 64M/256B = 2\^18 ranges,
640
which gives a total storage of 64M \* 2\^18 = 16T. To support the 1P quoted
641
above, we need two levels of indirection, where the first level addresses the
642
second, and the second addresses user data. With two levels of indirection, we
643
can address 2\^(18 + 18) = 2\^36 ranges; each range addresses 2\^26 B, and
644
altogether we address 2\^(36+26) B = 2\^62 B = 4E of user data.
645
646
For a given user-addressable `key1`, the associated *meta1* record is found
647
at the successor key to `key1` in the *meta1* space. Since the *meta1* space
648
is sparse, the successor key is defined as the next key which is present. The
649
*meta1* record identifies the range containing the *meta2* record, which is
650
found using the same process. The *meta2* record identifies the range
651
containing `key1`, which is again found the same way (see examples below).
653
Concretely, metadata keys are prefixed by `\x02` (meta1) and `\x03`
654
(meta2); the prefixes `\x02` and `\x03` provide for the desired
655
sorting behaviour. Thus, `key1`'s *meta1* record will reside at the
656
successor key to `\x02<key1>`.
Jul 29, 2015
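
A sketch of how the addressing keys for a user key are formed with the
`\x02`/`\x03` prefixes follows. The helpers are hypothetical and leave out the
successor (seek) lookup itself as well as any special-casing of keys that are
themselves meta keys.

```
// Hypothetical helpers mirroring the lookup described above: key1's meta1
// record lives at the successor of "\x02"+key1, and its meta2 record at
// the successor of "\x03"+key1.
package metasketch

var (
	meta1Prefix = []byte("\x02")
	meta2Prefix = []byte("\x03")
)

// addressingKeys returns the keys whose successors (the next present keys
// in the sparse meta keyspaces) hold the meta1 and meta2 records covering
// userKey.
func addressingKeys(userKey []byte) (meta1, meta2 []byte) {
	meta1 = append(append([]byte{}, meta1Prefix...), userKey...)
	meta2 = append(append([]byte{}, meta2Prefix...), userKey...)
	return meta1, meta2
}
```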

Note: we append the end key of each range to meta{1,2} records because
the RocksDB iterator only supports a Seek() interface which acts as a
Ceil(). Using the start key of the range would cause Seek() to find the
key *after* the meta indexing record we’re looking for, which would
result in having to back the iterator up, an option which is both less
efficient and not available in all cases.

The following example shows the directory structure for a map with
three ranges worth of data. Ellipses indicate additional key/value
pairs to fill an entire range of data. For clarity, the examples use
`meta1` and `meta2` to refer to the prefixes `\x02` and `\x03`. Except
for the fact that splitting ranges requires updates to the range
metadata with knowledge of the metadata layout, the range metadata
itself requires no special treatment or bootstrapping.

**Range 0** (located on servers `dcrama1:8000`, `dcrama2:8000`,
`dcrama3:8000`)

- `meta1\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`
- `meta2<lastkey0>`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`
- `meta2<lastkey1>`: `dcrama4:8000`, `dcrama5:8000`, `dcrama6:8000`
- `meta2\xff`: `dcrama7:8000`, `dcrama8:8000`, `dcrama9:8000`
- ...
- `<lastkey0>`: `<lastvalue0>`

**Range 1** (located on servers `dcrama4:8000`, `dcrama5:8000`,
`dcrama6:8000`)

- ...
- `<lastkey1>`: `<lastvalue1>`

**Range 2** (located on servers `dcrama7:8000`, `dcrama8:8000`,
`dcrama9:8000`)

- ...
- `<lastkey2>`: `<lastvalue2>`

Consider a simpler example of a map containing less than a single
range of data. In this case, all range metadata and all data are
located in the same range:

**Range 0** (located on servers `dcrama1:8000`, `dcrama2:8000`,
`dcrama3:8000`)

- `meta1\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`
- `meta2\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`
- `<key0>`: `<value0>`
- `...`

Finally, a map large enough to need both levels of indirection would
look like (note that instead of showing range replicas, this
example is simplified to just show range indexes):

**Range 0**

- `meta1<lastkeyN-1>`: Range 0
- `meta1\xff`: Range 1
- `meta2<lastkey1>`: Range 1
- `meta2<lastkey2>`: Range 2
- `meta2<lastkey3>`: Range 3
- ...
- `meta2<lastkeyN-1>`: Range 262143

**Range 1**

- `meta2<lastkeyN>`: Range 262144
- `meta2<lastkeyN+1>`: Range 262145
- ...
- `meta2\xff`: Range 500,000
- ...
- `<lastkey1>`: `<lastvalue1>`

**Range 2**

- ...
- `<lastkey2>`: `<lastvalue2>`

**Range 3**

- ...
- `<lastkey3>`: `<lastvalue3>`

**Range 262144**

- ...
- `<lastkeyN>`: `<lastvalueN>`

**Range 262145**

- ...
- `<lastkeyN+1>`: `<lastvalueN+1>`

Note that the choice of range `262144` is just an approximation. The
actual number of ranges addressable via a single metadata range is
dependent on the size of the keys. If efforts are made to keep key sizes
small, the total number of addressable ranges would increase and vice
versa.

From the examples above it’s clear that key location lookups require at
most three reads to get the value for `<key>`:

1. lower bound of `meta1<key>`,
2. lower bound of `meta2<key>`,
3. `<key>`.

For small maps, the entire lookup is satisfied in a single RPC to Range 0. Maps
containing less than 16T of data would require two lookups. Clients cache both
levels of range metadata, and we expect that data locality for individual
clients will be high. Clients may end up with stale cache entries. If, on a
lookup, the range consulted does not match the client’s expectations, the
client evicts the stale entries and possibly does a new lookup.

# Raft - Consistency of Range Replicas

Each range is configured to consist of three or more replicas, as specified by
their ZoneConfig. The replicas in a range maintain their own instance of a
distributed consensus algorithm. We use the [*Raft consensus algorithm*](https://raftconsensus.github.io)
as it is simpler to reason about and includes a reference implementation
covering important details.
[ePaxos](https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf) has
promising performance characteristics for WAN-distributed replicas, but
it does not guarantee a consistent ordering between replicas.

Raft elects a relatively long-lived leader which must be involved to
propose commands. It heartbeats followers periodically and keeps their logs
replicated. In the absence of heartbeats, followers become candidates
after randomized election timeouts and proceed to hold new leader
elections. Cockroach weights random timeouts such that the replicas with
shorter round trip times to peers are more likely to hold elections
first (not implemented yet). Only the Raft leader may propose commands;
followers will simply relay commands to the last known leader.

Our Raft implementation was developed together with CoreOS, but adds an extra
layer of optimization to account for the fact that a single Node may have
millions of consensus groups (one for each Range). Areas of optimization
are chiefly coalesced heartbeats (so that the number of nodes dictates the
number of heartbeats as opposed to the much larger number of ranges) and
batch processing of requests.
Future optimizations may include two-phase elections and quiescent ranges
(i.e. stopping traffic completely for inactive ranges).

# Range Leases

As outlined in the Raft section, the replicas of a Range are organized as a
Raft group and execute commands from their shared commit log. Going through
Raft is an expensive operation though, and there are tasks which should only be
carried out by a single replica at a time (as opposed to all of them).
In particular, it is desirable to serve authoritative reads from a single
Replica (ideally from more than one, but that is far more difficult).

For these reasons, Cockroach introduces the concept of **Range Leases**:
This is a lease held for a slice of (database, i.e. hybrid logical) time.
A replica establishes itself as owning the lease on a range by committing
a special lease acquisition log entry through raft. The log entry contains
the replica node's epoch from the node liveness table--a system
table containing an epoch and an expiration time for each node. A node is
responsible for continuously updating the expiration time for its entry
in the liveness table. Once the lease has been committed through raft,
the replica becomes the lease holder as soon as it applies the lease
acquisition command, guaranteeing that when it uses the lease it has
already applied all prior writes on the replica and can see them locally.

To prevent two nodes from acquiring the lease, the requestor includes a copy
of the lease that it believes to be valid at the time it requests the lease.
If that lease is still valid when the new lease is applied, the new lease is
granted; if another lease was granted in the interim, the requested lease is
ignored. A lease can move from node A to node B only after node A's
liveness record has expired and its epoch has been incremented.

Note: range leases for ranges within the node liveness table keyspace and
all ranges that precede it, including meta1 and meta2, are not managed using
the above mechanism, to prevent circular dependencies.

A replica holding a lease at a specific epoch can use the lease as long as
the node epoch hasn't changed and the expiration time hasn't passed.
The replica holding the lease may satisfy reads locally, without incurring the
overhead of going through Raft, and is in charge of, or involved in, handling
range-specific maintenance tasks such as splitting, merging and rebalancing.
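
The epoch-based validity rule above can be sketched as a small check. The types
are illustrative, and the real check also accounts for the stasis period
discussed further down.

```
// Illustrative epoch-based lease check: a lease is usable only while the
// lease holder's node-liveness epoch is unchanged and its liveness
// expiration has not passed.
package leasesketch

type Timestamp int64 // stand-in for an HLC timestamp

type Lease struct {
	Replica string // replica that committed the lease acquisition entry
	Epoch   int64  // node epoch from the node liveness table
}

type Liveness struct {
	Epoch      int64
	Expiration Timestamp
}

// leaseValid reports whether the holder may keep using the lease at time now.
func leaseValid(l Lease, liveness Liveness, now Timestamp) bool {
	return l.Epoch == liveness.Epoch && now < liveness.Expiration
}
```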

All reads and writes are generally addressed to the replica holding
the lease; if none does, any replica may be addressed, causing it to try
to obtain the lease synchronously. Requests received by a non-lease holder
(for the HLC timestamp specified in the request's header) fail with an
error pointing at the replica's last known lease holder. These requests
are retried transparently with the updated lease by the gateway node and
never reach the client.

Since reads bypass Raft, a new lease holder will, among other things, ascertain
that its timestamp cache does not report timestamps smaller than the previous
lease holder's (so that it's compatible with reads which may have occurred on
the former lease holder). This is accomplished by letting leases enter
a <i>stasis period</i> (which is just the expiration minus the maximum clock
offset) before the actual expiration of the lease, so that all the next lease
holder has to do is set the low water mark of the timestamp cache to its
new lease's start time.

As a lease enters its stasis period, no more reads or writes are served, which
is undesirable. However, this would only happen in practice if a node became
unavailable. In almost all practical situations, no unavailability results
since leases are usually long-lived (and/or eagerly extended, which can avoid
the stasis period) or proactively transferred away from the lease holder, which
can also avoid the stasis period by promising not to serve any further reads
until the next lease goes into effect.

## Colocation with Raft leadership

The range lease is completely separate from Raft leadership, and so without
further efforts, Raft leadership and the Range lease might not be held by the
same Replica. Since it's expensive to not have these two roles colocated (the
lease holder has to forward each proposal to the leader, adding costly RPC
round-trips), each lease renewal or transfer also attempts to colocate them.
In practice, that means that the mismatch is rare and self-corrects quickly.

## Command Execution Flow

This subsection describes how a lease holder replica processes a
read/write command in more detail. Each command specifies (1) a key
(or a range of keys) that the command accesses and (2) the ID of a
range which the key(s) belongs to. When receiving a command, a node
looks up a range by the specified Range ID and checks if the range is
still responsible for the supplied keys. If any of the keys do not
belong to the range, the node returns an error so that the client will
retry and send a request to the correct range.

When all the keys belong to the range, the node attempts to
process the command. If the command is an inconsistent read-only
command, it is processed immediately. If the command is a consistent
read or a write, the command is executed when both of the following
conditions hold:

- The range replica has a range lease.
- There are no other running commands whose keys overlap with
  the submitted command and cause a read/write conflict.

When the first condition is not met, the replica attempts to acquire
a lease or returns an error so that the client will redirect the
command to the current lease holder. The second condition guarantees that
consistent read/write commands for a given key are sequentially
executed.

When the above two conditions are met, the lease holder replica processes the
command. Consistent reads are processed on the lease holder immediately.
Write commands are committed into the Raft log so that every replica
will execute the same commands. All commands produce deterministic
results so that the range replicas keep consistent states among them.

When a write command completes, all the replicas update their response
cache to ensure idempotency. When a read command completes, the lease holder
replica updates its timestamp cache to keep track of the latest read
for a given key.

There is a chance that a range lease expires while a command is
being executed. Before executing a command, each replica checks if the
replica proposing the command still holds the lease. When the lease has
expired, the command will be rejected by the replica.

# Splitting / Merging Ranges

Nodes split or merge ranges based on whether they exceed maximum or
minimum thresholds for capacity or load. Ranges exceeding maximums for
either capacity or load are split; ranges below minimums for *both*
capacity and load are merged.

Ranges maintain the same accounting statistics as accounting key
prefixes. These boil down to a time series of data points with minute
granularity, covering everything from number of bytes to read/write queue
sizes. Arbitrary distillations of the accounting stats can be determined as the
basis for splitting / merging. Two sensible metrics for use with
split/merge are range size in bytes and IOps. A good metric for
rebalancing a replica from one node to another would be total read/write
queue wait times. These metrics are gossiped, with each range / node
passing along relevant metrics if they’re in the bottom or top of the
range it’s aware of.

A range finding itself exceeding either capacity or load threshold
splits. To this end, the range lease holder computes an appropriate split key
candidate and issues the split through Raft. In contrast to splitting,
merging requires a range to be below the minimum threshold for both
capacity *and* load. A range being merged chooses the smaller of the
ranges immediately preceding and succeeding it.

Splitting, merging, rebalancing and recovering all follow the same basic
algorithm for moving data between roach nodes. New target replicas are
created and added to the replica set of the source range. Then each new
replica is brought up to date by either replaying the log in full or
copying a snapshot of the source replica data and then replaying the log
from the timestamp of the snapshot to catch up fully. Once the new
replicas are fully up to date, the range metadata is updated and the old
source replica(s) deleted if applicable.

**Coordinator** (lease holder replica)

```
if splitting
  SplitRange(split_key): splits happen locally on range replicas and
  only after being completed locally, are moved to new target replicas.
else if merging
  Choose new replicas on same servers as target range replicas;
  add to replica set.
else if rebalancing || recovering
  Choose new replica(s) on least loaded servers; add to replica set.
```

**New Replica**

*Bring replica up to date:*

```
if all info can be read from replicated log
  copy replicated log
else
  snapshot source replica
  send successive ReadRange requests to source replica
  referencing snapshot

if merging
  combine ranges on all replicas
else if rebalancing || recovering
  remove old range replica(s)
```

Nodes split ranges when the total data in a range exceeds a
configurable maximum threshold. Similarly, ranges are merged when the
total data falls below a configurable minimum threshold.

**TBD: flesh this out**: Especially for merges (but also rebalancing) we have a
range disappearing from the local node; that range needs to disappear
gracefully, with a smooth handoff of operation to the new owner of its data.

Ranges are rebalanced if a node determines its load or capacity is one
of the worst in the cluster based on gossiped load stats. A node with
spare capacity is chosen in the same datacenter and a special-case split
is done which simply duplicates the data 1:1 and resets the range
configuration metadata.

# Node Allocation (via Gossip)

New nodes must be allocated when a range is split. Instead of requiring
every node to know about the status of all or even a large number
of peer nodes --or-- alternatively requiring a specialized curator or
master with sufficiently global knowledge, we use a gossip protocol to
efficiently communicate only interesting information between all of the
nodes in the cluster. What’s interesting information? One example would
be whether a particular node has a lot of spare capacity. Each node,
when gossiping, compares each topic of gossip to its own state. If its
own state is somehow “more interesting” than the least interesting item
in the topic it’s seen recently, it includes its own state as part of
the next gossip session with a peer node. In this way, a node with
capacity sufficiently in excess of the mean quickly becomes discovered
by the entire cluster. To avoid piling onto outliers, nodes from the
high capacity set are selected at random for allocation.

The gossip protocol itself contains two primary components:

- **Peer Selection**: each node maintains up to N peers with which it
  regularly communicates. It selects peers with an eye towards
  maximizing fanout. A peer node which itself communicates with an
  array of otherwise unknown nodes will be selected over one which
  communicates with a set containing significant overlap. Each time
  gossip is initiated, each node’s set of peers is exchanged. Each
  node is then free to incorporate the other’s peers as it sees fit.
  To avoid any node suffering from excess incoming requests, a node
  may refuse to answer a gossip exchange. Each node is biased
  towards answering requests from nodes without significant overlap
  and refusing requests otherwise.

  Peers are efficiently selected using a heuristic as described in
  [Agarwal & Trachtenberg (2006)](https://drive.google.com/file/d/0B9GCVTp_FHJISmFRTThkOEZSM1U/edit?usp=sharing).

  **TBD**: how to avoid partitions? Need to work out a simulation of
  the protocol to tune the behavior and see empirically how well it
  works.

- **Gossip Selection**: what to communicate. Gossip is divided into
  topics. Load characteristics (capacity per disk, cpu load, and
  state [e.g. draining, ok, failure]) are used to drive node
  allocation. Range statistics (range read/write load, missing
  replicas, unavailable ranges) and network topology (inter-rack
  bandwidth/latency, inter-datacenter bandwidth/latency, subnet
  outages) are used for determining when to split ranges, when to
  recover replicas vs. wait for network connectivity, and for
  debugging / sysops. In all cases, a set of minimums and a set of
  maximums is propagated; each node applies its own view of the
  world to augment the values. Each minimum and maximum value is
  tagged with the reporting node and other accompanying contextual
  information. Each topic of gossip has its own protobuf to hold the
  structured data. The number of items of gossip in each topic is
  limited by a configurable bound.

  For efficiency, nodes assign each new item of gossip a sequence
  number and keep track of the highest sequence number each peer
  node has seen. Each round of gossip communicates only the delta
  containing new items.
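
The sequence-number delta described in the last paragraph might look roughly
like the sketch below. The structures are hypothetical and ignore topics, peer
selection and the per-topic bounds.

```
// Illustrative gossip delta: every item a node can share carries a
// sequence number, and a round with a peer sends only items newer than
// the highest sequence number that peer is known to have seen.
package gossipsketch

type Item struct {
	Topic string
	Seq   int64
	Value []byte
}

type Gossiper struct {
	items    []Item           // items this node can share
	peerSeen map[string]int64 // highest sequence number seen per peer
}

// delta returns the items the named peer has not yet seen and records the
// new high-water mark for that peer.
func (g *Gossiper) delta(peer string) []Item {
	if g.peerSeen == nil {
		g.peerSeen = make(map[string]int64)
	}
	seen := g.peerSeen[peer]
	highWater := seen
	var out []Item
	for _, it := range g.items {
		if it.Seq > seen {
			out = append(out, it)
			if it.Seq > highWater {
				highWater = it.Seq
			}
		}
	}
	g.peerSeen[peer] = highWater
	return out
}
```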
# Node and Cluster Metrics
1054
1055
Every component of the system is responsible for exporting interesting
1056
metrics about itself. These could be histograms, throughput counters, or
1057
gauges.
1058
1059
These metrics are exported for external monitoring systems (such as Prometheus)
1060
via a HTTP endpoint, but CockroachDB also implements an internal timeseries
1061
database which is stored in the replicated key-value map.
1062
1063
Time series are stored at Store granularity and allow the admin dashboard
1064
to efficiently gain visibility into a universe of information at the Cluster,
1065
Node or Store level. A [periodic background process](RFCS/20160901_time_series_culling.md)
1066
culls older timeseries data, downsampling and eventually discarding it.
1068
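As a rough illustration of the external monitoring path only (this is generic Prometheus client usage with made-up metric names, not CockroachDB's actual metrics code), a component might expose a gauge and a histogram over an HTTP endpoint for scraping:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Hypothetical metrics; the names are illustrative only.
var (
	liveNodes = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_live_nodes",
		Help: "Number of live nodes as seen by this node.",
	})
	execLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "example_exec_latency_seconds",
		Help:    "Statement execution latency.",
		Buckets: prometheus.DefBuckets,
	})
)

func main() {
	prometheus.MustRegister(liveNodes, execLatency)

	// Components update their metrics as they do work.
	liveNodes.Set(3)
	execLatency.Observe(0.042)

	// An external monitoring system scrapes this endpoint over HTTP.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```
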
# Key-prefix Accounting and Zones

Arbitrarily fine-grained accounting is specified via key prefixes. Key
prefixes can overlap, as is necessary for capturing hierarchical
relationships. For illustrative purposes, let’s say keys specifying rows
in a set of databases have the following format:

`<db>:<table>:<primary-key>[:<secondary-key>]`

In this case, we might collect accounting with key prefixes:

`db1`, `db1:user`, `db1:order`,

Accounting is kept for the entire map by default.

## Accounting

To keep accounting for a range defined by a key prefix, an entry is created in
the accounting system table. The format of accounting table keys is:

`\0acct<key-prefix>`

In practice, we assume each node is capable of caching the
entire accounting table as it is likely to be relatively small.

Accounting is kept for key prefix ranges with eventual consistency for
efficiency. There are two types of values which comprise accounting:
counts and occurrences, for lack of better terms. Counts describe
system state, such as the total number of bytes, rows,
etc. Occurrences include transient performance and load metrics. Both
types of accounting are captured as time series with minute
granularity. The length of time accounting metrics are kept is
configurable. Below are examples of each type of accounting value.

**System State Counters/Performance**

- Count of items (e.g. rows)
- Total bytes
- Total key bytes
- Total value length
- Queued message count
- Queued message total bytes
- Count of values \< 16B
- Count of values \< 64B
- Count of values \< 256B
- Count of values \< 1K
- Count of values \< 4K
- Count of values \< 16K
- Count of values \< 64K
- Count of values \< 256K
- Count of values \< 1M
- Count of values \> 1M
- Total bytes of accounting

**Load Occurrences**

- Get op count
- Get total MB
- Put op count
- Put total MB
- Delete op count
- Delete total MB
- Delete range op count
- Delete range total MB
- Scan op count
- Scan op MB
- Split count
- Merge count

Because accounting information is kept as time series and over many
possible metrics of interest, the data can become numerous. Accounting
data are stored in the map near the key prefix described, in order to
distribute load (for both aggregation and storage).

Accounting keys for system state have the form:
`<key-prefix>|acctd<metric-name>*`. Notice the leading ‘pipe’
character; it is meant to sort the root-level accounting entries AFTER any
other system tables. They must increment the same underlying values as they
are permanent counts, and not transient activity. Logic at the
node takes care of snapshotting the value into an appropriately
suffixed (e.g. with timestamp hour) multi-value time series entry.

Keys for perf/load metrics have the form:
`<key-prefix>acctd<metric-name><hourly-timestamp>`.

`<hourly-timestamp>`-suffixed accounting entries are multi-valued,
containing a varint64 entry for each minute with activity during the
specified hour.

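A sketch of how such a perf/load key and its multi-valued hourly entry might be built (illustrative only; the helper names and the exact encoding are assumptions, not the actual implementation):

```go
package accounting

import (
	"encoding/binary"
	"fmt"
	"time"
)

// loadMetricKey builds a perf/load accounting key of the form
// <key-prefix>acctd<metric-name><hourly-timestamp> (hypothetical helper).
func loadMetricKey(keyPrefix, metric string, t time.Time) []byte {
	hour := t.UTC().Truncate(time.Hour).Unix()
	return []byte(fmt.Sprintf("%sacctd%s%d", keyPrefix, metric, hour))
}

// encodeMinuteCounts packs one varint64 per minute of the hour, mirroring the
// multi-valued hourly entries described above. For simplicity this sketch
// encodes a value for every minute, including minutes without activity.
func encodeMinuteCounts(counts [60]int64) []byte {
	buf := make([]byte, 0, 60*binary.MaxVarintLen64)
	var tmp [binary.MaxVarintLen64]byte
	for _, c := range counts {
		n := binary.PutVarint(tmp[:], c)
		buf = append(buf, tmp[:n]...)
	}
	return buf
}
```
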
To efficiently keep accounting over large key ranges, the task of
aggregation must be distributed. If activity occurs within the same
range as the key prefix for accounting, the updates are made as part
of the consensus write. If the ranges differ, then a message is sent
to the parent range to increment the accounting. If, upon receiving the
message, the parent range also does not include the key prefix, it in
turn forwards it to its parent or left child in the balanced binary
tree which is maintained to describe the range hierarchy. This limits
the number of messages before an update is visible at the root to `2*log N`,
where `N` is the number of ranges in the key prefix.

## Zones

Zones are stored in the map with keys prefixed by
`\0zone` followed by the key prefix to which the zone
configuration applies. Zone values specify a protobuf containing
the datacenters from which replicas for ranges which fall under
the zone must be chosen.

Please see [pkg/config/config.proto](https://github.com/cockroachdb/cockroach/blob/master/pkg/config/config.proto) for up-to-date data structures used, the best entry point being `message ZoneConfig`.

If zones are modified in situ, each node verifies the
existing zones for its ranges against the zone configuration. If
it discovers differences, it reconfigures ranges in the same way
that it rebalances away from busy nodes, via a special-case 1:1
split to a duplicate range comprising the new configuration.

# SQL

Each node in a cluster can accept SQL client connections. CockroachDB
supports the PostgreSQL wire protocol, to enable reuse of native
PostgreSQL client drivers. Connections using SSL and authenticated
using client certificates are supported and even encouraged over
unencrypted (insecure) and password-based connections.

Each connection is associated with a SQL session which holds the
server-side state of the connection. Over the lifespan of a session
the client can send SQL to open/close transactions, issue statements
or queries, or configure session parameters, much like with any other
SQL database.

## Language support

CockroachDB also attempts to emulate the flavor of SQL supported by
PostgreSQL, although it diverges in significant ways:

- CockroachDB exclusively implements MVCC-based consistency for
  transactions, and thus only supports SQL's isolation levels SNAPSHOT
  and SERIALIZABLE. The other traditional SQL isolation levels are
  internally mapped to either SNAPSHOT or SERIALIZABLE.

- CockroachDB implements its own [SQL type system](RFCS/20160203_typing.md)
  which only supports a limited form of implicit coercions between
  types compared to PostgreSQL. The rationale is to keep the
  implementation simple and efficient, capitalizing on the observation
  that 1) most SQL code in clients is automatically generated with
  coherent typing already and 2) existing SQL code for other databases
  will need to be massaged for CockroachDB anyway.

## SQL architecture

Client connections over the network are handled in each node by a
pgwire server process (goroutine). This handles the stream of incoming
commands and sends back responses including query/statement results.
The pgwire server also handles pgwire-level prepared statements,
binding prepared statements to arguments and looking up prepared
statements for execution.

Meanwhile, the state of a SQL connection is maintained by a Session
object and a monolithic `planner` object (one per connection) which
coordinates execution between the session, the current SQL transaction
state and the underlying KV store.

Upon receiving a query/statement (either directly or via an execute
command for a previously prepared statement) the pgwire server forwards
the SQL text to the `planner` associated with the connection. The SQL
code is then transformed into a SQL query plan.
The query plan is implemented as a tree of objects which describe the
high-level data operations needed to resolve the query, for example
"join", "index join", "scan", "group", etc.

The query plan objects currently also embed the run-time state needed
for the execution of the query plan. Once the SQL query plan is ready,
methods on these objects then carry the execution out in the fashion
of "generators" in other programming languages: each node *starts* its
child nodes and from that point forward each child node serves as a
*generator* for a stream of result rows, which the parent node can
consume and transform incrementally and present to its own parent node
also as a generator.

The top-level planner consumes the data produced by the top node of
the query plan and returns it to the client via pgwire.

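This generator pattern can be sketched as a small Go interface. The names below (`planNode`, `filterNode`) are illustrative simplifications, not the exact interface used in the implementation.

```go
package sql

// planNode is a sketch of a generator-style plan node: a parent starts its
// child, then repeatedly pulls one row at a time.
type planNode interface {
	Start() error
	Next() (bool, error) // advances to the next row; false when exhausted
	Values() []interface{}
	Close()
}

// filterNode is a trivial example node: it passes through only the rows of
// its child for which pred returns true.
type filterNode struct {
	child planNode
	pred  func(row []interface{}) bool
}

func (f *filterNode) Start() error { return f.child.Start() }

func (f *filterNode) Next() (bool, error) {
	for {
		ok, err := f.child.Next()
		if err != nil || !ok {
			return ok, err
		}
		if f.pred(f.child.Values()) {
			return true, nil
		}
	}
}

func (f *filterNode) Values() []interface{} { return f.child.Values() }
func (f *filterNode) Close()                { f.child.Close() }
```
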
## Data mapping between the SQL model and KV

Every SQL table has a primary key in CockroachDB. (If a table is created
without one, an implicit primary key is provided automatically.)
The table identifier, followed by the value of the primary key for
each row, are encoded as the *prefix* of a key in the underlying KV
store.

Each remaining column or *column family* in the table is then encoded
as a value in the underlying KV store, and the column/family identifier
is appended as *suffix* to the KV key.

For example:

- after table `customers` is created in a database `mydb` with a
  primary key column `name` and normal columns `address` and `URL`, the KV pairs
  to store the schema would be:

| Key                          | Values |
| ---------------------------- | ------ |
| `/system/databases/mydb/id`  | 51     |
| `/system/tables/customer/id` | 42     |
| `/system/desc/51/42/address` | 69     |
| `/system/desc/51/42/url`     | 66     |

(The numeric values on the right are chosen arbitrarily for the
example; the structure of the schema keys on the left is simplified
for the example and subject to change.) Each database/table/column
name is mapped to a spontaneously generated identifier, so as to
simplify renames.

Then for a single row in this table:

| Key               | Values                           |
| ----------------- | -------------------------------- |
| `/51/42/Apple/69` | `1 Infinite Loop, Cupertino, CA` |
| `/51/42/Apple/66` | `http://apple.com/`              |

Each key has the table prefix `/51/42` followed by the primary key
prefix `/Apple` followed by the column/family suffix (`/66`,
`/69`). The KV value is directly encoded from the SQL value.

Efficient storage for the keys is guaranteed by the underlying RocksDB engine
by means of prefix compression.

Finally, for SQL indexes, the KV key is formed using the SQL value of the
indexed columns, and the KV value is the KV key prefix of the rest of
the indexed row.

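For intuition only, the key layout in the row example above could be built like this (a readable simplification; the actual implementation uses an order-preserving binary encoding rather than `/`-separated strings):

```go
package keys

import "fmt"

// rowKey builds a key like /51/42/Apple/69: the table prefix (database and
// table IDs), then the primary key value, then the column/family ID. The KV
// value stored under this key would hold the encoded SQL value for that
// column (e.g. the `address` column above).
func rowKey(dbID, tableID int, primaryKey string, columnID int) string {
	return fmt.Sprintf("/%d/%d/%s/%d", dbID, tableID, primaryKey, columnID)
}

// Example: rowKey(51, 42, "Apple", 69) == "/51/42/Apple/69".
```
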
## Distributed SQL

Dist-SQL is a new execution framework being developed as of Q3 2016 with the
goal of distributing the processing of SQL queries.
See the [Distributed SQL
RFC](RFCS/20160421_distributed_sql.md)
for a detailed design of the subsystem; this section will serve as a summary.

Distributing the processing is desirable for multiple reasons:
- Remote-side filtering: when querying for a set of rows that match a filtering
  expression, instead of querying all the keys in certain ranges and processing
  the filters after receiving the data on the gateway node over the network,
  we'd like the filtering expression to be processed by the lease holder or
  remote node, saving on network traffic and related processing.
- For statements like `UPDATE .. WHERE` and `DELETE .. WHERE` we want to
  perform the query and the updates on the node which has the data (as opposed
  to receiving results at the gateway over the network, and then performing the
  update or deletion there, which involves additional round-trips).
- Parallelize SQL computation: when significant computation is required, we
  want to distribute it to multiple nodes, so that it scales with the amount of
  data involved. This applies to `JOIN`s, aggregation, sorting.

The approach we took was originally inspired by
[Sawzall](https://cloud.google.com/dataflow/model/programming-model) - a
project by Rob Pike et al. at Google that proposes a "shell" (high-level
language interpreter) to ease the exploitation of MapReduce. It provides a
clear separation between "local" processes which process a limited amount of
data and distributed computations, which are abstracted away behind a
restricted set of conceptual constructs.

To run SQL statements in a distributed fashion, we introduce a couple of concepts:
- _logical plan_ - similar on the surface to the `planNode` tree described in
  the [SQL](#sql) section, it represents the abstract (non-distributed) data flow
  through computation stages.
- _physical plan_ - a physical plan is conceptually a mapping of the _logical
  plan_ nodes to CockroachDB nodes. Logical plan nodes are replicated and
  specialized depending on the cluster topology. The components of the physical
  plan are scheduled and run on the cluster.

## Logical planning

The logical plan is made up of _aggregators_. Each _aggregator_ consumes an
_input stream_ of rows (or multiple streams for joins) and produces an _output
stream_ of rows. Both the input and the output streams have a set schema. The
streams are a logical concept and might not map to a single data stream in the
actual computation. Aggregators will be potentially distributed when converting
the *logical plan* to a *physical plan*; to express what distribution and
parallelization is allowed, an aggregator defines a _grouping_ on the data that
flows through it, expressing which rows need to be processed on the same node
(this mechanism constrains rows matching in a subset of columns to be
processed on the same node). This concept is useful for aggregators that need
to see some set of rows for producing output - e.g. the SQL aggregation
functions. An aggregator with no grouping is a special but important case in
which we are not aggregating multiple pieces of data, but we may be filtering,
transforming, or reordering individual pieces of data.

Special **table reader** aggregators with no inputs are used as data sources; a
table reader can be configured to output only certain columns, as needed.
A special **final** aggregator with no outputs is used for the results of the
query/statement.

To reflect the result ordering that a query has to produce, some aggregators
(`final`, `limit`) are configured with an **ordering requirement** on the input
stream (a list of columns with corresponding ascending/descending
requirements). Some aggregators (like `table readers`) can guarantee a certain
ordering on their output stream, called an **ordering guarantee**. All
aggregators have an associated **ordering characterization** function
`ord(input_order) -> output_order` that maps `input_order` (an ordering
guarantee on the input stream) into `output_order` (an ordering guarantee for
the output stream) - meaning that if the rows in the input stream are ordered
according to `input_order`, then the rows in the output stream will be ordered
according to `output_order`.

The ordering guarantee of the table readers along with the characterization
functions can be used to propagate ordering information across the logical plan.
When there is a mismatch (an aggregator has an ordering requirement that is not
matched by a guarantee), we insert a **sorting aggregator**.

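A sketch of that mismatch check in Go (the types and helpers here are hypothetical simplifications; the real planner tracks richer ordering information):

```go
package distsql

// ColumnOrder is one column of an ordering, e.g. (column 2, descending).
type ColumnOrder struct {
	ColIdx     int
	Descending bool
}

// Ordering is a list of columns with ascending/descending directions.
type Ordering []ColumnOrder

// satisfies reports whether an ordering guarantee satisfies an ordering
// requirement: the guarantee must start with exactly the required columns
// and directions.
func satisfies(guarantee, requirement Ordering) bool {
	if len(guarantee) < len(requirement) {
		return false
	}
	for i, req := range requirement {
		if guarantee[i] != req {
			return false
		}
	}
	return true
}

// maybeAddSort decides whether a sorting aggregator must be inserted between
// a producer and a consumer, and on which ordering it should sort.
func maybeAddSort(producerGuarantee, consumerRequirement Ordering) (sortNeeded bool, sortOn Ordering) {
	if satisfies(producerGuarantee, consumerRequirement) {
		return false, nil
	}
	return true, consumerRequirement
}
```
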
### Types of aggregators

- `TABLE READER` is a special aggregator, with no input stream. It's configured
  with spans of a table or index and the schema that it needs to read.
  Like every other aggregator, it can be configured with a programmable output
  filter.
- `JOIN` performs a join on two streams, with equality constraints between
  certain columns. The aggregator is grouped on the columns that are
  constrained to be equal.
- `JOIN READER` performs point-lookups for rows with the keys indicated by the
  input stream. It can do so by performing (potentially remote) KV reads, or by
  setting up remote flows.
- `SET OPERATION` takes several inputs and performs set arithmetic on them
  (union, difference).
- `AGGREGATOR` is the one that does "aggregation" in the SQL sense. It groups
  rows and computes an aggregate for each group. The group is configured using
  the group key. `AGGREGATOR` can be configured with one or more aggregation
  functions:
  - `SUM`
  - `COUNT`
  - `COUNT DISTINCT`
  - `DISTINCT`

  An optional output filter has access to the group key and all the
  aggregated values (i.e. it can use even values that are not ultimately
  output).
- `SORT` sorts the input according to a configurable set of columns.
  This is a no-grouping aggregator, hence it can be distributed arbitrarily to
  the data producers. This means that it doesn't produce a global ordering;
  instead it just guarantees an intra-stream ordering on each physical output
  stream. The global ordering, when needed, is achieved by an input
  synchronizer of a grouped processor (such as `LIMIT` or `FINAL`).
- `LIMIT` is a single-group aggregator that stops after reading a configured
  number of input rows.
- `FINAL` is a single-group aggregator, scheduled on the gateway, that collects
  the results of the query. This aggregator will be hooked up to the pgwire
  connection to the client.

## Physical planning

Logical plans are transformed into physical plans in a *physical planning
phase*. See the [corresponding
section](RFCS/20160421_distributed_sql.md#from-logical-to-physical) of the Distributed SQL RFC
for details. To summarize, each aggregator is planned as one or more
*processors*, which we distribute starting from the data layout - `TABLE
READER`s have multiple instances, split according to the ranges - each instance
is planned on the lease holder of the relevant range. From that point on,
subsequent processors are generally either colocated with their inputs, or
planned as singletons, usually on the final destination node.

### Processors

When turning a _logical plan_ into a _physical plan_, its nodes are turned into
_processors_. Processors are generally made up of three components:

![Processor](RFCS/images/distributed_sql_processor.png?raw=true "Processor")

1. The *input synchronizer* merges the input streams into a single stream of
   data. Types:
   * single-input (pass-through)
   * unsynchronized: passes rows from all input streams, arbitrarily
     interleaved.
   * ordered: the input physical streams have an ordering guarantee (namely the
     guarantee of the corresponding logical stream); the synchronizer is careful
     to interleave the streams so that the merged stream has the same guarantee.

2. The *data processor* core implements the data transformation or aggregation
   logic (and in some cases performs KV operations).

3. The *output router* splits the data processor's output to multiple streams;
   types:
   * single-output (pass-through)
   * mirror: every row is sent to all output streams
   * hashing: each row goes to a single output stream, chosen according
     to a hash function applied on certain elements of the data tuples.
   * by range: the router is configured with range information (relating to a
     certain table) and is able to send rows to the nodes that are lease holders for
     the respective ranges (useful for `JoinReader` nodes (taking index values
     to the node responsible for the PK) and `INSERT` (taking new rows to their
     lease holder-to-be)).

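As an illustration of the hashing router variant described above (a sketch with made-up types; the real router hashes an encoded form of the selected columns):

```go
package distsql

import "hash/fnv"

// hashRouter picks an output stream for each row by hashing a configured
// subset of the row's columns.
type hashRouter struct {
	hashCols   []int // indexes of the columns to hash
	numOutputs int
}

// route returns the index of the output stream for the given row; rows with
// equal values in the hashed columns always go to the same stream.
func (r *hashRouter) route(row []string) int {
	h := fnv.New32a()
	for _, c := range r.hashCols {
		h.Write([]byte(row[c]))
		h.Write([]byte{0}) // separator so ("ab","c") hashes differently from ("a","bc")
	}
	return int(h.Sum32() % uint32(r.numOutputs))
}
```
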
To illustrate with an example from the Distributed SQL RFC, the query:
```
TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)

SELECT CID, SUM(VALUE) FROM Orders
WHERE DATE > 2015
GROUP BY CID
ORDER BY 1 - SUM(Value)
```

produces the following logical plan:

![Logical plan](RFCS/images/distributed_sql_logical_plan.png?raw=true "Logical Plan")

This logical plan could be transformed into either one of the following
physical plans:

![Physical plan](RFCS/images/distributed_sql_physical_plan.png?raw=true "Physical Plan")

or

![Alternate physical plan](RFCS/images/distributed_sql_physical_plan_2.png?raw=true "Alternate physical Plan")

## Execution infrastructure

Once a physical plan has been generated, the system needs to divvy it up
between the nodes and send it around for execution. Each node is responsible
for locally scheduling data processors and input synchronizers. Nodes also
communicate with each other for connecting output routers to input
synchronizers through a streaming interface.

### Creating a local plan: the `ScheduleFlows` RPC

Distributed execution starts with the gateway making a request to every node
that's supposed to execute part of the plan, asking the node to schedule the
sub-plan(s) it's responsible for (except for "on-the-fly" flows, see design
doc). A node might be responsible for multiple disparate pieces of the overall
DAG - let's call each of them a *flow*. A flow is described by the sequence of
physical plan nodes in it, the connections between them (input synchronizers,
output routers) plus identifiers for the input streams of the top node in the
plan and the output streams of the (possibly multiple) bottom nodes. A node
might be responsible for multiple heterogeneous flows. More commonly, when a
node is the lease holder for multiple ranges from the same table involved in
the query, it will run a `TableReader` configured with all the spans to be
read across all the ranges local to the node.

A node therefore implements a `ScheduleFlows` RPC which takes a set of flows,
sets up the input and output [mailboxes](#mailboxes), creates the local
processors and starts their execution.

### Local scheduling of flows

The simplest way to schedule the different processors locally on a node is
concurrently: each data processor, synchronizer and router runs as a goroutine,
with channels between them. The channels are buffered to synchronize producers
and consumers to a controllable degree.

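A minimal sketch of that wiring in Go (illustrative only; real processors exchange typed rows and handle errors and cancellation):

```go
package distsql

import "sync"

// runFlow wires three stages of a local flow together with buffered channels
// and runs each stage as its own goroutine, the way processors, synchronizers
// and routers are scheduled on a node.
func runFlow(input []int) []int {
	const bufSize = 16 // controls how far producers may run ahead of consumers

	src := make(chan int, bufSize)
	filtered := make(chan int, bufSize)

	var wg sync.WaitGroup
	wg.Add(2)

	// Producer stage (e.g. a table reader).
	go func() {
		defer wg.Done()
		defer close(src)
		for _, v := range input {
			src <- v
		}
	}()

	// Intermediate stage (e.g. a filtering processor).
	go func() {
		defer wg.Done()
		defer close(filtered)
		for v := range src {
			if v%2 == 0 {
				filtered <- v
			}
		}
	}()

	// Final consumer stage (e.g. the processor feeding results to pgwire).
	var out []int
	for v := range filtered {
		out = append(out, v)
	}
	wg.Wait()
	return out
}
```
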
### Mailboxes

Flows on different nodes communicate with each other over gRPC streams. To
allow the producer and the consumer to start at different times,
`ScheduleFlows` creates named mailboxes for all the input and output streams.
These mailboxes will hold some number of tuples in an internal queue until
a gRPC stream is established for transporting them. From that moment on, gRPC
flow control is used to synchronize the producer and consumer. A gRPC stream is
established by the consumer using the `StreamMailbox` RPC, taking a mailbox id
(the same one that's been already used in the flows passed to `ScheduleFlows`).

A diagram of a simple query using mailboxes for its execution:
![Mailboxes](RFCS/images/distributed_sql_mailboxes.png?raw=true)

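The buffering behavior can be sketched as follows (hypothetical types; the real mailbox hands rows to a gRPC stream and relies on its flow control once the stream is attached):

```go
package distsql

import "sync"

// mailbox queues rows produced by a local flow until a consumer stream
// attaches; afterwards rows are forwarded directly. The consumer channel is
// assumed to be actively drained.
type mailbox struct {
	mu       sync.Mutex
	buffered [][]byte      // rows queued before the consumer attaches
	consumer chan<- []byte // set once the consumer's stream is established
}

// push is called by the producing flow for each output row.
func (m *mailbox) push(row []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.consumer == nil {
		m.buffered = append(m.buffered, row)
		return
	}
	m.consumer <- row
}

// attach is called when the consumer establishes its stream; buffered rows
// are drained first, then subsequent rows flow through directly.
func (m *mailbox) attach(consumer chan<- []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, row := range m.buffered {
		consumer <- row
	}
	m.buffered = nil
	m.consumer = consumer
}
```
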
## A complex example: Daily Promotion

To give a visual intuition of all the concepts presented, we draw the physical plan of a relatively involved query. The
point of the query is to help with a promotion that goes out daily, targeting
customers that have spent over $1000 in the last year. We'll insert into the
`DailyPromotion` table rows representing each such customer and the count of her
recent orders.

```SQL
TABLE DailyPromotion (
  Email TEXT,
  Name TEXT,
  OrderCount INT
)

TABLE Customers (
  CustomerID INT PRIMARY KEY,
  Email TEXT,
  Name TEXT
)

TABLE Orders (
  CustomerID INT,
  Date DATETIME,
  Value INT,

  PRIMARY KEY (CustomerID, Date),
  INDEX date (Date)
)

INSERT INTO DailyPromotion
(SELECT c.Email, c.Name, os.OrderCount FROM
 Customers AS c
 INNER JOIN
   (SELECT CustomerID, COUNT(*) as OrderCount FROM Orders
    WHERE Date >= '2015-01-01'
    GROUP BY CustomerID HAVING SUM(Value) >= 1000) AS os
 ON c.CustomerID = os.CustomerID)
```

A possible physical plan:
![Physical plan](RFCS/images/distributed_sql_daily_promotion_physical_plan.png?raw=true)