--------------------------------------------------------------------------------
EVENTQL - SEGMENT BASED REPLICATION
--------------------------------------------------------------------------------
v0.1 - September, 2016 Paul Asmuth <paul@eventql.io>
Table of Contents
1. Design Overview
2. Durability Considerations
3. Implementation Details
3.1 Compaction
3.2 Partition Leader
3.3 Segment Merge and Fast-Forward
3.4 Replication Order
3.5 Replication Procedure
3.6 ABA Considerations
3.7 Changes to the Server Join Operation
3.8 Changes to the Partition Split Operation
4. Affected Subsystems
4.1 Binary Protocol Additions
4.2 Changes to the Query Subsystem
5. Rollout and Backwards Compatibility
6. Alternatives Considered
6.1 Row-based replication
6.2 Operation-log-based replication
7. Code Locations
1. Design Overview
EventQL stores data as rows in tables. Each table is split into a list of
partitions. Each partition contains a subset of the rows in the table. The
partition itself is stored on disk as a list of segment files. A segment file
in turn is an immutable data file containing one or more rows.
These segment files form an abstract log structured merge tree: when rows are
inserted or updated in a partition, the updates are first buffered in memory
and after a threshold is reached, written out to disk as a new segment file.
Now, the smallest unit of data that is replicated in EventQL is the segment
file.
Replication in EventQL is push-based, which means that a server that stores
a segment file is responsible for making sure that all other servers which should
store the segment file also have it.
On a very high level the replication algorithm is thus very simple: Every time
a server writes out a new segment file for a partition, it adds the segment file
into a queue to be pushed out to every other server that should have it. On
receiving a segment file, a server adds it to its local log structured merge
tree.
If everything runs smoothly, all the EventQL replication does is copy files
in the 1-500MB range between servers. This is pretty much as cheap and efficient
as it can get: we can easily saturate the available IO and network bandwidth
while using only a minimal amount of CPU resources. Also, the algorithm transfers
only the minimal required amount of data (neglecting parity-based schemes) in
the steady operation case, i.e. each row is copied exactly N-1 times between
servers, where N is the replication factor.
2. Durability Considerations
The one thing we have to trade off for the performance gains that segment based
replication gives us is a bit of durability. Previously, with row-based
replication any insert would be written out to N different nodes as soon as
it was received by the cluster.
With segment based replication, each write is initially only durably stored
on one server in the cluster and only pushed out after a compaction timeout
of usually around 1 minute.
The implication of this is that with row based replication, as soon as a write
is confirmed, we can be sure that it will never be rolled back unless three
machines simultaneously die with unrecoverable hard drive errors. With segment
based replication however, when a single machine dies with an unrecoverable
hard drive error, up to 1 minute (or whatever the compaction timeout plus
replication delay is) of recent writes may be lost.
Given that most transactional databases (like MySQL or Postgres) do not give
any stronger guarantees than this and EventQL is specifically intended for
analytical use cases, we consider it a fair tradeoff.
3. Implementation Details
Each server keeps a list of local segments for each partition. The list contains,
for each segment file:
- segment_id -- the segment file id
- base_segment_id -- the id of the segment file on which this file is based
- included_segment_id -- a list of segment ids included in this segment
- acked_servers -- the list of servers that have acknowledged the segment file
- is_major -- is this a major segment (false by default)
Additionally, for each partition that it stores, a server keeps a "root segment
id". Every time a server creates a new segment file, it writes it out to disk,
adds a new entry to the segment list and sets the base_segment_id of the new
entry to the current root segment id. Then it sets the current root segment id
to the new segment id and enqueues the new segment for replication.
The initial root segment id is "NULL". The first segment created thus has
a base_segment_id of NULL. Also, the first segment (based on NULL) must have
the is_major flag set.
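To make this bookkeeping concrete, the following sketch (in Python, purely
illustrative and not EventQL's actual implementation) shows one possible shape
of a segment list entry and of how a newly written segment is chained onto the
current root segment id; all names are hypothetical.

  import uuid
  from dataclasses import dataclass, field
  from typing import List, Optional, Set

  @dataclass
  class SegmentEntry:
      segment_id: str                      # the segment file id
      base_segment_id: Optional[str]       # id of the segment this one is based on
      included_segment_ids: List[str] = field(default_factory=list)
      acked_servers: Set[str] = field(default_factory=set)
      is_major: bool = False

  @dataclass
  class PartitionSegmentList:
      root_segment_id: Optional[str] = None    # initially NULL
      segments: List[SegmentEntry] = field(default_factory=list)

      def add_new_segment(self) -> SegmentEntry:
          # create a new entry based on the current root segment id
          entry = SegmentEntry(
              segment_id=uuid.uuid4().hex,
              base_segment_id=self.root_segment_id,
              # the very first segment (based on NULL) must be a major segment
              is_major=(self.root_segment_id is None))
          self.segments.append(entry)
          self.root_segment_id = entry.segment_id
          # ...here the new segment would also be enqueued for replication
          return entry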
3.1 Compaction
When compacting a partition (i.e. folding the list of minor segments into
a new major segment) the server writes out a new segment file. The new segment
file has the base_segment_id set to the id of the most recent minor segment
included in the new segment. Additionally, when compacting a partition, a list of
up to MAX_SEGMENT_HISTORY included segment ids (i.e. the ids of previous segments
that have been compacted into the new segment) might be added to the new segment
entry.
Note that the old segments do not get deleted immediately after compaction, but
are later deleted in the replication procedure.
Only the leader of a partition performs compaction.
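A compaction run could then produce its new segment entry roughly as in the
sketch below (Python, illustrative only; it reuses the imports and types from
the sketch above, and MAX_SEGMENT_HISTORY is a placeholder value).

  MAX_SEGMENT_HISTORY = 64   # placeholder, not the actual constant

  def chain_segment_ids(plist: PartitionSegmentList) -> list:
      # follow the base_segment_id links from the root and collect the chain ids
      by_id = {s.segment_id: s for s in plist.segments}
      ids, next_id = [], plist.root_segment_id
      while next_id is not None and next_id in by_id:
          ids.append(next_id)
          next_id = by_id[next_id].base_segment_id
      return ids

  def compact(plist: PartitionSegmentList) -> SegmentEntry:
      # fold the current chain into a single new major segment; the new entry's
      # base_segment_id is the most recent segment included in the compaction
      entry = SegmentEntry(
          segment_id=uuid.uuid4().hex,
          base_segment_id=plist.root_segment_id,
          included_segment_ids=chain_segment_ids(plist)[:MAX_SEGMENT_HISTORY],
          is_major=True)
      plist.segments.append(entry)
      plist.root_segment_id = entry.segment_id
      # the old segments are not deleted here; they are cleaned up later by the
      # replication procedure once they are only weakly referenced
      return entry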
3.2 Partition Leader
To ensure that replication will always be fast-forward in the steady case we
elect a "leader" for each partition using the coordination service. The exact
leader election algorithm should be pluggable and is not discussed here.
Suffice it to say that it should always name exactly one of the replicas as the
leader. Still, even if for a short period of time two nodes consider themselves
the leader for a given partition, no corruption will occur. It will merely
result in a lot of unnecessary merges.
At any time, only the leader of a given partition accepts writes for that
partition. This implies that the leader is also the only server that produces
new segments for the partition. However, any other replica of the partition
can become the leader at any time.
JOINING servers are not eligible to be elected as leaders.
3.3 Segment Merge and Fast-Forward
If we take a step back and look at the problem a different way, we are trying
to create a single, consistent chain of segments. Each segment is immutable
but multiple servers are involved and we do not rely on the leader lock to be
perfect, so it is possible that the chain "splits". I.e. two hosts compute
independent, conflicting segments based on the same base segment.
The main idea behind the whole segment based replication design is this: We can
assign a unique id to each segment and then store, with each segment, the id
of the segment it was based on. This way, when receiving a segment
from another node, we can easily tell if the chain split / we have a conflict
or not.
Based on that, we have two different procedures to handle incoming data.
In the steady case, where there is only one leader creating new segments and we
have a perfect, unforked chain, each new segment that any node sees will always
be based on the last one it has seen. In this case, we can simply store the new
segment on disk, add it to our local segment list and change the root segment id
to the id of the new segment without ever looking at the data. This is extremely
cheap. We could potentially add millions of rows with a metadata operation
(adding the segment entry) that takes milliseconds. We will call this operation
"fast forward" from now on.
However, when we receive a segment that is not based on the latest segment we
have seen, we cannot fast forward it. Note that this case does not only happen
when there was a fork in the chain, but can also happen if a node has been
offline for a sufficiently long time. In this case, we will perform another
operation that we will call "merge".
The merge operation simply reads in the segment and inserts every row into the
local partition like it was a new write. N.B. that due to the way primary keys
in EventQL work, repeated writes with the exact same value are idempotent, i.e.
regardless of how often you write a given (exact same) row the result is always
the same.
Another sidenote is that the "merge" operation is pretty much exactly what
we previously did with row based replication for _every_ row, and what a lot
of other products still do. So another way to look at segment based replication
is that
it allows us to "fast-forward" pretty much all of the work in replication in the
steady case.
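The distinction between the two code paths could be sketched as follows (again
illustrative Python on top of the earlier types; insert_row is passed in as a
stand-in for the normal, idempotent partition write path).

  def apply_incoming_segment(plist: PartitionSegmentList,
                             incoming: SegmentEntry, rows, insert_row) -> str:
      if incoming.base_segment_id == plist.root_segment_id:
          # steady case: a pure metadata operation, the segment file is kept as-is
          plist.segments.append(incoming)
          plist.root_segment_id = incoming.segment_id
          return "fast-forward"
      # chain fork, or we missed intermediate segments: fall back to a merge.
      # re-inserting is safe because writes of the exact same row are idempotent.
      for row in rows:
          insert_row(row)
      return "merge"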
3.4 Replication Order
With respect to a given source and target server combination it is guaranteed
that the segments will be sent in sequential order. I.e. they will be sent in
the order in which they were added to the segments list and no segment will be
sent until its predecessor segment has been acknowledged.
3.5 Replication Procedure
Note that this replication procedure only applies to partitions in the LOADING
or LIVE states. Partitions in the UNLOAD stage have a special replication
procedure (see below).
We define the following helper procedures:
is_referenced(partition P, segment S):
  - set the next_id variable to the root segment id
  - iterate over all segments in the list from new to old as T:
    - Check if the segment id of T equals next_id
      - If yes
        - Check if the included_segment_id list of T includes the id of S
          - If yes, return true
        - Check if the segment id of T equals the id of S:
          - If yes, return true
          - If no, set next_id to the base_segment_id of T and continue
      - If no, return false
  - if we completed the loop without a match, return false
is_reference_weak(partition P, segment S):
  - set the next_id variable to the root segment id
  - iterate over all segments in the list from new to old as T:
    - Check if the segment id of T equals next_id
      - If yes
        - Check if the included_segment_id list of T includes the id of S
          - If yes, return true
        - Check if the segment id of T equals the id of S:
          - If yes, return false
          - If no
            - Check if the segment T is major
              - If yes, return true
              - If no, set next_id to the base_segment_id of T and continue
      - If no, return true
  - if we completed the loop without a match, return true
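Expressed in code, the two helpers might look like the sketch below. It walks
the chain via id lookups instead of scanning the flat list (a simplification of
the procedures above) and reuses the types from the earlier sketches.

  def chain(plist: PartitionSegmentList):
      # yield the segments on the current chain, newest to oldest
      by_id = {s.segment_id: s for s in plist.segments}
      next_id = plist.root_segment_id
      while next_id is not None and next_id in by_id:
          seg = by_id[next_id]
          yield seg
          next_id = seg.base_segment_id

  def is_referenced(plist: PartitionSegmentList, s: SegmentEntry) -> bool:
      # true if s is on the chain or included in a segment on the chain
      for t in chain(plist):
          if s.segment_id in t.included_segment_ids:
              return True
          if t.segment_id == s.segment_id:
              return True
      return False

  def is_reference_weak(plist: PartitionSegmentList, s: SegmentEntry) -> bool:
      # true if s's data is already covered by a newer or by a major segment
      for t in chain(plist):
          if s.segment_id in t.included_segment_ids:
              return True
          if t.segment_id == s.segment_id:
              return False    # s itself is live on the chain: a strong reference
          if t.is_major:
              return True     # s sits behind a major segment that supersedes it
      return True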
If we are the leader for a given partition, execute this replication procedure:
replicate_partition(P):
  - For each segment file S in the partition P, from oldest to newest
    - Check if is_referenced(P, S):
      - If yes, check if is_reference_weak(P, S)
        - If yes, delete the segment
      - If no, merge the segment into the current partition and then delete it
replicate_partition_to(P, R):
  - For each replica R of the partition that is not ourselves:
    - Iterate the segments list as S from old to new, starting at the most
      recent major segment referenced by the root segment id
      - If the replica R is not included in the acked_servers list for
        segment S
        - Offer the segment S to replica R
          - If the offer was declined with SEGMENT_EXISTS, add R to the
            acked_servers list for S
          - If the offer was declined with OVERLOADED, INVALID or INFLIGHT,
            abort and retry later
          - If the offer was declined with OUT_OF_ORDER, remove the replica R
            from the acked_servers list and restart replication
          - If the offer was accepted, add R to the acked_servers list
            for S
          - If an error occurs, abort and retry later
If we are a follower for a given partition, execute this replication procedure:
- For each segment file S in the partition
  - Check if the partition leader is in the acked_servers list for S
    - If yes, check if is_referenced(P, S):
      - If yes, check if is_reference_weak(P, S)
        - If yes, delete the segment
    - If no, push the segment to the partition leader
      - If the offer was declined with SEGMENT_EXISTS, add the leader to the
        acked_servers list for S
      - If the offer was declined with OVERLOADED, INVALID or INFLIGHT,
        abort and retry later
      - If the offer was accepted, add the leader to the acked_servers list
        for S
      - If an error occurs, abort and retry later
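The offer/response handling that the leader and follower procedures share could
be sketched like this; offer_segment() is a hypothetical stand-in for the
SEGMENT_OFFER exchange, and the OUT_OF_ORDER handling shows one possible reading
of "restart replication".

  # decline reasons as used in the procedures above
  SEGMENT_EXISTS, OVERLOADED, INVALID, INFLIGHT, OUT_OF_ORDER = range(5)

  def replicate_partition_to(plist: PartitionSegmentList, replica: str,
                             offer_segment) -> None:
      # offer_segment(replica, segment) -> (accepted, decline_reason) stands in
      # for the SEGMENT_OFFER / SEGMENT_ACCEPT / SEGMENT_DECLINE exchange.
      # (starting at the most recent major segment is omitted for brevity.)
      for seg in reversed(list(chain(plist))):        # old to new along the chain
          if replica in seg.acked_servers:
              continue
          accepted, reason = offer_segment(replica, seg)
          if accepted or reason == SEGMENT_EXISTS:
              seg.acked_servers.add(replica)
          elif reason == OUT_OF_ORDER:
              # forget what we think the replica has acked and start over
              for s in plist.segments:
                  s.acked_servers.discard(replica)
              return
          else:                           # OVERLOADED, INVALID, INFLIGHT, error
              return                      # abort and retry later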
Upon receiving a segment from another node, execute this procedure:
- Check if we are already receiving more than MAX_REPL_INCOMING_SEGMENTS
  segments simultaneously
  - If yes, decline with OVERLOADED and abort
- Check if we are simultaneously receiving a segment with the same ID
  - If yes, decline with INFLIGHT and abort
- Check if we are the leader for this partition
  - If yes, check if the partition state is LOADING and the root segment id
    is NULL
    - If yes, accept and fast-forward-add the segment
    - If no, check if is_referenced(P, S)
      - If yes, decline with SEGMENT_EXISTS
      - If no, accept and merge the segment into the partition
  - If no, check if the sender is the leader for this partition
    - If yes, check if the segment's base_segment_id equals the local root
      segment id
      - If yes
        - Accept and fast-forward-add the segment to the current partition
        - Add the leader to the acked_servers for the new segment entry
      - If no, check if is_referenced(P, S)
        - If yes, decline with SEGMENT_EXISTS
        - If no, check if the segment is a major segment
          - If yes
            - Accept and fast-forward-add the segment
            - Add the leader to the acked_servers for the new segment entry
          - If no, decline with OUT_OF_ORDER
    - If no, decline with INVALID
FIXME: how to handle in-memory data in follower? (we don't accept writes
as the follower, but when we lose our leader status this could happen)
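Condensed into code, the receiving side's decision tree could look like the
sketch below (reusing the earlier helpers and decline reasons; the leader and
sender flags, the in-flight bookkeeping and the limit are passed in for
simplicity and are illustrative, not the actual interface).

  def decide_on_offer(plist, incoming, *, we_are_leader, sender_is_leader,
                      partition_is_loading, inflight_ids,
                      max_inflight=8):            # MAX_REPL_INCOMING_SEGMENTS
      # returns ("accept", mode) or ("decline", reason)
      if len(inflight_ids) > max_inflight:
          return ("decline", OVERLOADED)
      if incoming.segment_id in inflight_ids:
          return ("decline", INFLIGHT)
      if we_are_leader:
          if partition_is_loading and plist.root_segment_id is None:
              return ("accept", "fast-forward")
          if is_referenced(plist, incoming):
              return ("decline", SEGMENT_EXISTS)
          return ("accept", "merge")
      if not sender_is_leader:
          return ("decline", INVALID)
      if incoming.base_segment_id == plist.root_segment_id:
          return ("accept", "fast-forward")       # then ack the leader
      if is_referenced(plist, incoming):
          return ("decline", SEGMENT_EXISTS)
      if incoming.is_major:
          return ("accept", "fast-forward")       # then ack the leader
      return ("decline", OUT_OF_ORDER)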
3.6 ABA Considerations
One ABA scenario occurs when a follower receives a major segment followed by
a minor segment, then another major segment and then another minor segment
that is based on the previous minor segment. From the sender's point of view,
the previous minor segment has been acknowledged. From the receiver's point of
view the minor segment is not based on the latest segment. This is handled by
responding with an out of order error code.
Another ABA scenario occurs in this case: Say replica R is leader for partition
P. Now all other replicas have pushed all their data to R. Now, R is unassigned
from the partition by the master and another replica takes over as leader. Later,
R is re-assigned to the partition and becomes leader again, starting out with
an empty partition. Now, all other replicas will accept the empty partition from
R and delete their local data because they think it was already acknowledged by
R. This is prevented by assigning a placement id to each replica+partition
combination that is unique on each new assignment. The placement id is what we
actually store in the acked_servers list.
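A minimal sketch of the placement id mechanism, assuming the master mints a
fresh id on every (re-)assignment; the names are illustrative.

  import uuid

  def assign_replica(assignments: dict, partition_id: str, server_id: str) -> str:
      # a placement id is unique per assignment, so acknowledgements recorded
      # against a previous assignment of the same server no longer match after
      # the replica has been unassigned and re-assigned
      placement_id = uuid.uuid4().hex
      assignments.setdefault(partition_id, {})[server_id] = placement_id
      return placement_id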
3.7 Changes to the Server Join Operation
The server join operation remains largely unchanged. Joining nodes do not
participate in leader election but are otherwise considered normal followers.
Once a partition leader has sent the latest segment to a joining node, it will
initiate a JOIN_FINALIZE metadata operation.
3.8 Changes to the Partition Split Operation
The partition split operation complicates things a bit. The problem is that
in the event of a leader failover of the splitting partition, we do not want
to re-start the partition split from scratch. Also we have to ensure that a
replica that is offline during the partition split and comes online later does
not overwrite the new (split) partitions with its old data.
Forgetting about these issues for a second, the split procedure remains
conceptually unchanged. Splits are only initiated by the leader. Once
a partition starts splitting, the partition leaders for all target partitions
are added to the follower list with a keyrange restriction.
Due to the general replication procedure, only the leader of a given partition
will actually push data to the new partitions. It will rewrite each segment
file to exclude all records that do not match the target keyrange, but otherwise
leave the segment unchanged, i.e. not change its id or base_segment_id.
If, for example, our partition P has segments A, B and C and is splitting into
partitions P1 and P2, both new partitions P1 and P2 will also have three
segment files A', B' and C' with the same segment ids and base_segment_ids as
the original A, B and C segment files.
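The rewrite step could look like the sketch below: the rows are filtered by
keyrange while the entry keeps its segment_id and base_segment_id, so the chains
in the target partitions line up with the source chain. The row and keyrange
representation here is made up for illustration.

  from dataclasses import replace as dc_replace

  def rewrite_segment_for_split(entry: SegmentEntry, rows: list,
                                keyrange_begin, keyrange_end):
      # keep only the rows whose key falls into [keyrange_begin, keyrange_end)
      rows_in_range = [r for r in rows
                       if keyrange_begin <= r["key"] < keyrange_end]
      # same segment_id and base_segment_id; only the acked_servers list is reset
      # because the new target replicas have not acknowledged anything yet
      return dc_replace(entry, acked_servers=set()), rows_in_range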
Once all new partition leaders have confirmed the segment, the leader performs
the SPLIT_FINALIZE operation.
Since the partition leader will still accept writes while the partition is
splitting, with a large enough arrival rate it is impossible to make sure all
records are pushed out to the new leaders before marking the partition split
as done.
Also, there is the special case of async splits: We allow asynchronous splits
to be enabled for a table. With async splits, the partition split finalizes
immediately once it has been initiated (before the old data is copied out to the new
partitions).
This complicates the end of the partition lifecycle. We can't just drop all
partition replicas once the split is complete for the reasons above. Instead,
once a partition is in the "UNLOAD" stage of its lifecycle (i.e. it has split
into another partition), we don't accept any writes anymore so there is also
no need to elect a leader anymore. In this stage, all replicas are equal and
perform this special replication procedure:
- For each segment file S in the partition
  - For each split target replica R
    - Check if R is in the acked_servers list for S
      - If no, push the segment to R
        - If the offer was declined with SEGMENT_EXISTS, add R to the
          acked_servers list for S
        - If the offer was declined with OVERLOADED or INVALID, abort and
          retry later
        - If the offer was accepted, add R to the acked_servers list for S
        - If an error occurs, abort and retry later
- At this point we ensured all segments have been acked by all new targets
  and the partition may be deleted (moves to the PARTITION_UNLOAD_FINAL stage)
4. Affected Subsystems
Besides the replication subsystem, which needs to be rewritten in large part,
this change will also affect the query, insert and compaction subsystems.
4.1 Binary Protocol Additions
We add the following new opcodes to the binary protocol:
SEGMENT_OFFER
  Offer a segment from one server to another; the response is SEGMENT_ACCEPT
  or SEGMENT_DECLINE.
SEGMENT_ACCEPT
  Tell the offering node to start transmitting the segment.
SEGMENT_DECLINE
  Tell the offering node that the segment should not be transmitted at this
  time. Valid reasons are SEGMENT_EXISTS, OVERLOADED, INVALID, INFLIGHT and
  OUT_OF_ORDER.
SEGMENT_TRANSIT
  In response to a SEGMENT_ACCEPT operation, the offering node will send
  one or more SEGMENT_TRANSIT frames containing the segment data.
SEGMENT_ACK
  The receiving server confirms each SEGMENT_TRANSIT op with a SEGMENT_ACK.
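For reference, the additions could be declared along these lines; the numeric
values are placeholders, not the actual wire encoding.

  from enum import Enum

  class ReplOpcode(Enum):
      SEGMENT_OFFER   = 0x0701   # placeholder values
      SEGMENT_ACCEPT  = 0x0702
      SEGMENT_DECLINE = 0x0703
      SEGMENT_TRANSIT = 0x0704
      SEGMENT_ACK     = 0x0705

  class DeclineReason(Enum):
      SEGMENT_EXISTS = 1
      OVERLOADED     = 2
      INVALID        = 3
      INFLIGHT       = 4
      OUT_OF_ORDER   = 5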
4.2 Changes to the Query Subsystem
FIXME
5. Rollout and Backwards Compatibility
FIXME
6. Alternatives Considered
6.1 Row-based replication
The problem with the old row-based replication scheme was that it had to run
through the regular partition insert code for each row individually at least
6 times (assuming a replication level of 3). This was a considerable overhead:
The row only gets stored on disk 3 out of those 6 times (and rejected on
the other 3 inserts), but just checking if the row should be accepted or
rejected is a fairly expensive operation and, depending on cache contents,
might require multiple disk roundtrips.
6.2 Operation-log-based replication
The major issue with an operation log based approach is that we would
have to duplicate data (we would need to store the actual columnar data files
plus an operation log). In the worst case this would increase our disk
footprint by more than 100% because the operation log is not columnar.
Of course, we could delete the operation log once we have pushed it out to all
servers. However, what do we do once a new server joins or a partition splits?
We would have to create a synthetic operation log from the cstable files which
would be expensive.
Additionally, any operation-log-based approach would incur the same overheads
as the row based replication.
7. Code Locations
FIXME