
**Distributed Transactions**

To maintain order in a distributed system, we have to guarantee at least some consistency. 

We already talked about single-object, single-operation consistency models, but in databases we **often need to execute multiple operations atomically**.


Generally speaking, when transferring money from one account to another, you'd like to both debit the source account and credit the destination account simultaneously. **Note:** the two accounts may be stored in different databases or in different parts of one database. However, if we break down the transaction into individual steps, even debiting or crediting doesn't look atomic at first sight: we need to read the old balance, add or subtract the required amount, and save this result. Each one of these substeps involves several operations: the node receives a request, parses it, locates the data on disk, makes a write and, finally, acknowledges it. Even this is a rather high-level view: to execute a simple write, we have to perform hundreds of small steps. This means that we have to first execute the transaction and only then make its results visible.


A **transaction** is a set of operations, an atomic unit of execution. Transaction **atomicity** implies that all its results become visible or none of them do. 

For example, if we modify several rows, or even tables in a single transaction, either all or none of the modifications will be applied. To ensure atomicity, transactions should be **recoverable**. In other words, if the transaction cannot complete, is aborted, or times out, its results have to be rolled back completely. A nonrecoverable, partially executed transaction can leave the database in an inconsistent state. 

In case of unsuccessful transaction execution, the database state has to be reverted to its previous state, as if this transaction had never been attempted in the first place.



Another important aspect is network partitions and node failures: nodes in the system fail and recover independently, but their states have to remain consistent. This means that the atomicity requirement holds not only for the local operations, but also for operations executed on other nodes: changes have to be durably propagated to all of the nodes involved in the transaction or none of them.



Transactions apply operations to data records starting at some point in time. This gives us some flexibility in terms of scheduling and execution: transactions can be reordered and even retried.


The main focus of transaction processing is to determine permissible histories, to model and represent possible interleaving execution scenarios. A history represents a dependency graph: which transactions have been executed prior to the execution of the current transaction. A history is said to be serializable if it is equivalent (i.e., has the same dependency graph) to some history that executes these transactions sequentially.
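
Under the usual conflict-based reading of "same dependency graph" (two operations conflict when they touch the same item and at least one of them is a write), serializability can be checked mechanically. The following is a minimal sketch, assuming a hypothetical history format of `(transaction, operation, item)` tuples: it builds the precedence graph and returns an equivalent serial order (a topological sort), or `None` if the graph has a cycle.

```python
from __future__ import annotations


def precedence_graph(history: list[tuple[str, str, str]]) -> dict[str, set[str]]:
    """Edge Ti -> Tj if an operation of Ti conflicts with a later operation of Tj.

    Two operations conflict when they belong to different transactions,
    touch the same item, and at least one of them is a write ("w").
    """
    graph: dict[str, set[str]] = {}
    for i, (ti, op_i, item_i) in enumerate(history):
        graph.setdefault(ti, set())
        for tj, op_j, item_j in history[i + 1:]:
            if ti != tj and item_i == item_j and "w" in (op_i, op_j):
                graph[ti].add(tj)
    return graph


def serial_order(history: list[tuple[str, str, str]]) -> list[str] | None:
    """Return an equivalent serial order (a topological sort of the
    precedence graph), or None if the graph has a cycle."""
    graph = precedence_graph(history)
    indegree = {t: 0 for t in graph}
    for successors in graph.values():
        for t in successors:
            indegree[t] += 1
    ready = sorted(t for t, d in indegree.items() if d == 0)
    order = []
    while ready:
        t = ready.pop(0)
        order.append(t)
        for succ in sorted(graph[t]):
            indegree[succ] -= 1
            if indegree[succ] == 0:
                ready.append(succ)
    return order if len(order) == len(graph) else None


# Every conflict goes T1 -> T2, so the interleaving is equivalent to T1; T2.
h1 = [("T1", "w", "x"), ("T2", "r", "x"), ("T1", "w", "y"), ("T2", "w", "y")]
# T1 -> T2 on x but T2 -> T1 on y: a cycle, so no serial equivalent exists.
h2 = [("T1", "r", "x"), ("T2", "w", "x"), ("T2", "r", "y"), ("T1", "w", "y")]
print(serial_order(h1))  # ['T1', 'T2']
print(serial_order(h2))  # None
```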


Single-partition transactions involve the pessimistic (lock-based or tracking) or optimistic (try and validate) concurrency control schemes, but neither one of these approaches solves the problem of multipartition transactions, which require coordination between different servers, distributed commit, and rollback protocols.


**Making Operations Appear Atomic**


To make multiple operations **appear atomic**, especially if some of them are remote, we need to use a class of algorithms called **atomic commitment**. 

Atomic commitment needs **consensus** between the participants: a transaction will not commit if even one of the participants votes against it. 

At the same time, this means that failed processes have to reach the same conclusion as the rest of the cohort. Another important implication of this fact is that atomic commitment algorithms do not work in the presence of Byzantine failures (when a process lies about its state or decides on an arbitrary value), since such behavior contradicts unanimity.


**The problem** = to be or not to be: commit or do not commit. The algorithm only decides whether to commit; it **cannot change the transaction itself**.

Database implementers have to decide on:
* When the data is considered ready to commit, so that they are just a pointer swap away from making the changes public.
* How to perform the commit itself to make transaction results visible in the shortest timeframe possible.
* How to roll back the changes made by the transaction if the algorithm decides not to commit.

Many distributed systems use atomic commitment algorithms: MySQL (for distributed transactions), Kafka (for producer and consumer interaction), etc.


The **transaction manager** is a subsystem responsible for scheduling, coordinating, executing, and tracking transactions. In a distributed environment, the transaction manager is responsible for ensuring that node-local visibility guarantees are consistent with the visibility prescribed by distributed atomic operations: in other words, transactions commit in all partitions and for all replicas.

**Two-Phase Commit**

Two-phase commit (2PC) executes in two phases. During the first phase, the decided value is distributed, and votes are collected. During the second phase, nodes just flip the switch, making the results of the first phase visible.


2PC assumes the presence of a **leader** (coordinator) that holds the state, collects votes, and is a primary point of reference for the agreement round. The rest of the nodes are called **cohorts** -- usually partitions that operate over disjoint datasets. 


The coordinator can be:
* the node that received the request to execute the transaction
* picked at random
* elected using a leader-election algorithm
* assigned manually
* fixed throughout the lifetime of the system
* transferred to another participant for reliability or performance


**Execution**
* **Prepare** The coordinator notifies cohorts about the new transaction by sending a **propose message**. Cohorts decide whether or not they can commit the part of the transaction that applies to them, and then send the coordinator their vote: commit or abort.
* **Commit/abort** Operations within a transaction can change state across different partitions (each represented by a cohort). If even one cohort votes to abort, the coordinator sends an Abort message to all cohorts. If all cohorts vote to commit, it sends a Commit message to all of them.


During each step the coordinator and cohorts have to write the results of each operation to durable storage to be able to reconstruct the state and recover in case of local failures, and be able to forward and replay results for other participants.


In the context of database systems, each 2PC round is usually responsible for a single transaction. During the prepare phase, transaction contents (operations, identifiers, and other metadata) are transferred from the coordinator to the cohorts. The transaction is executed by the cohorts locally and is left in a **partially committed state** (sometimes called **precommitted**), making it ready for the coordinator to finalize execution during the next phase by either committing or aborting it. By the time the transaction commits, its contents are already stored durably on all other nodes.
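
The two phases can be sketched in a single process, assuming that method calls stand in for network messages, a Python list stands in for each participant's durable log, and a `TimeoutError` stands in for an unresponsive cohort. All names here (`Cohort`, `two_phase_commit`) are hypothetical; this illustrates the message flow, not a production protocol.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Cohort:
    name: str
    can_commit: bool = True   # outcome of the (hypothetical) local checks
    reachable: bool = True    # False simulates a crashed node or a lost link
    log: list[str] = field(default_factory=list)  # stands in for durable storage
    state: str = "init"

    def prepare(self, txn: str) -> bool:
        if not self.reachable:
            raise TimeoutError(self.name)
        # Log the vote durably *before* replying. After a "commit" vote the
        # transaction stays precommitted until the coordinator's decision arrives.
        self.log.append(f"{txn}: vote {'commit' if self.can_commit else 'abort'}")
        self.state = "precommitted" if self.can_commit else "aborted"
        return self.can_commit

    def finish(self, txn: str, decision: str) -> None:
        if self.reachable:
            self.log.append(f"{txn}: {decision}")
            self.state = "committed" if decision == "commit" else "aborted"


def two_phase_commit(txn: str, cohorts: list[Cohort], coordinator_log: list[str]) -> str:
    coordinator_log.append(f"{txn}: start")
    # Phase 1 (prepare): send the transaction to every cohort and collect votes.
    votes = []
    for cohort in cohorts:
        try:
            votes.append(cohort.prepare(txn))
        except TimeoutError:
            votes.append(False)  # no answer before the timeout counts as "abort"
    decision = "commit" if all(votes) else "abort"
    # The decision is made durable before any cohort learns about it.
    coordinator_log.append(f"{txn}: {decision}")
    # Phase 2 (commit/abort): broadcast the decision to every cohort.
    for cohort in cohorts:
        cohort.finish(txn, decision)
    return decision


a, b = Cohort("A"), Cohort("B")
print(two_phase_commit("t1", [a, b], []))                              # commit
print(two_phase_commit("t2", [a, Cohort("C", can_commit=False)], []))  # abort
print(two_phase_commit("t3", [a, Cohort("D", reachable=False)], []))   # abort
```

Note the ordering: each vote and the final decision are logged before they are sent, which is what lets a participant reconstruct its state after a local failure.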

**Cohort Failures in 2PC**

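If a cohort fails during the propose phase, the transaction has to be aborted, since the coordinator can commit only if all votes are positive.

This requirement has a negative impact on availability. Spanner and some other systems perform 2PC over Paxos groups rather than individual nodes to improve protocol availability.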


If one of the cohorts has failed after accepting the proposal, it has to learn about the actual outcome of the vote before it can serve values correctly, since the coordinator might have aborted the commit due to the other cohorts’ decisions. 

cohort -> accept commit -> fail -> recovery -> request decision log from coordinator -> commit or reject


Link failures can cause message loss, in which case the coordinator could wait for a vote indefinitely. To avoid this, if the coordinator does not receive a response from a cohort during the propose phase, it can trigger a timeout and abort the transaction.

**Coordinator Failures in 2PC**


* The coordinator made a decision, but the link to a particular node failed -> that node requests the decision from its peers. Replicating commit decisions is safe since it's always unanimous: the whole point of 2PC is to either commit or abort on all sites, and commit on one cohort implies that all other cohorts have to commit.
* The coordinator collected the votes and failed before announcing the decision -> wait for the coordinator to recover, or choose a new coordinator and revote.
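
The "request the decision from peers" idea can be made concrete as a decision rule for a cohort that voted to commit and can no longer reach the coordinator. This is a simplified sketch of what the literature calls the cooperative termination protocol; the `State` names are hypothetical. The last case in the example is exactly the one in which 2PC blocks.

```python
from __future__ import annotations

from enum import Enum


class State(Enum):
    INIT = "init"                  # has not voted "commit" (may still abort on its own)
    PRECOMMITTED = "precommitted"  # voted "commit", waiting for the decision
    COMMITTED = "committed"
    ABORTED = "aborted"


def resolve_from_peers(peer_states: list[State]) -> State | None:
    """Outcome for a precommitted cohort that cannot reach the coordinator.

    Returns the decision if it can be safely inferred from reachable peers,
    or None if the cohort is blocked and has to wait for the coordinator.
    """
    if State.COMMITTED in peer_states:
        return State.COMMITTED  # decisions are unanimous, so we must commit too
    if State.ABORTED in peer_states:
        return State.ABORTED
    if State.INIT in peer_states:
        # That peer never voted "commit", so the coordinator cannot have
        # decided to commit; the peer aborts unilaterally and so can we.
        return State.ABORTED
    return None  # everyone is precommitted: only the coordinator knows


print(resolve_from_peers([State.COMMITTED, State.PRECOMMITTED]))     # State.COMMITTED
print(resolve_from_peers([State.INIT, State.PRECOMMITTED]))          # State.ABORTED
print(resolve_from_peers([State.PRECOMMITTED, State.PRECOMMITTED]))  # None
```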

Many databases use 2PC: MySQL, PostgreSQL, MongoDB, etc. 


(+) simple (easy to reason about, implement, and debug)

(+) low overhead (message complexity and the number of round-trips of the protocol are low)

(-) needs proper recovery mechanisms

(-) A two-phase commit protocol cannot dependably recover from a failure of **both** the coordinator and a cohort member during the Commit phase. If both the coordinator and a cohort member failed, it is possible that the failed cohort member was the first to be notified, and had actually done the commit. The remaining cohorts cannot tell whether to commit or abort, so they have to wait until one of the failed nodes recovers.

**Three-Phase Commit**

In a situation where the coordinator fails, the remaining sites first have to select a new coordinator. This new coordinator checks the status of the protocol at the remaining sites. Assuming that no more than *k* sites fail, a coordinator that has decided to commit first informs at least *k* other sites of its decision, so at least one of them will be up and will ensure that the commit decision is respected. The new coordinator restarts the third phase of the protocol if any of the remaining sites knew that the old coordinator intended to commit the transaction. Otherwise, the new coordinator aborts the transaction.


The three-phase commit (3PC) protocol adds an extra step, and timeouts on both sides that can allow cohorts to proceed with either commit or abort in the event of coordinator failure, depending on the system state. 

3PC assumes a synchronous model and that communication failures are not possible.


* **Propose** The coordinator sends out a proposed value and collects the votes.
* **Prepare** The coordinator notifies cohorts about the vote results. If the vote has passed and all cohorts have decided to commit, the coordinator sends a Prepare message, instructing them to prepare to commit. Otherwise, an Abort message is sent and the round completes.
* **Commit** Cohorts are notified by the coordinator to commit the transaction.


Propose phase crash or timeout (coordinator or cohort) -> abort transaction.

Prepare phase crash or timeout (coordinator or cohort) -> abort transaction.

Commit phase crash or timeout (coordinator or cohort) -> commit transaction.

**Coordinator Failures in 3PC**


All state transitions are coordinated, and cohorts can't move on to the next phase until everyone is done with the previous one: the coordinator has to wait for the replicas to continue. If a cohort does not hear from the coordinator before the timeout and has not yet moved past the prepare phase, it can eventually abort the transaction.


As we discussed previously, 2PC cannot recover from coordinator failures, and cohorts may get stuck in a nondeterministic state until the coordinator comes back. 3PC avoids blocking the processes in this case and allows cohorts to proceed with a deterministic decision.


The worst-case scenario for 3PC is a network partition: some nodes successfully move to the prepared state and can now proceed with commit after the timeout. Some can't communicate with the coordinator and will abort after the timeout. This results in a split brain: some nodes proceed with a commit and some abort, all according to the protocol, leaving participants in an inconsistent and contradictory state.
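
The cohort-side timeout rules above fit in a few lines. This sketch assumes hypothetical phase names and models only what a cohort does when it stops hearing from the coordinator; the example reproduces the network-partition split brain just described.

```python
from enum import Enum


class Phase(Enum):
    WAITING = "waiting"      # voted in the propose phase, no Prepare message yet
    PREPARED = "prepared"    # received Prepare, waiting for Commit
    COMMITTED = "committed"
    ABORTED = "aborted"


def on_coordinator_timeout(phase: Phase) -> Phase:
    """Cohort-side 3PC timeout rule: abort if we never got past the
    prepare phase, commit if we are already prepared."""
    if phase is Phase.WAITING:
        return Phase.ABORTED
    if phase is Phase.PREPARED:
        return Phase.COMMITTED
    return phase  # already decided: nothing to do


# A partition cuts cohort C off right after A and B received Prepare;
# after that, nobody hears from the coordinator before the timeout.
cohorts = {"A": Phase.PREPARED, "B": Phase.PREPARED, "C": Phase.WAITING}
outcome = {name: on_coordinator_timeout(p).value for name, p in cohorts.items()}
print(outcome)  # {'A': 'committed', 'B': 'committed', 'C': 'aborted'}
```

Each cohort follows the protocol correctly, yet A and B commit while C aborts.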

(+) partially solves the blocking problem of 2PC

(-) larger message overhead

(-) introduces potential contradictions

(-) does not work well in the presence of network partitions. This might be the primary reason 3PC is not widely used in practice.

**Distributed Transactions with Calvin**



Problem: reduce the total amount of time during which transactions hold locks.

One of the ways to do this is to let replicas agree on the execution order and transaction boundaries before acquiring locks and proceeding with execution. If we can achieve this, node failures do not cause transaction aborts: a failed node can recover its state from other participants that execute the same transaction in parallel.

traditional DB | [Calvin](https://cs.yale.edu/homes/thomson/publications/calvin-sigmod12.pdf)
---|---
two-phase locking or optimistic concurrency control | deterministic order: uses a sequencer (an entry point for all transactions); since all replicas get the same inputs, they also produce equivalent outputs
no deterministic transaction order | deterministic transaction order
MySQL | FaunaDB

one partition: sequencer -> scheduler -> worker -> storage

**sequencer**
* The sequencer determines the order in which transactions are executed, and establishes a global transaction input sequence.
* To minimize contention and batch decisions, the timeline is split into epochs.
* The sequencer collects transactions and groups them into short time windows (epochs), which serve as replication units, so transactions do not have to be communicated separately.
* As soon as a transaction batch is successfully replicated, the sequencer forwards it to the scheduler, which orchestrates transaction execution.
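
The core idea of Calvin's sequencer, one agreed-upon input order replayed by every replica, can be sketched as follows. Assumptions: transactions are deterministic Python functions over a dictionary, epochs are cut by transaction count rather than by time window, and the replication of each batch is taken for granted. The sketch shows only ordering and determinism, not Calvin's parallel lock-based scheduler.

```python
import copy
from typing import Callable, Dict, List

State = Dict[str, int]
Txn = Callable[[State], None]


def transfer(src: str, dst: str, amount: int) -> Txn:
    """A deterministic transaction: its effect depends only on the input state."""
    def txn(state: State) -> None:
        if state[src] >= amount:
            state[src] -= amount
            state[dst] += amount
    return txn


def sequence(incoming: List[Txn], epoch_size: int) -> List[List[Txn]]:
    """Sequencer: fix a global order and cut it into epochs (batches).
    Simplification: epochs are cut by count rather than by time window."""
    return [incoming[i:i + epoch_size] for i in range(0, len(incoming), epoch_size)]


def run_replica(initial: State, batches: List[List[Txn]]) -> State:
    """Each replica executes the replicated batches in the agreed order."""
    state = copy.deepcopy(initial)
    for batch in batches:
        for txn in batch:
            txn(state)
    return state


initial = {"a": 100, "b": 0}
batches = sequence([transfer("a", "b", 80), transfer("a", "b", 50)], epoch_size=1)
replicas = [run_replica(initial, batches) for _ in range(3)]
print(replicas)  # three identical states: {'a': 20, 'b': 80}

# The same two transfers in the opposite order give a different result.
print(run_replica(initial, [[batches[1][0], batches[0][0]]]))  # {'a': 50, 'b': 50}
```

The second call shows why the global order matters: without a sequencer, replicas that saw the transfers in different orders would diverge.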

**scheduler**
* uses a deterministic scheduling protocol that executes parts of transaction in parallel, while preserving the serial execution order specified by the sequencer. 
* since applying a transaction to a specific state is guaranteed to produce only the changes specified by the transaction, and the transaction order is predetermined, replicas do not have to communicate further with the sequencer.
* each transaction in Calvin has a **read set** and a **write set**. Calvin does not natively support transactions that rely on additional reads that would determine read and write sets.


**worker thread**
1. It analyzes the transaction's read and write sets, determines node-local data records from the read set, and creates the list of active participants (i.e., the ones that hold the elements of the write set and will perform modifications on the data).
2. It collects the local data required to execute the transaction, in other words, the read set records that happen to reside on that node. The collected data records are forwarded to the corresponding active participants.
3. If this worker thread is executing on an active participant node, it receives data records forwarded from the other participants, as a counterpart of the operations executed during step 2.
4. Finally, it executes a batch of transactions, persisting results into local storage. It does not have to forward execution results to the other nodes, as they receive the same inputs for transactions and execute and persist results locally themselves.
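
The four worker steps above can be simulated in one process for a single transaction. This is a sketch under assumptions: `OWNER` (a hypothetical key-to-node map) plays the role of the partitioning scheme, a per-node inbox replaces network forwarding, and the transaction logic is a deterministic function from the read set to the written values.

```python
from typing import Callable, Dict, Set

# Hypothetical setup: which node owns each key, and each node's local storage.
OWNER = {"x": "n1", "y": "n2", "z": "n3"}
storage: Dict[str, Dict[str, int]] = {"n1": {"x": 10}, "n2": {"y": 5}, "n3": {"z": 0}}


def run_transaction(read_set: Set[str], write_set: Set[str],
                    logic: Callable[[Dict[str, int]], Dict[str, int]]) -> None:
    # Active participants: the nodes that own elements of the write set.
    active = {OWNER[k] for k in write_set}
    # Each node collects the read-set records it stores locally and forwards
    # them to every active participant (an inbox per node stands in for RPCs).
    inbox: Dict[str, Dict[str, int]] = {node: {} for node in active}
    for node, local in storage.items():
        local_reads = {k: local[k] for k in read_set if OWNER[k] == node}
        for target in active:
            inbox[target].update(local_reads)
    # Every active participant now has the full read set, runs the same
    # deterministic logic, and persists only the writes it owns. No results
    # are exchanged: the other participants compute the same values.
    for node in active:
        for key, value in logic(inbox[node]).items():
            if OWNER[key] == node:
                storage[node][key] = value


# z := x + y and y := y - 1; x is read from n1, which is not an active participant.
run_transaction({"x", "y"}, {"y", "z"}, lambda r: {"z": r["x"] + r["y"], "y": r["y"] - 1})
print(storage)  # {'n1': {'x': 10}, 'n2': {'y': 4}, 'n3': {'z': 15}}
```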


A typical Calvin implementation colocates sequencer, scheduler, worker, and storage subsystems. For consensus, it can use the Paxos algorithm (next lecture; we have no time for it now) or a leader replica.


**Distributed Transactions with Spanner**


Calvin is often contrasted with another approach for distributed transaction management, the one used by Spanner and by systems inspired by it (CockroachDB, YugaByte DB, etc.).

* uses two-phase commit over consensus groups per partition (shard)
* TrueTime to achieve consistency and impose transaction order

TrueTime = a high-precision wall-clock API that also exposes an uncertainty bound, allowing local operations to introduce artificial slowdowns to wait for the uncertainty bound to pass.
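
The commit-wait idea behind TrueTime can be sketched with a fake clock. `FakeTrueTime` is hypothetical: it mimics the `TT.now()` / `TT.after()` operations described in the Spanner paper by adding a fixed uncertainty `epsilon` to the local monotonic clock. A transaction picks its commit timestamp at the upper end of the current interval and waits until that timestamp has definitely passed before making its writes visible, so a transaction that starts after the commit always gets a larger timestamp.

```python
import time
from dataclasses import dataclass


@dataclass
class TTInterval:
    earliest: float
    latest: float


class FakeTrueTime:
    """Hypothetical stand-in for TrueTime: a local clock plus a fixed
    uncertainty bound epsilon (in seconds). Real TrueTime derives the
    bound from GPS and atomic-clock references; here it is a parameter."""

    def __init__(self, epsilon: float):
        self.epsilon = epsilon

    def now(self) -> TTInterval:
        t = time.monotonic()
        return TTInterval(t - self.epsilon, t + self.epsilon)

    def after(self, t: float) -> bool:
        """True only once t has definitely passed."""
        return self.now().earliest > t


def commit(tt: FakeTrueTime) -> float:
    # Choose a commit timestamp no earlier than the true current time ...
    s = tt.now().latest
    # ... then commit-wait: keep the write invisible until s is guaranteed
    # to be in the past, which takes roughly 2 * epsilon.
    while not tt.after(s):
        time.sleep(tt.epsilon / 10)
    return s


tt = FakeTrueTime(epsilon=0.005)
start = time.monotonic()
s1 = commit(tt)  # T1 commits
s2 = commit(tt)  # T2 starts after T1's commit is visible
print(s1 < s2, round(time.monotonic() - start, 3))  # True, then the time spent waiting
```

The price of this guarantee is latency: every read-write commit waits out the uncertainty, so a smaller `epsilon` directly means faster commits.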

Spanner offers three main operation types: 
* **read-write transactions** require locks, pessimistic concurrency control, and presence of the leader replica
* **read-only transactions** are lock-free and can be executed at any replica; a leader is required only for reads at the latest timestamp
* **snapshot reads** are lock-free reads at a specified timestamp

Reads at the specific timestamp are consistent, since values are versioned and snapshot contents can’t be changed once written. Each data record has a timestamp assigned, which holds a value of the transaction commit time. This also implies that multiple timestamped versions of the record can be stored.
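
Because a version never changes once written, a read at timestamp `ts` only has to find the newest version whose commit timestamp is at most `ts`. A minimal sketch of such a multiversion store (the class and its methods are hypothetical, not Spanner's API):

```python
import bisect
from collections import defaultdict


class VersionedStore:
    """Hypothetical multiversion store: every key keeps all of its versions,
    each tagged with the commit timestamp of the transaction that wrote it."""

    def __init__(self):
        self._ts = defaultdict(list)      # key -> sorted commit timestamps
        self._values = defaultdict(list)  # key -> values, aligned with _ts

    def write(self, key, value, commit_ts):
        # Versions are immutable: a write adds a version, it never overwrites one.
        i = bisect.bisect_right(self._ts[key], commit_ts)
        self._ts[key].insert(i, commit_ts)
        self._values[key].insert(i, value)

    def read_at(self, key, ts):
        """Latest value committed at or before ts, or None if there is none."""
        i = bisect.bisect_right(self._ts[key], ts)
        return self._values[key][i - 1] if i else None


store = VersionedStore()
store.write("x", "v1", commit_ts=10)
store.write("x", "v2", commit_ts=20)
print(store.read_at("x", 15), store.read_at("x", 20), store.read_at("x", 5))  # v1 v2 None
```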


Spanner's architecture is too complicated to discuss in detail.

* Each **replica** holds several **tablets**, with **Paxos state machines** attached to them. Replicas are grouped into replica sets called Paxos groups -- a unit of data placement and replication. 
* Each Paxos group has a long-lived **leader**. Leaders communicate with each other during multishard transactions.
* **Every write** has to go through the Paxos group **leader**
* **Reads** can be served directly from the tablet on up-to-date replicas. 


The leader holds a lock table that is used to implement concurrency control using the two-phase locking mechanism and a transaction manager that is responsible for multishard distributed transactions. 

Operations that require synchronization (such as writes and reads within a transaction) have to acquire the locks from the lock table, while other operations (snapshot reads) can access the data directly.

For multishard transactions, group leaders have to coordinate and perform a two-phase commit to ensure consistency, and use two-phase locking to ensure isolation.

Since the 2PC algorithm requires the presence of all participants for a successful commit, it hurts availability. Spanner solves this by using Paxos groups rather than individual nodes as cohorts. This means that 2PC can continue operating even if some of the members of the group are down. 


Single-shard transactions do not have to consult the transaction manager.


Spanner read-write transactions offer a serialization order called external consistency:
* transaction timestamps reflect serialization order, even in cases of distributed transactions. 
* if transaction T1 commits before T2 starts, T1’s timestamp is smaller than the timestamp of T2.


To summarize, Spanner uses Paxos for consistent transaction log replication, two-phase commit for cross-shard transactions, and TrueTime for deterministic transaction ordering. This means that multipartition transactions have a higher cost due to an additional two-phase commit round, compared to Calvin.

 Calvin | Spanner
 ---|---
 global consensus | per-shard consensus
 deterministic serial ordering | two-phase commit
 cannot support SQL, NoSQL only | SQL
 FaunaDB | YugaByte DB, CockroachDB
 **read**
 high latency | low latency
 low throughput | high throughput
 **write**
 low latency for distributed transactions | low latency for single shard transactions
 high throughput for concurrent access for the same data | high throughput for concurrent access for random data
 **fault tolerance**
 leader with global consensus | fully decentralized architecture
 failure impacts all data | failure impacts subset of data
 **clock skew**
 no problem | needs TrueTime (tightly synchronized clocks)
 **license**
 proprietary | open-source

**Database Partitioning**


Since storing all database records on a single node is rather unrealistic for the majority of modern applications, many databases use **partitioning**: a logical division of data into smaller manageable segments.



**sharding**: every replica set acts as a single source for a subset (range) of data. Clients (or query coordinators) have to route requests based on the **routing key** to the correct replica.


To use partitions most effectively, they have to be sized, taking the load and value distribution into consideration. 



When nodes are added to or removed from the cluster, the database has to repartition the data to maintain the balance. To ensure consistent movements, we should relocate the data before we update the cluster metadata and start routing requests to the new targets. Some databases perform **auto-sharding** and relocate the data using placement algorithms that determine optimal partitioning. These algorithms use information about read and write loads and the amount of data in each shard.


To find a target node from the routing key, some database systems compute a hash of the key and use some form of mapping from the hash value to the node ID. The simplest mapping takes the remainder of the division of the hash value by the size of the cluster: *hash(key) mod N*, where *N* is the number of nodes.

**Consistent Hashing**


*hash(key) mod N* is not good if *N* changes, because most of the data then has to be relocated.

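Consistent hashing = calculate $\text{hash}(\text{key}) \bmod M$, where $M \gg N$, and place each server $i$ at a position $m_i$ on the ring of values $[0, M)$. Server $i$ handles the keys with $m_{i-1} < \text{hash}(\text{key}) \bmod M \le m_i$, and the first server also takes the keys above the last position (the ring wraps around). When a server joins or leaves, only the keys in its range have to move.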

![img](https://miro.medium.com/max/828/1*YUc0c0oOM-OzQDLHj48yYg.png)
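
The difference between the two placements is easy to measure. The sketch below (hash function, node names, and key names are arbitrary choices for illustration) adds a fifth node to a four-node cluster and counts the keys that change owner. With *hash(key) mod N*, a key stays put only if its hash gives the same remainder modulo 4 and modulo 5, so roughly 80% of keys move; with the ring, only the keys that fall on the new node's arc move, and all of them move to the new node.

```python
from __future__ import annotations

import bisect
import hashlib

M = 2**32  # size of the hash space, M >> N


def h(s: str) -> int:
    """Stable hash in [0, M); Python's built-in hash() of str is randomized per process."""
    return int.from_bytes(hashlib.sha256(s.encode()).digest()[:8], "big") % M


def mod_n_owner(key: str, nodes: list[str]) -> str:
    return nodes[h(key) % len(nodes)]


class Ring:
    """Node i sits at position m_i = h(node) and owns the keys with
    m_{i-1} < h(key) <= m_i; keys past the last position wrap to the first node."""

    def __init__(self, nodes: list[str]):
        self.ring = sorted((h(n), n) for n in nodes)
        self.points = [p for p, _ in self.ring]

    def owner(self, key: str) -> str:
        i = bisect.bisect_left(self.points, h(key))  # first m_i >= h(key)
        return self.ring[i % len(self.ring)][1]


keys = [f"key-{i}" for i in range(10_000)]
old_nodes = ["node-a", "node-b", "node-c", "node-d"]
new_nodes = old_nodes + ["node-e"]

moved_mod = [k for k in keys if mod_n_owner(k, old_nodes) != mod_n_owner(k, new_nodes)]
old_ring, new_ring = Ring(old_nodes), Ring(new_nodes)
moved_ring = [k for k in keys if old_ring.owner(k) != new_ring.owner(k)]
print(len(moved_mod) / len(keys), len(moved_ring) / len(keys))
```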

**Distributed Transactions with Percolator**


If serializability is not required by the application, one of the ways to avoid the write anomalies is to use a transactional model called **snapshot isolation** (SI). 


**Snapshot isolation** guarantees that all reads made within the transaction are consistent with a **snapshot** of the database. The **snapshot** contains all values that were committed before the transaction's start timestamp. If two concurrent transactions have a write-write conflict, only one of them will commit. This characteristic is usually referred to as first committer wins.

* Snapshot isolation prevents **read skew**
* It allows only repeatable reads of committed data.
* Values are consistent, as they’re read from the snapshot at a specific timestamp.
* Conflicting writes are aborted and retried to prevent inconsistencies.

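A **read skew** occurs when a transaction reads two related values at different points in time and therefore observes an invariant between them broken by a concurrent transaction that modified them in between.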

$~$| $A_1$ | $A_2$ | $A_1 + A_2 = 100$
---|---|---|---
Initial state | 50\$ | 50\$ | True
$T_1$: reads $A_1$ = 50\$ | 50\$ | 50\$ | True
$T_2$: moves 20\$ from $A_1$ to $A_2$ and commits | 30\$ | 70\$ | True
**$T_2$ writes data while $T_1$ is reading**
$T_1$: reads $A_2$ = 70\$ (its view of the balances) | 50\$ | 70\$ | False


**BUT**
* histories under snapshot isolation are not necessarily serializable.
* we can still end up with a **write skew**

A **write skew** occurs when each individual transaction respects the required invariants, but their combination does not satisfy these invariants.

$~$| $A_1$ | $A_2$ | $A_1 + A_2 > 0$
---|---|---|---
Initial state | 100\$ | 150\$ | True
$T_1$ (on its snapshot): withdraw 200\$ from $A_1$ | -100\$ | 150\$ | True
$T_2$ (on its snapshot): withdraw 200\$ from $A_2$ | 100\$ | -50\$ | True
**transactions write data concurrently**
Both $T_1$ and $T_2$ commit | -100\$ | -50\$ | False
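
To see both effects side by side, here is a minimal in-memory store with snapshot isolation (a sketch; `SIStore` and `Txn` are hypothetical names). Transactions read from the snapshot taken at their start timestamp, buffer their writes, and abort at commit time if another transaction has committed one of the same keys in the meantime (first committer wins). The two scenarios replay the read skew and write skew tables above.

```python
import itertools


class WriteConflict(Exception):
    pass


class SIStore:
    """Hypothetical in-memory multiversion store with snapshot isolation."""

    def __init__(self, initial):
        self.clock = itertools.count(1)  # one counter for start and commit timestamps
        self.versions = {k: [(0, v)] for k, v in initial.items()}  # key -> [(commit_ts, value)]

    def begin(self):
        return Txn(self, next(self.clock))


class Txn:
    def __init__(self, store, start_ts):
        self.store, self.start_ts, self.writes = store, start_ts, {}

    def read(self, key):
        if key in self.writes:  # read your own buffered writes
            return self.writes[key]
        # Snapshot: the newest version committed before this transaction started.
        return max(v for v in self.store.versions[key] if v[0] < self.start_ts)[1]

    def write(self, key, value):
        self.writes[key] = value  # buffered until commit

    def commit(self):
        for key in self.writes:
            # First committer wins: abort if another transaction committed
            # this key after our snapshot was taken.
            if any(ts > self.start_ts for ts, _ in self.store.versions[key]):
                raise WriteConflict(key)
        commit_ts = next(self.store.clock)
        for key, value in self.writes.items():
            self.store.versions[key].append((commit_ts, value))


# Read skew is prevented: T1 keeps reading from its own snapshot.
store = SIStore({"A1": 50, "A2": 50})
t1 = store.begin()
a1 = t1.read("A1")
t2 = store.begin()
t2.write("A1", t2.read("A1") - 20)
t2.write("A2", t2.read("A2") + 20)
t2.commit()
a2 = t1.read("A2")  # still 50, not 70
print(a1 + a2)      # 100


# Write skew is allowed: each withdrawal is valid on its own snapshot.
def withdraw_if_valid(txn, key, amount):
    if txn.read("A1") + txn.read("A2") - amount > 0:
        txn.write(key, txn.read(key) - amount)


store = SIStore({"A1": 100, "A2": 150})
t1, t2 = store.begin(), store.begin()
withdraw_if_valid(t1, "A1", 200)
withdraw_if_valid(t2, "A2", 200)
t1.commit()
t2.commit()  # disjoint write sets, so no conflict is detected
check = store.begin()
print(check.read("A1") + check.read("A2"))  # -150
```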


Snapshot isolation provides semantics that can be useful for many applications and has the major advantage of efficient reads, because no locks have to be acquired since snapshot data cannot be changed.


**Percolator** is a library that implements a transactional API on top of the distributed database **Bigtable** (Wide Column Store). 


This is a great example of building a transaction API on top of an existing system. Percolator stores data records, committed data point locations (write metadata), and locks in different columns. To avoid race conditions and reliably lock tables in a single RPC call, it uses a conditional mutation Bigtable API that allows it to perform read-modify-write operations with a single remote call.
Each transaction has to consult the timestamp oracle (a source of cluster-wide consistent, monotonically increasing timestamps) twice: for a transaction start timestamp, and during commit.

* Writes are buffered and committed using a client-driven two-phase commit.


* Initial state. After the execution of the previous transaction, 1 is the latest timestamp for both accounts. No locks are held.

Account | Timestamp | Data | Locks | Metadata
--|--|--|---|---|
1 | 1 | 100\$ | - | timestamp 0 was the latest
2 | 1 | 200\$ | - | timestamp 0 was the latest

* **Prewrite**. The transaction attempts to acquire locks for **all** cells written during the transaction. One of the locks is marked as primary and is used for client recovery. If any conflict is detected, the transaction aborts.

Account | Timestamp | Data | Locks | Metadata
--|--|--|---|---|
1 | 1 | 100\$ | Primary | timestamp 0 was the latest
2 | 1 | 200\$ | Primary at A1 | timestamp 0 was the latest


* **Commit**. If all locks were successfully acquired and the possibility of conflict is ruled out, the transaction can continue. During the second phase, the client releases its locks, starting with the primary one. It publishes its write by replacing the lock with a write record, updating write metadata with the timestamp of the latest data point.

Account | Timestamp | Data | Locks | Metadata
--|--|--|---|---|
1 | 2 | 150\$ | - | timestamp 1 was the latest
2 | 2 | 150\$ | - | timestamp 1 was the latest
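
The prewrite/commit flow can be sketched with dictionaries standing in for Bigtable columns. This is a simplified, single-process illustration, not Percolator's actual API: the `Table` layout and function names are hypothetical, a counter plays the timestamp oracle, and readers that run into a lock simply raise instead of waiting or cleaning up after a crashed client as the real system does. The example replays the transfer from the tables above.

```python
import itertools


class Conflict(Exception):
    pass


class Table:
    """Percolator-style columns per key (hypothetical in-memory layout):
    data[key][start_ts] = value, lock[key] = (start_ts, primary_key),
    write[key][commit_ts] = start_ts (points at the committed data)."""

    def __init__(self):
        self.data, self.lock, self.write = {}, {}, {}


oracle = itertools.count(1)  # stands in for the timestamp oracle


def read(table, key, ts):
    lock = table.lock.get(key)
    if lock and lock[0] <= ts:
        raise Conflict(f"{key} is locked by a pending transaction")
    commits = [c for c in table.write.get(key, {}) if c <= ts]
    if not commits:
        return None
    return table.data[key][table.write[key][max(commits)]]


def transact(table, body):
    start_ts = next(oracle)  # first oracle call
    writes = body(lambda key: read(table, key, start_ts))  # buffered writes
    primary = next(iter(writes))
    # Phase 1, prewrite: lock every written cell; all locks point at the primary.
    locked = []
    for key, value in writes.items():
        if key in table.lock or any(c >= start_ts for c in table.write.get(key, {})):
            for k in locked:  # undo our own prewrites before aborting
                del table.lock[k]
                del table.data[k][start_ts]
            raise Conflict(key)
        table.lock[key] = (start_ts, primary)
        table.data.setdefault(key, {})[start_ts] = value
        locked.append(key)
    commit_ts = next(oracle)  # second oracle call
    # Phase 2, commit: primary first, replacing each lock with a write record.
    for key in [primary] + [k for k in writes if k != primary]:
        table.write.setdefault(key, {})[commit_ts] = start_ts
        del table.lock[key]
    return commit_ts


accounts = Table()
transact(accounts, lambda get: {"1": 100, "2": 200})
transact(accounts, lambda get: {"1": get("1") + 50, "2": get("2") - 50})
now = next(oracle)
print(read(accounts, "1", now), read(accounts, "2", now))  # 150 150
print(read(accounts, "1", 2))  # 100: the older version is still readable
```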


Only one transaction can hold a lock at a time and all state transitions are atomic, so two transactions cannot operate on the same contents concurrently.


One example of a database based on the Percolator model is TiDB ("Ti" stands for Titanium). TiDB is a strongly consistent, highly available, and horizontally scalable open source database, compatible with MySQL.

**Coordination Avoidance**


**Coordination** can be avoided, while preserving data integrity constraints, if operations are **invariant confluent**. Invariant confluence (I-confluence) is defined as a property that ensures that two invariant-valid but diverged database states **can be merged** into a single valid, final state. Invariants in this case preserve consistency in ACID terms.


Because any two valid states can be merged into a valid state, **I-Confluent** operations can be executed without additional coordination, which significantly improves performance characteristics and scalability potential.
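
A small numeric example makes I-confluence concrete. Assume (for illustration only) a replicated account balance with the invariant `balance >= 0` and a merge that applies both replicas' changes to their common ancestor. Deposits are I-confluent: adding non-negative deltas to a valid balance cannot make it negative. Withdrawals are not: one counterexample is enough, and it is the same situation as the write skew example above.

```python
def merge(base: int, left: int, right: int) -> int:
    """Merge two diverged copies of a balance by applying both replicas'
    changes (deltas) to their common ancestor."""
    return base + (left - base) + (right - base)


def valid(balance: int) -> bool:
    return balance >= 0  # the invariant


base = 100
# Deposits: both diverged states are valid, and so is their merge.
left, right = base + 30, base + 50
print(valid(left), valid(right), valid(merge(base, left, right)))  # True True True

# Withdrawals: each replica checks the invariant locally before withdrawing 80,
# yet the merged state 100 - 80 - 80 = -60 violates it.
left, right = base - 80, base - 80
print(valid(left), valid(right), valid(merge(base, left, right)))  # True True False
```

Only the first kind of operation could safely run without coordination under this invariant.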


Transactions are executed against the local database versions (snapshots). If a transaction requires any state from other partitions for execution, this state is made available for it locally. If a transaction commits, resulting changes made to the local snapshot are migrated and merged with the snapshots on the other nodes. 

A system model that allows coordination avoidance has to guarantee the following properties:
* **Global validity**
Required invariants are always satisfied, for both merged and divergent committed database states, and transactions cannot observe invalid states.
* **Availability**
If all nodes holding states are reachable by the client, the transaction has to reach a commit decision, or abort, if committing it would violate one of the transaction invariants.
* **Convergence**
Nodes can maintain their local states independently, but in the absence of further transactions and indefinite network partitions, they have to be able to reach the same state.
* **Coordination freedom**
Local transaction execution is independent from the operations against the local states performed on behalf of the other nodes.


One of the examples of implementing coordination avoidance is [Read-Atomic Multi-Partition (RAMP)](https://people.eecs.berkeley.edu/~alig/papers/ramp.pdf) transactions. RAMP uses multiversion concurrency control and metadata of current in-flight operations to fetch any missing state updates from other nodes, allowing read and write operations to be executed concurrently. For example, readers that overlap with some writer modifying the same entry can be detected and, if necessary, repaired by retrieving required information from the in-flight write metadata in an additional round of communication.


Using lock-based approaches in a distributed environment might not be the best idea, and instead of doing that, RAMP provides two properties:
* **Synchronization independence**
One client's transactions won't stall, abort, or force the other client's transactions to wait.
* **Partition independence**
Clients do not have to contact partitions whose values aren't involved in their transactions.

RAMP introduces the **read atomic** isolation level: either all or none of a transaction's updates are visible to concurrent transactions. This rules out **fractured reads**, in which a transaction observes only a subset of the writes executed by some other transaction.

RAMP offers atomic write visibility **without** requiring mutual exclusion = transactions can proceed without stalling each other. RAMP distributes transaction metadata that allows reads to detect concurrent inflight writes. By using this metadata, transactions can detect the presence of newer record versions, find and fetch the latest ones, and operate on them. 

To avoid coordination, all local commit decisions must also be valid globally. In RAMP, this is solved by requiring that, by the time a write becomes visible in one partition, writes from the same transaction in all other involved partitions are also visible for readers in those partitions.


To allow readers and writers to proceed without blocking other concurrent readers and writers, while maintaining the read atomic isolation level both locally and system-wide (in all other partitions modified by the committing transaction), writes in RAMP are installed and made visible using **two-phase commit**.

RAMP allows multiple versions of the same record to be present at any given moment:
* latest value
* in-flight uncommitted changes
* stale versions, overwritten by later transactions (as soon as all concurrent readers complete, stale values can be discarded)
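
The read side of RAMP can be sketched in a single process, loosely following the RAMP-Fast variant from the linked paper. Assumptions: one key per partition, writer-supplied integer timestamps, direct method calls instead of RPCs, and hypothetical names throughout. Writers install versions in two phases (prepare, then commit), each version carries the set of keys written by the same transaction, and readers use that metadata to detect and repair fractured reads with a second round.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Version:
    value: int
    ts: int
    siblings: frozenset  # metadata: every key written by the same transaction


@dataclass
class Partition:
    versions: dict = field(default_factory=dict)     # (key, ts) -> Version
    last_commit: dict = field(default_factory=dict)  # key -> latest committed ts

    def prepare(self, key, version):
        self.versions[(key, version.ts)] = version  # stored, but not yet visible

    def commit(self, key, ts):
        self.last_commit[key] = max(self.last_commit.get(key, 0), ts)

    def get(self, key, ts=None):
        """The latest committed version, or exactly version ts if it is given."""
        if ts is None:
            ts = self.last_commit.get(key, 0)
        return self.versions.get((key, ts))


def ramp_write(parts, writes, ts, commit_keys=None):
    siblings = frozenset(writes)
    for key, value in writes.items():  # phase 1: prepare everywhere
        parts[key].prepare(key, Version(value, ts, siblings))
    for key in (writes if commit_keys is None else commit_keys):  # phase 2: commit
        parts[key].commit(key, ts)


def ramp_read(parts, keys):
    # Round 1: the latest committed version of every key.
    result = {k: parts[k].get(k) for k in keys}
    # Sibling metadata tells us which version of each key we must see.
    required = {k: result[k].ts for k in keys}
    for version in list(result.values()):
        for sibling in version.siblings & set(keys):
            required[sibling] = max(required[sibling], version.ts)
    # Round 2: fetch missing versions by exact timestamp; they are already
    # prepared on their partitions even if not yet committed there.
    for k in keys:
        if required[k] > result[k].ts:
            result[k] = parts[k].get(k, required[k])
    return {k: v.value for k, v in result.items()}


parts = {"x": Partition(), "y": Partition()}  # one key per partition, for simplicity
ramp_write(parts, {"x": 0, "y": 0}, ts=1)
# A second writer has prepared both keys but has committed only on x so far.
ramp_write(parts, {"x": 1, "y": 1}, ts=2, commit_keys=["x"])
naive = {k: parts[k].get(k).value for k in ("x", "y")}
print(naive)                         # {'x': 1, 'y': 0}: a fractured read
print(ramp_read(parts, ["x", "y"]))  # {'x': 1, 'y': 1}
```

The reader never waits for the in-flight writer: the prepared version of `y` is already stored, so the second round can fetch it directly.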
