# Replication

* Replication means keeping a copy of the same data on multiple machines
  - Keeps data geographically close to users (reducing latency)
  - Allows the system to continue working even if some of its parts have failed (increasing availability)
  - Scales out the number of machines that can serve read queries (increasing read throughput)
  - In this chapter, we assume the dataset is small enough that each machine can hold a copy of the entire dataset. This assumption is relaxed in Chapter 6, where we discuss sharding (partitioning) of big datasets
  - If the data being replicated does not change over time, replication is easy: copy it to every node once. All the difficulty in replication lies in handling changes to replicated data

* Three popular algorithms for replicating changes:
 - Single-leader
 - Multi-leader
 - Leaderless

### Leaders and followers
 * Each node that stores a copy of the db is called a replica
 * Every write to the db needs to be processed by every replica; otherwise the replicas would no longer contain the same data
 * The most common solution for this is leader-based replication (also called active/passive or master/slave replication)
   - One of the replicas is designated as the leader, and the others are called followers
   - Whenever the leader writes new data to its local storage, it also sends the data change to all of its followers as part of a replication log or change stream
   - Each follower takes the log from the leader and updates its local copy by applying all writes in the same order as they were processed on the leader
   - When a client wants to read from the db, it can query either the leader or any of the followers
   - Writes are only accepted on the leader (followers are read-only from the client's point of view)
   - This mode of operation is a built-in feature of many relational dbs, such as PostgreSQL and MySQL. It is also used in some non-relational dbs, such as MongoDB and RethinkDB
   - Leader-based replication is not restricted to dbs: distributed message brokers such as Kafka and RabbitMQ (highly available queues) also use it. Some network filesystems and replicated block devices such as DRBD are similar

### Synchronous vs Asynchronous Replication
 * An important detail is whether replication happens synchronously or asynchronously. In relational dbs, this is often a configurable option
 * With sync replication, the leader waits until the follower has confirmed that it received the write before reporting success to the user
 * With async replication, the leader sends the message but does not wait for a response from the follower. There is no guarantee of how long it takes for the follower to apply the write
 * With async replication, followers can fall behind the leader, sometimes by several minutes, e.g. when a follower is recovering from a failure, the system is operating near maximum capacity, or there are network problems between the nodes
 * With sync replication, the follower is guaranteed to have an up-to-date copy of the data that is consistent with the leader, so if the leader suddenly fails, the data is still available on the follower. The disadvantage is that if the sync follower does not respond (because it has crashed, there is a network fault, or any other reason), the write cannot be processed: the leader must block all writes until the sync follower is available again
 * Therefore it is impractical for all followers to be sync: any single node outage would cause the whole system to grind to a halt
 * In practice, enabling sync replication on a db usually means that one of the followers is sync and the others are async
 * If the sync follower becomes unavailable or slow, one of the async followers is made sync. This guarantees an up-to-date copy of the data on at least two nodes (the leader and one sync follower). This configuration is called semi-synchronous
 * Often, leader-based replication is configured to be completely async. In this case, if the leader fails and is not recoverable, any writes that have not yet been replicated to followers are lost, so a write is not guaranteed to be durable even if it has been confirmed to the client. On the other hand, the leader can continue processing writes even if all of its followers have fallen behind
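
The semi-synchronous setup above can be sketched as a small in-memory Python simulation. This is an illustration under simplifying assumptions, not how any real db implements it: there is no network, "sending" a change is a method call, and the class and method names (`Leader`, `Follower`, `flush`) are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # compare followers by identity, not by field values
class Follower:
    name: str
    available: bool = True
    log: list = field(default_factory=list)

    def apply(self, entry):
        # Append a change received from the leader to this follower's log.
        self.log.append(entry)


@dataclass
class Leader:
    sync_follower: Follower
    async_followers: list
    log: list = field(default_factory=list)
    pending: list = field(default_factory=list)  # (follower, entry) not yet delivered

    def write(self, entry):
        self.log.append(entry)
        # Semi-sync: if the sync follower is down, promote an async one first.
        if not self.sync_follower.available:
            self._promote_async_follower()
        # Sync part: the change reaches the sync follower before we report success.
        self.sync_follower.apply(entry)
        # Async part: only queue the change; async followers receive it later.
        for f in self.async_followers:
            self.pending.append((f, entry))
        return "ok"

    def flush(self, only=None):
        # Deliver queued changes, in write order, to followers that are up.
        still_pending = []
        for f, entry in self.pending:
            if f.available and (only is None or f is only):
                f.apply(entry)
            else:
                still_pending.append((f, entry))
        self.pending = still_pending

    def _promote_async_follower(self):
        candidates = [f for f in self.async_followers if f.available]
        if not candidates:
            # No follower can acknowledge, so writes must block (here: fail).
            raise RuntimeError("no follower available to act as sync follower")
        new_sync = candidates[0]
        self.async_followers.remove(new_sync)
        self.async_followers.append(self.sync_follower)  # demote the failed one
        self.sync_follower = new_sync
        self.flush(only=new_sync)  # bring the new sync follower fully up to date
```

Promoting an async follower only preserves the "up to date on two nodes" guarantee if the new sync follower first catches up, which is why `_promote_async_follower` flushes its backlog before the write continues.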

### Setting Up New Followers
 * We may need to add more followers from time to time to increase number of replicas or to replace failed nodes
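 * We cannot simply copy the data files from one node to another, as clients are constantly writing to the db and the data is always in flux. We could lock the db to make the files consistent, but that would hurt availability
 * The process typically looks like this:
  - Take a consistent snapshot of the leader's db at some point in time, if possible without taking a lock on the entire db
  - Copy the snapshot to the new follower node
  - The follower connects to the leader and requests all data changes that have happened since the snapshot was taken. This requires the snapshot to be associated with an exact position in the leader's replication log (e.g. the log sequence number in PostgreSQL, binlog coordinates in MySQL)
  - When the follower has processed the backlog of data changes since the snapshot, it has "caught up" and can continue to process data changes from the leader as they happen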
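
A minimal sketch of the snapshot-then-catch-up procedure, assuming a toy key-value leader whose replication log is a Python list and whose log position is simply an index into it (standing in for something like a PostgreSQL log sequence number). All names here are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class LeaderDB:
    data: dict = field(default_factory=dict)
    log: list = field(default_factory=list)  # replication log of (key, value) changes

    def write(self, key, value):
        self.data[key] = value
        self.log.append((key, value))

    def snapshot(self):
        # A consistent snapshot, tagged with the exact log position it
        # corresponds to (the role an LSN or binlog coordinate plays).
        return dict(self.data), len(self.log)

    def changes_since(self, position):
        # Return the backlog after `position` plus the leader's current position.
        return self.log[position:], len(self.log)


@dataclass
class NewFollower:
    data: dict = field(default_factory=dict)
    position: int = 0  # last log position this follower has applied

    def bootstrap(self, leader):
        # Take the snapshot and copy it to the follower.
        self.data, self.position = leader.snapshot()

    def catch_up(self, leader):
        # Request and apply every change made since the recorded position.
        changes, self.position = leader.changes_since(self.position)
        for key, value in changes:
            self.data[key] = value
```

The same `catch_up` step also covers catch-up recovery after a follower crash, provided the follower persists `position` alongside its data.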

### Handling Node Outages
 * Any node can go down, either unexpectedly due to a fault or as planned maintenance
 * Being able to reboot individual nodes without downtime is a big advantage for operations
 * Therefore the goal is to keep the system as a whole running despite individual node failures, and to keep the impact of a node outage as small as possible

#### Follower Failure: Catch-up recovery
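 * Each follower keeps a log of the data changes it has received from the leader on its local disk
 * If a follower crashes and restarts, or the network between leader and follower is temporarily interrupted, the follower can recover easily: from its log, it knows the last transaction it processed before the fault occurred, so it connects to the leader and requests all data changes that occurred since then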

#### Leader Failure: Failover
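 * Handling failure of the leader is trickier: one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This is called failover
 * Failover can happen manually (an administrator is notified that the leader has failed and takes the necessary steps to make a new leader) or automatically
 * An automatic failover process usually consists of these steps:
  - Determining that the leader has failed: there is no foolproof way of detecting what has gone wrong, so most systems simply use a timeout: if a node doesn't respond for some period of time (e.g. 30 seconds), it is assumed to be dead
  - Choosing a new leader: this can be done through an election process (the leader is chosen by a majority of the remaining replicas) or by a previously elected "controller" node. The best candidate is usually the replica with the most up-to-date data changes from the old leader, to minimize data loss
  - Reconfiguring the system to use the new leader: clients now need to send their write requests to the new leader. If the old leader comes back, the system needs to ensure that it becomes a follower and recognizes the new leader
 * A lot of things can go wrong with failover:
  - If async replication is used, the new leader may not have received all writes from the old leader before it failed. The most common solution is to simply discard the old leader's unreplicated writes, which violates clients' durability expectations
  - Discarding writes is especially dangerous if other storage systems outside the db need to be coordinated with the db contents
  - In certain fault scenarios, two nodes can both believe that they are the leader (called "split brain"). If both accept writes and there is no process for resolving conflicts, data is likely to be lost or corrupted
  - There is no obviously right timeout for declaring the leader dead: a longer timeout means a longer time to recovery, while a shorter one can cause unnecessary failovers (e.g. when a temporary load spike makes the leader respond slowly)
 * There are no easy solutions to these problems, so some operations teams prefer to perform failovers manually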
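
The three automatic failover steps can be sketched as follows. This is a simplified illustration, not a production algorithm: the 30-second default timeout is only an example value, heartbeats are plain timestamps, and the `epoch` counter is an assumption added here (a common way to let nodes recognise and demote a returning old leader) rather than something these notes prescribe. All names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    log_position: int      # how much of the old leader's log this node has applied
    last_heartbeat: float  # time we last heard from the node, in seconds


def leader_is_dead(leader, now, timeout):
    # Step 1: there is no foolproof detection, so fall back to a timeout.
    return now - leader.last_heartbeat > timeout


def choose_new_leader(followers):
    # Step 2: prefer the most up-to-date follower to minimise lost writes.
    return max(followers, key=lambda n: n.log_position)


def failover(leader, followers, epoch, now, timeout=30.0):
    """Return (current_leader, epoch, number_of_writes_lost)."""
    if not leader_is_dead(leader, now, timeout):
        return leader, epoch, 0
    new_leader = choose_new_leader(followers)
    # Writes the old leader accepted but never replicated are discarded
    # (known here only because this is a simulation).
    lost = leader.log_position - new_leader.log_position
    # Step 3: bump the epoch so nodes can reject a returning old leader whose
    # epoch is stale, one common guard against split brain.
    return new_leader, epoch + 1, lost
```

Because the simulation knows the old leader's log position, it can report how many unreplicated writes the failover discards; a real system generally cannot, which is part of what makes async failover risky.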

### Implementation of Replication Logs
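 * Statement-based replication:
   - In the simplest case, the leader logs every write request (statement) that it executes and sends that statement log to its followers. For example, in a relational db, every INSERT, UPDATE, or DELETE statement is forwarded to followers, which parse and execute those SQL statements as if they had been received from a client
   - This approach can break down in several ways:
     - Any statement that calls a non-deterministic function (such as NOW() for the current date and time, or RAND()) is likely to generate a different value on each replica
     - If statements use an autoincrementing column, or if they depend on the existing data in the db (e.g. UPDATE ... WHERE <some condition>), they must be executed in exactly the same order on each replica, otherwise they may have a different effect
     - Statements that have side effects (e.g. triggers, stored procedures, user-defined functions) may result in different side effects on each replica, unless the side effects are absolutely deterministic
   - The leader can work around some of these issues (e.g. by replacing non-deterministic function calls with a fixed return value when the statement is logged), but since there are so many edge cases, other replication methods are now generally preferred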
   
 * Write-ahead log (WAL) shipping:
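   - As we saw in Chapter 3, storage engines usually append every write to a log (the segments of a log-structured storage engine, or the write-ahead log of a B-tree)
   - We can use this exact same log to build a replica on another node: besides writing the log to disk, the leader also sends it across the network to its followers
   - When the follower processes this log, it builds a copy of the exact same data structures as found on the leader
   - The main disadvantage is that the log describes the data at a very low level (e.g. which bytes were changed in which disk blocks), which couples replication closely to the storage engine. If the storage format changes between versions, it is typically not possible to run different versions of the db software on the leader and the followers, which rules out zero-downtime upgrades (where the followers are upgraded first and one of them is then failed over to)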
   
 * Logical (row-based) log replication
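   - An alternative is to use different log formats for replication and for the storage engine, which decouples the replication log from the storage engine internals. This makes it easier to keep backward compatibility, so the leader and followers can run different versions of the db software, or even different storage engines
   - This kind of replication log is called a logical log
   - A logical log for a relational db is usually a sequence of records describing writes to db tables at the granularity of a row: for an inserted row, the new values of all columns; for a deleted row, enough information to uniquely identify it (typically the primary key); for an updated row, enough information to identify it plus the new values of the changed columns
   - A logical log is also easier for external applications to parse. This is useful if you want to send the contents of a db to an external system, such as a data warehouse for offline analysis, or for building custom indexes and caches. This technique is called change data capture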
   
 * Trigger-based replication
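   - The replication approaches above are implemented by the db system without involving any application code
   - While this is what you want in most cases, there are some circumstances where more flexibility is needed
   - Examples: replicating only a subset of the data, replicating from one kind of db to another, or applying custom conflict resolution logic
   - A trigger lets you register custom application code that is automatically executed when a data change (write transaction) occurs in the db. The trigger can, for example, log the change into a separate table, from which an external process can read and replicate it. Trigger-based replication typically has greater overheads and is more prone to bugs than built-in replication, but its flexibility can be useful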
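
To see why non-determinism breaks statement-based replication while a logical log does not, here is a small Python simulation. Each replica is a dict, "executing" `RANDOM()` is modelled with a per-replica `random.Random`, and the shape of the row-change record is hypothetical (it is not any real db's binlog or WAL format).

```python
import random


def replicate_statement(replicas, seeds):
    # Statement-based: each replica re-executes the shipped SQL, e.g.
    # "UPDATE items SET token = RANDOM() WHERE id = 1", so the
    # non-deterministic function is evaluated separately on every replica.
    for replica, seed in zip(replicas, seeds):
        rng = random.Random(seed)  # each replica has its own RNG state
        replica[1] = rng.random()


def replicate_row(replicas, leader_rng):
    # Logical (row-based): the leader evaluates the statement once and ships
    # a record of the resulting row; replicas just copy the values.
    record = {"table": "items", "op": "update", "id": 1,
              "new_values": {"token": leader_rng.random()}}
    for replica in replicas:
        replica[record["id"]] = record["new_values"]["token"]
    return record
```

With statement-based replication the replicas end up with three different tokens; with the row-based record they all store the single value the leader computed.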

### Problems with Replication Lag
 - Being able to tolerate node failures is just one reason for wanting replication. Other reasons are scalability (processing more requests than a single machine can handle) and latency (placing replicas geographically closer to users)
 - Leader-based replication requires all writes to go through a single node, but read-only queries can go to any replica
 - For workloads that consist mostly of reads, we can create many followers and distribute read requests across them (read-scaling architecture)
 - Realistically, this only works with async replication: with a fully sync configuration, a single node failure or network outage would make the entire system unavailable for writing (and the more nodes there are, the more likely this becomes)
 - With async replication, however, a read from a follower that has fallen behind may return outdated information, leading to apparent inconsistencies in the db: the same query run on the leader and on a follower at the same time may return different results
 - This inconsistency is only temporary: if writes stop, the followers will eventually catch up (called eventual consistency)
 - Replication lag is the delay between a write happening on the leader and being reflected on a follower
 - Typically this lag is only a fraction of a second, but if the system is operating near capacity or there is a network problem, it can increase to several seconds or even minutes
 - Three problems that are likely to occur with replication lag:
   * Reading your own writes: reading data immediately after writing it may return stale data if the read goes to a lagging follower (e.g. a user submits data on a website and then views what they submitted). Read-after-write consistency (read-your-writes) is the guarantee that users always see any updates they submitted themselves
   * Monotonic reads: if a user makes several reads from different replicas, they may see things moving backward in time (a later read goes to a more lagged replica than an earlier one). Monotonic reads is the guarantee that this does not happen (one way to achieve it is to make sure each user always reads from the same replica, e.g. chosen based on a hash of the user ID)
   * Consistent prefix reads: if a sequence of writes happens in a certain order, anyone reading those writes should see them appear in the same order. This is particularly a problem in partitioned (sharded) dbs (one solution is to make sure that any writes that are causally related to each other are written to the same partition)
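
Two of these guarantees can be enforced by routing reads, sketched below under stated assumptions: the 60-second window is a guessed upper bound on replication lag, replicas are just labels, and `ReadRouter` is a hypothetical helper rather than part of any db driver. Consistent prefix reads are not covered because they depend on how data is partitioned.

```python
import hashlib
import time


class ReadRouter:
    """Hypothetical router for read-your-writes and monotonic reads."""

    def __init__(self, leader, followers, window=60.0, clock=time.monotonic):
        self.leader = leader
        self.followers = followers
        self.window = window    # assumed upper bound on replication lag, seconds
        self.clock = clock
        self.last_write = {}    # user_id -> time of that user's last write

    def record_write(self, user_id):
        self.last_write[user_id] = self.clock()

    def replica_for_read(self, user_id):
        # Read-your-writes: shortly after a user writes, serve their reads
        # from the leader, which is guaranteed to have the write.
        t = self.last_write.get(user_id)
        if t is not None and self.clock() - t < self.window:
            return self.leader
        # Monotonic reads: always send the same user to the same follower,
        # chosen by a stable hash (not Python's per-process salted hash()).
        digest = hashlib.sha256(str(user_id).encode()).digest()
        index = int.from_bytes(digest[:8], "big") % len(self.followers)
        return self.followers[index]
```

If the chosen follower fails, the user has to be rerouted to another replica, at which point this scheme alone no longer guarantees monotonic reads.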

### Multi-Leader Replication
 * Leader-based replication has one major downside: there is only one leader, and all writes must go through it. If you cannot connect to the leader (e.g. due to a network interruption), you cannot write to the db
 * A natural extension is to allow more than one node to accept writes
 * Replication still happens in the same way: each node that processes a write must forward that data change to all the other nodes. This is called a multi-leader configuration (also master-master or active/active replication)
 * Each leader simultaneously acts as a follower to the other leaders

#### Use-cases for Multi-leader Replication

* Multi-Datacenter operation
 - Consider a db with replicas in several different datacenters (to tolerate the failure of an entire datacenter, or to be closer to users)
 - In normal leader-based replication, leader has to be in one of these datacenters and all writes must go through it
 - In multi leader config, we can have a leader in each datacenter 
 - Within each datacenter regular leader-follower replication is used and between datacenters each leader replicates to other leaders
 
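* Performance: in a single-leader configuration, every write must go over the internet to the datacenter with the leader, adding significant latency. In a multi-leader configuration, every write can be processed in the local datacenter and replicated asynchronously to the others, so the perceived performance may be better
* Tolerance of datacenter outages: in a single-leader configuration, if the leader's datacenter fails, failover can promote a follower in another datacenter. In a multi-leader configuration, each datacenter can continue operating independently, and replication catches up when the failed datacenter comes back online
* Tolerance of network problems: traffic between datacenters usually goes over the public internet, which may be less reliable than the local network within a datacenter. A multi-leader configuration with async replication can usually tolerate network problems better, since a temporary network interruption does not prevent writes from being processed
* While multi-leader replication has advantages, it also has a big downside: the same data may be concurrently modified in two different datacenters, and those write conflicts must be resolved
* Since multi-leader replication is often a retrofitted feature in many dbs, there are often subtle configuration pitfalls and surprising interactions with other db features (e.g. autoincrementing keys, triggers, integrity constraints). Hence multi-leader replication is often considered dangerous territory that should be avoided if possible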

#### Clients with offline operation
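 * Another use case for multi-leader replication is an application that needs to continue working while it is disconnected from the internet (e.g. a calendar app on a phone or laptop)
 * In this case, every device has a local db that acts as a leader (accepting writes), and there is an async multi-leader replication process (sync) between the replicas on all devices. Architecturally, this is the same as multi-leader replication between datacenters taken to the extreme: each device is a "datacenter", and the network connection between them is extremely unreliable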

#### Collaborative Editing
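 * Google Docs and other real-time collaborative editing applications allow several people to edit a document concurrently
 * When one user edits a document, the changes are instantly applied to their local replica (the state of the document in their web browser or client application) and asynchronously replicated to the server and any other users editing the same document
 * To guarantee that there will be no editing conflicts, the application must obtain a lock on the document before a user can edit it; another user who wants to edit the same document has to wait until the first user has committed their changes and released the lock. This collaboration model is equivalent to single-leader replication
 * However, for faster collaboration, we can make the unit of change very small (e.g. a single keystroke) and avoid locking. This allows multiple users to edit simultaneously, but it brings all the challenges of multi-leader replication, including conflict resolution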

### Handling Write Conflicts
 * The biggest problem with multi-leader replication is that write conflicts can occur, which means conflict resolution is required
 * Example: a wiki page is edited simultaneously by two users. Each user's change is successfully applied to their local leader, but when the changes are asynchronously replicated, a conflict is detected. This problem does not occur in a single-leader db

#### Conflict avoidance
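 * Since many implementations of multi-leader replication handle conflicts quite poorly, avoiding conflicts is a frequently recommended approach
 * If the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur
 * Example: in an application where a user can edit their own data, we can ensure that requests from a particular user are always routed to the same datacenter and use the leader in that datacenter for reading and writing
 * However, this breaks down when the designated leader for a record has to change, e.g. because a datacenter has failed and traffic must be rerouted, or because a user has moved and is now closer to a different datacenter. Concurrent writes on different leaders then have to be handled after all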

#### Converging to consistent state
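 * A single-leader db applies writes in a sequential order. In a multi-leader configuration there is no defined ordering of writes, so it's not clear what the final value should be. The db must therefore resolve conflicts in a convergent way: when all changes have been replicated, all replicas must arrive at the exact same final value for all fields
 * Ways to achieve this:
   - Give each write a unique ID (e.g. a timestamp, a long random number, a UUID, or a hash of the key and value), pick the write with the highest ID as the winner, and discard the other writes. If a timestamp is used, this is known as last write wins (LWW). This approach is prone to data loss
   - Give each replica a unique ID, and let writes that originated at a higher-numbered replica always take precedence over writes from a lower-numbered replica (this also implies data loss)
   - Somehow merge the conflicting values together (e.g. order them alphabetically and then concatenate them)
   - Record the conflict in an explicit data structure that preserves all information, and write application code that resolves the conflict at some later time (perhaps by prompting the user)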
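
A minimal sketch of two convergent strategies from the list, assuming each write carries a `(timestamp, replica_id)` ID (the replica ID breaks timestamp ties, combining the first two strategies). The `Write` type and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    value: str
    write_id: tuple  # (timestamp, replica_id): unique and totally ordered


def last_write_wins(conflicting):
    # Keep the write with the highest ID and discard the rest: every replica
    # picks the same winner, so they converge, but the losing writes are lost.
    return max(conflicting, key=lambda w: w.write_id).value


def merge_values(conflicting):
    # Keep all values: order them alphabetically and concatenate them.
    return "/".join(sorted(w.value for w in conflicting))
```

Both functions return the same result regardless of the order in which a replica sees the conflicting writes, which is the convergence property; the difference is that `last_write_wins` silently drops one value while `merge_values` keeps both (e.g. `"B/C"` for values `"B"` and `"C"`).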

#### Custom Conflict resolution logic
 * As the most appropriate way of resolving a conflict may depend on the application, most multi-leader replication tools let you write conflict resolution logic using application code. That code may be executed on write (as soon as the db detects a conflict in the log of replicated changes) or on read (all conflicting versions are stored and returned to the application on the next read, which resolves them and writes the result back)
 * Some approaches to automatic conflict resolution:
   - Conflict-free replicated datatypes (CRDTs): a family of data structures for sets, maps, ordered lists, counters, etc. that can be concurrently edited by multiple users and that automatically resolve conflicts in sensible ways
   - Mergeable persistent data structures: track history explicitly, similarly to the Git version control system, and use a three-way merge function (whereas CRDTs use two-way merges)
   - Operational transformation is the conflict resolution algorithm behind collaborative editing applications such as Etherpad and Google Docs. It was designed particularly for concurrent editing of an ordered list of items, such as the list of characters that make up a text document
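
As an example of a CRDT, here is a grow-only counter (G-Counter), one of the simplest CRDTs, sketched in Python. It is an illustration, not the API of any particular CRDT library: each replica increments only its own entry, and merging takes the per-replica maximum.

```python
class GCounter:
    """Grow-only counter CRDT: one non-decreasing count per replica."""

    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {}  # replica_id -> number of increments made there

    def increment(self, n=1):
        # A replica only ever modifies its own entry.
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Element-wise max is commutative, associative and idempotent,
        # so replicas converge regardless of merge order or duplication.
        for rid, count in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), count)
```

Because the merge is commutative, associative and idempotent, replicas can exchange state in any order, any number of times, and still converge.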

### Multi-Leader Replication Topologies
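  * A replication topology describes the communication paths along which writes are propagated from one node to another
  * With only two leaders, there is only one plausible topology: each leader sends all of its writes to the other
  * With more than two leaders, various topologies are possible:
    - Circular (each node forwards writes to one other node), star (a designated root node forwards writes to all others), and all-to-all (every leader sends its writes to every other leader). In circular and star topologies, a single node failure can interrupt the flow of replication messages between other nodes
    - In all-to-all topologies, some network links may be faster than others, so some replication messages may "overtake" others, causing causal ordering issues (e.g. an update arriving before the insert it depends on)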

### Leaderless Replication
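 * Some data storage systems abandon the concept of a leader and allow any replica to directly accept writes from clients
 * Amazon's in-house Dynamo system uses leaderless replication; datastores inspired by it (e.g. Riak, Cassandra, Voldemort) are called Dynamo-style
 * In some leaderless implementations, the client directly sends its writes to several replicas, while in others, a coordinator node does this on behalf of the client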

#### Writing to the db when a node is down
 * In leaderless config, failover does not exist
 * Even if one of the replicas did not acknowledge the write (say 2 out of 3 replicas succeeded), the client considers the write successful and simply ignores the missed write
 * Since the unavailable replica missed the write, reads from it may return outdated (stale) values after it comes back online
 * To handle this, the client sends read requests to several nodes in parallel and uses version numbers to determine which value is newer

#### Read repair and anti-entropy
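 * After an unavailable node comes back online, we need a way to make sure it eventually catches up with the writes it missed
 * Two mechanisms are often used in Dynamo-style datastores:
   - Read repair: when a client reads from several nodes in parallel, it can detect any stale responses and write the newer value back to the replica that returned stale data. This works well for values that are frequently read
   - Anti-entropy process: in addition, some datastores have a background process that constantly looks for differences in the data between replicas and copies any missing data from one replica to another. Unlike the replication log in leader-based replication, this process does not copy writes in any particular order, and there may be a significant delay before data is copied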
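
Read repair can be sketched as follows, assuming each replica stores `(version, value)` pairs with a single integer version per key (real Dynamo-style stores use more elaborate versioning to detect concurrent writes). `Replica` and `read_with_repair` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    store: dict = field(default_factory=dict)  # key -> (version, value)

    def get(self, key):
        # A missing key is treated as version 0 with no value.
        return self.store.get(key, (0, None))

    def put(self, key, version, value):
        self.store[key] = (version, value)


def read_with_repair(replicas, key):
    # Query several replicas (sequentially here, in parallel in practice).
    responses = [(replica, replica.get(key)) for replica in replicas]
    # Pick the response carrying the highest version number.
    _, (newest_version, newest_value) = max(responses, key=lambda rv: rv[1][0])
    # Read repair: write the newest value back to every stale replica.
    for replica, (version, _) in responses:
        if version < newest_version:
            replica.put(key, newest_version, newest_value)
    return newest_value
```

Only keys that are actually read get repaired this way, which is why rarely read values also need the anti-entropy process.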

#### Quorums for reading and writing
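 * In the example above, we considered a write successful if it was processed on 2 out of 3 replicas
 * More generally, if there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read
 * As long as w + r > n, we expect to get an up-to-date value when reading, because the set of nodes that accepted the write and the set of nodes we read from must overlap, so at least one of the r nodes is up to date. Reads and writes that obey these r and w values are called quorum reads and writes
 * Even with w + r > n, there are edge cases where stale values may be returned:
   - If a sloppy quorum is used, the w writes may end up on different nodes than the r reads, so there is no longer a guaranteed overlap
   - If two writes occur concurrently, it is not clear which one happened first. In this case, the only safe solution is to merge the concurrent writes (if a winner is picked based on a timestamp, writes can be lost due to clock skew)
   - If a write happens concurrently with a read, the write may be reflected on only some of the replicas, and it is undetermined whether the read returns the old or the new value
   - If a write succeeded on some replicas but failed on others, and overall succeeded on fewer than w replicas, it is not rolled back on the replicas where it succeeded. So if a write was reported as failed, subsequent reads may or may not return its value
   - If a node carrying a new value fails, and its data is restored from a replica carrying a stale value, the number of replicas storing the new value may fall below w, breaking the quorum condition
 * Thus quorums are not an absolute guarantee; stronger guarantees against replication lag anomalies (e.g. reading your own writes, monotonic reads, consistent prefix reads) generally require transactions or consensus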
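
The quorum condition can be checked by brute force: for small n, the sketch below enumerates every possible write set of size w and read set of size r and tests whether they always share a node. It is only a sanity check of the w + r > n rule, assuming r and w each lie between 1 and n; the function name is hypothetical.

```python
from itertools import combinations


def quorums_always_overlap(n, w, r):
    # Does every possible set of w nodes that acknowledged a write share
    # at least one node with every possible set of r nodes we read from?
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )
```

Running it for small n shows the sets always overlap exactly when w + r > n (a pigeonhole argument: two subsets of sizes w and r of an n-node set must intersect if w + r > n). The edge cases listed above are why overlap alone is not sufficient.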

#### Monitoring staleness
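 * From an operational perspective, it is important to proactively monitor whether the db is returning up-to-date results. For leader-based replication, the db typically exposes metrics for replication lag: since writes are applied to the leader and followers in the same order, a follower's lag can be computed by subtracting its current position in the replication log from the leader's current position. In leaderless systems there is no fixed order in which writes are applied, which makes monitoring more difficult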
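
For leader-based replication, the lag computation described above amounts to a subtraction. A minimal sketch, assuming positions are plain integers (e.g. log sequence numbers or byte offsets) and that the alerting threshold `max_lag` is chosen per deployment; the function names are hypothetical.

```python
def replication_lag(leader_position, follower_positions):
    # Lag of each follower, measured in log positions: how far the
    # leader's current position is ahead of the follower's.
    return {name: leader_position - pos for name, pos in follower_positions.items()}


def lagging_followers(leader_position, follower_positions, max_lag):
    # Followers whose lag exceeds the alerting threshold, sorted by name.
    lag = replication_lag(leader_position, follower_positions)
    return sorted(name for name, value in lag.items() if value > max_lag)
```

Lag measured in log positions is only a proxy for staleness in time; converting it to seconds requires knowing the write rate.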

#### Sloppy Quorums and Hinted Handoff
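 * A network interruption can cut off a client from a large number of db nodes. Although those nodes are alive, and other clients may be able to reach them, to this client they might as well be dead: fewer than w or r reachable nodes may remain, so the client can no longer reach a quorum
 * In a large cluster (with significantly more than n nodes), the client can likely still connect to some db nodes during the interruption, just not to the nodes it needs to assemble a quorum for a particular value. This raises a trade-off:
  - Is it better to return errors for all requests for which we cannot reach a quorum of w or r nodes?
  - Or should we accept writes anyway, and write them to some nodes that are reachable but are not among the n nodes on which the value usually lives (its "home nodes")?
 * The latter is called a sloppy quorum: writes and reads still require w and r successful responses, but those may include nodes that are not among the designated home nodes for a value
 * Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate home nodes. This is called hinted handoff
 * Sloppy quorums are particularly useful for increasing write availability. However, even when w + r > n, a read cannot be sure to see the latest value for a key, because it may have been temporarily written to nodes outside the home nodes. Sloppy quorums are optional in all common Dynamo implementations
 * In Riak they are enabled by default, and in Cassandra and Voldemort they are disabled by default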
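
A minimal sketch of a sloppy quorum write followed by hinted handoff, assuming nodes are in-memory objects with an `up` flag and that any reachable node outside the home set can stand in for an unreachable home node. Names are hypothetical and reads are not modelled.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}
        self.hints = []  # (home_node, key, value) accepted on behalf of others


def sloppy_write(key, value, home_nodes, fallback_nodes, w):
    # Write to each home node; if one is unreachable, hand the write to a
    # reachable fallback node instead, which keeps it as a hint.
    spares = (n for n in fallback_nodes if n.up)
    acks = 0
    for home in home_nodes:
        if home.up:
            home.data[key] = value
            acks += 1
        else:
            stand_in = next(spares, None)
            if stand_in is not None:
                stand_in.hints.append((home, key, value))
                acks += 1
    return acks >= w  # the write succeeds if w nodes acknowledged it


def hinted_handoff(nodes):
    # Once home nodes are reachable again, forward the hinted writes to them.
    for node in nodes:
        remaining = []
        for home, key, value in node.hints:
            if home.up:
                home.data[key] = value
            else:
                remaining.append((home, key, value))
        node.hints = remaining
```

With no fallback node available, the same write would fail (only two of the three home nodes acknowledge it); the sloppy quorum trades that error for the risk that a read of the home nodes misses the value until the handoff completes.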