This document describes the implementation of multi-row transactions in Kudu. An initial design document was written to provide Kudu-related context and explore possible implementations. That can be found here.
Last updated:

- 1.15.0: experimental, `INSERT` and `INSERT_IGNORE` ops only. To enable, set `--txn_manager_enabled` on the Kudu master and `--enable_txn_system_client_init` on Kudu tablet servers, each with `--unlock_experimental_flags`. By default, transactions require at least three tablet servers to host a system table, but this can be adjusted by setting `--txn_manager_status_table_num_replicas=1` on the master.
Some new terminology added with the introduction of transactions:

- Transaction participant: a tablet that has had ops sent to it within a transaction. “Participant” refers to the logical tablet, in the same way that “tablet” refers to the logical partition of a table. “Leader participant” and “follower participant” refer to the leader and follower replicas of a tablet that is a participant in a transaction. A transaction may have many participants.
- Transaction status table: a distributed table with a logical schema of (`txn_id`, `commit_state`, `start_timestamp`, `last_update_timestamp`, `commit_timestamp`, `list_of_participant_tablet_ids`). This table is used to keep track of what transactions are in flight.
- `TxnStatusManager`: an entity that lives in-memory alongside each tablet replica of the transaction status table. It is in charge of actually writing to the transaction status tablets, and transitioning the in-memory transaction states through the lifecycle of each transaction.
- `TxnManager`: a proxy between the Kudu client and the `TxnStatusManager`s, so the user client doesn’t have to interact directly with the transaction status table internals (e.g. opening the table, getting the table locations, etc.).
- `TxnSystemClient`: encapsulates all the logic required for finding and sending RPCs to the transaction status table and to participants. This is used by the `TxnManager`, the transaction status manager, and transaction participants.
- Transaction handle: a public-facing, client-side object that interacts with the `TxnManager` to start, commit, abort, and heartbeat to the `TxnStatusManager`s.
Before delving into the details of each component, below is a depiction of the transactional write path. It is assumed that the transaction status table has the default replication factor (RF=3).
1. A Kudu client sends a request to the `TxnManager` to begin a transaction.
2. The `TxnManager` has cached the highest transaction ID seen so far (though this could also be heartbeated around), and sends a request to the appropriate leader `TxnStatusManager` to begin a transaction with the next transaction ID.
3. If the transaction ID is accepted by the transaction status tablet (i.e. it’s of an acceptable hash and range), a new status record is inserted into it.
4. Upon successfully persisting the new status record in the transaction status table, the `TxnStatusManager` returns a response to the `TxnManager`, which now knows of a new highest transaction ID.
5. The `TxnManager` returns the transaction ID to the Kudu client.
6. The Kudu client sends writes directly to the tablet servers, but with a transaction ID associated with every `WriteRequestPB`. The participant first checks whether or not it has been registered with the `TxnStatusManager`.
7. If the transaction ID hasn’t been registered with this participant, the participant uses the transaction system client to register itself with the appropriate `TxnStatusManager`. The `TxnStatusManager` writes a participant record that includes the participant’s tablet ID.
8. The transaction participant leader replicates the client write to its followers.
9. The transaction participant returns success to the Kudu client.
10. When the user wants to commit the transaction, the Kudu client sends a request to the `TxnManager` to commit the transaction.
11. The `TxnManager` sends a request to the `TxnStatusManager` to commit the transaction.
12. The `TxnStatusManager` updates in-memory state to block further participants from registering, and replicates an update to the status record to indicate the transaction has begun committing.
13. The `TxnStatusManager` returns an ack to the `TxnManager`, which is returned to the client.
14. Asynchronously, the `TxnStatusManager` sends requests to all registered participants to begin committing.
15. Each participant replicates this request, indicating that it will no longer accept new write requests for this transaction. Each participant returns to the `TxnStatusManager` the timestamp replicated for this record, which is guaranteed to be higher than those of all ops on this participant for this transaction.
16. The `TxnStatusManager` replicates the commit timestamp in the transaction status record. Past this step, the transaction can no longer be aborted.
17. The `TxnStatusManager` sends requests to finalize the commit on all participants.
18. The request to finalize the commit is replicated to the replicas of the participants; upon applying the commit, all state for the transaction is made visible on the participant.
19. The `TxnStatusManager` replicates a record that the commit is complete.
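The bookkeeping in the write path above can be sketched as a toy model (this is illustrative Python, not Kudu’s actual code; the class and method names are assumptions): a status record is inserted per transaction, participants register themselves before accepting writes, and registration is blocked once the commit begins.

```python
# Toy model of the TxnStatusManager's bookkeeping in the write path above.
class ToyTxnStatusManager:
    def __init__(self):
        self.records = {}  # txn_id -> {"state": ..., "participants": [...]}

    def begin_txn(self, txn_id):
        # Step 3: insert a new status record for an accepted transaction ID.
        if txn_id in self.records:
            raise ValueError("transaction ID already in use")
        self.records[txn_id] = {"state": "OPEN", "participants": []}

    def register_participant(self, txn_id, tablet_id):
        # Step 7: a participant registers itself before accepting writes.
        record = self.records[txn_id]
        if record["state"] != "OPEN":
            raise RuntimeError("transaction no longer accepts participants")
        if tablet_id not in record["participants"]:
            record["participants"].append(tablet_id)

    def begin_commit(self, txn_id):
        # Step 12: block further registration; the commit has started.
        self.records[txn_id]["state"] = "COMMIT_IN_PROGRESS"

tsm = ToyTxnStatusManager()
tsm.begin_txn(1)
tsm.register_participant(1, "tablet-a")
tsm.begin_commit(1)
```

Note that in the toy model, as in the sequence above, a registration attempt after the commit has begun fails rather than silently succeeding.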
The `TxnManager`s are the clients’ first point of contact when operating on transactions. `TxnManager`s are entities that currently live on the Kudu master nodes and serve as proxies to the transaction status table, a distributed system table that stores metadata about every transaction in the cluster.

`TxnManager`s are mostly stateless and mostly serve to send requests that update the transaction status table, with the exception that each `TxnManager` keeps track of the highest transaction ID seen so far, allowing it to make requests to start transactions with a specific transaction ID. In doing so, users do not have to supply a transaction ID when starting a new transaction, and partitions of the transaction status table do not need to coordinate among themselves to determine the next transaction ID.
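The ID-assignment scheme just described can be sketched as follows (a toy, single-process stand-in with assumed names; the real `TxnManager` proposes IDs via RPC to the transaction status tablet, which rejects IDs already in use):

```python
# Toy sketch: a mostly-stateless manager that caches the highest transaction
# ID it has seen and proposes the next one. If another manager won the race
# for an ID, the "status tablet" (here, a set) rejects it and we advance.
class ToyTxnManager:
    def __init__(self, used_txn_ids):
        self.used_txn_ids = used_txn_ids  # stand-in for the status tablet
        self.highest_seen_txn_id = -1

    def begin_transaction(self):
        while True:
            candidate = self.highest_seen_txn_id + 1
            if candidate not in self.used_txn_ids:
                self.used_txn_ids.add(candidate)
                self.highest_seen_txn_id = candidate
                return candidate
            # ID already taken by another manager; bump the cache and retry.
            self.highest_seen_txn_id = candidate

used = {0, 1, 2}          # IDs already assigned elsewhere in the cluster
mgr = ToyTxnManager(used)
txn_id = mgr.begin_transaction()  # skips 0..2 and claims 3
```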
The `TxnManager` service exposes the following RPC endpoints to clients:

- `BeginTransaction() ⇒ { txn_id, keepalive_millis }`: starts a transaction in the `OPEN` state.
- `CommitTransaction(txn_id) ⇒ {}`: transitions a transaction from `OPEN` to `COMMIT_IN_PROGRESS`.
- `AbortTransaction(txn_id) ⇒ {}`: transitions a transaction from `OPEN` or `COMMIT_IN_PROGRESS` to `ABORT_IN_PROGRESS`.
- `GetTransactionState(txn_id) ⇒ { state }`: returns the current state of a transaction.
- `KeepTransactionAlive(txn_id) ⇒ {}`: signals to the `TxnStatusManager` that a transaction should be kept alive.
Each endpoint corresponds to a user-facing client API in the C++ and Java clients, with the exception of the keep-alive endpoint, as keep-alive heartbeating is handled automatically by transaction handles.
Under the hood, the `TxnManager` wraps a KuduClient as the `TxnSystemClient`, which it uses to lazily create, open, and alter the transaction status table, as well as send requests to its partitions. By default, the table is created only once the first `BeginTransaction()` request is received. The "lazy" initialization of the transaction status table can be controlled via the `--txn_manager_lazily_initialized` master flag; when disabled, the `TxnManager`s will attempt to create the table periodically upon starting up.
The transaction status table is currently partitioned by range and starts off with a single range. As new transactions are added and the partitions fill up, new ranges are added to the table automatically by the `TxnManager`s. The size of these partitions can be controlled via the `--txn_manager_status_table_range_partition_span` master flag.
Requests to a given transaction status table partition are received and managed by the `TxnStatusManager`, which keeps track of the transactions and participants stored in the tablet. Tablet replicas of tables of the `TXN_STATUS_TABLE` type are expected to have a specific schema:
```sql
CREATE TABLE kudu_system.kudu_transactions (
  txn_id INT64,
  entry_type INT8,
  identifier STRING,
  metadata STRING,
  PRIMARY KEY (txn_id, entry_type, identifier)
) PARTITION BY RANGE (txn_id)
(
  PARTITION 0 <= VALUES < 1000000
)
```
This schema allows the tablet replica to initialize a `TxnStatusManager` by reading its records. It maintains an in-memory map of transaction IDs to transaction metadata, allowing it to serve the current state or update the state. The primary role of the `TxnStatusManager` is to manage the lifecycle of a transaction, transitioning it from state to state with the following allowed state transitions:
```
      BeginCommit                    FinalizeCommit                       CompleteCommit
OPEN ------------> COMMIT_IN_PROGRESS ------------> FINALIZE_IN_PROGRESS ------------> COMMITTED

      BeginCommit                    BeginAbort                           FinalizeAbort
OPEN ------------> COMMIT_IN_PROGRESS ------------> ABORT_IN_PROGRESS ----------------> ABORTED

      AbortTxn                       FinalizeAbort
OPEN ------------> ABORT_IN_PROGRESS ------------> ABORTED
```
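The allowed transitions above can be captured as a simple lookup table; a minimal sketch (state names as in this document, function name assumed; not Kudu’s implementation):

```python
# Allowed TxnStatusManager state transitions, as a lookup table.
ALLOWED_TRANSITIONS = {
    "OPEN": {"COMMIT_IN_PROGRESS", "ABORT_IN_PROGRESS"},
    "COMMIT_IN_PROGRESS": {"FINALIZE_IN_PROGRESS", "ABORT_IN_PROGRESS"},
    "FINALIZE_IN_PROGRESS": {"COMMITTED"},
    "ABORT_IN_PROGRESS": {"ABORTED"},
    "COMMITTED": set(),   # terminal
    "ABORTED": set(),     # terminal
}

def transition(current, target):
    """Return the new state, or raise if the transition is not allowed."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current} -> {target}")
    return target
```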
The creation of an `OPEN` transaction, and the transitions to either `COMMIT_IN_PROGRESS` or `ABORT_IN_PROGRESS`, are initiated by users and are synchronous. All other transitions are performed automatically by `TxnStatusManager` background tasks, and their completion can be monitored by clients by getting the transaction’s status.
To update a given transaction, the `TxnStatusManager` writes the changes to disk, but does not yet commit them to in-memory state. Once the update is replicated and persisted to the transaction status tablet’s replicas, the transaction state transition is committed and made visible to users.
Much like the CatalogManager, only the leader `TxnStatusManager` is allowed to perform operations. This is accomplished by taking a shared lock on the in-memory Raft term that is set when a `TxnStatusManager` becomes leader of a term. When a `TxnStatusManager` receives a request, it checks whether the current term is the same as the term set upon last becoming leader; if the term does not match, the request is rejected, signaling that leadership has changed.

In the event of a network partition where a leader has become stale but still thinks it is leader, updates to the transaction status table are protected by the underlying tablet’s write path: the attempt to write to the table will be blocked by the Raft protocol, and an error will be returned, signaling that leadership has changed.
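The leader-term guard described above amounts to comparing the term observed at election time against the current term; a toy sketch (assumed names, not Kudu’s code):

```python
# Toy sketch of the leadership check: an operation is rejected whenever the
# current Raft term differs from the term in which we last became leader.
class ToyLeaderGuard:
    def __init__(self):
        self.current_term = 0
        self.leader_term = None  # term in which we last became leader

    def become_leader(self, term):
        self.current_term = term
        self.leader_term = term

    def observe_new_term(self, term):
        # e.g. another replica was elected during a network partition
        self.current_term = term

    def check_leadership(self):
        if self.leader_term is None or self.leader_term != self.current_term:
            raise RuntimeError("not leader: leadership has changed")

guard = ToyLeaderGuard()
guard.become_leader(5)
guard.check_leadership()  # ok: still in the term we were elected in
```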
To ensure consistency of in-memory state across term changes, once a `TxnStatusManager` is elected leader, it reads the contents of the tablet, regenerating the in-memory state of all transactions.
Transitioning from `COMMIT_IN_PROGRESS` or `ABORT_IN_PROGRESS` to the corresponding terminal state, as well as orchestrating the appropriate participant RPCs, is managed by a set of per-transaction tasks. A similar pattern is used for each transition, so only the transition from `COMMIT_IN_PROGRESS` to `COMMITTED` is described below.
1. Once the `TxnStatusManager` sets a transaction to `COMMIT_IN_PROGRESS`, it kicks off an asynchronous RPC to each participant to begin committing.
2. Upon returning, the last returned RPC writes the `FINALIZE_IN_PROGRESS` record to the `TxnStatusManager`.
3. Once that record is written, the `TxnStatusManager` kicks off an asynchronous RPC to each participant to finalize the commit.
4. Upon returning, the last returned RPC writes the `COMMITTED` record to the `TxnStatusManager`.
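Each phase of the task above follows the same shape: fan out an RPC to every participant, and only once the last response arrives, write the next status record. A condensed, synchronous stand-in for the asynchronous RPCs (names assumed):

```python
# Toy sketch of one phase of the commit task: fan out to all participants,
# then write the next status record once the last response has arrived.
def run_commit_phase(participants, send_rpc, write_record):
    for p in participants:
        send_rpc(p)       # asynchronous in the real implementation
    write_record()        # "the last returned RPC writes the record"

log = []
run_commit_phase(
    ["tablet-a", "tablet-b"],
    send_rpc=lambda p: log.append(("BEGIN_COMMIT", p)),
    write_record=lambda: log.append(("WRITE", "FINALIZE_IN_PROGRESS")),
)
```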
Since only the leader `TxnStatusManager` is meant to update transaction state, in the above sequence, upon returning from doing IO or from waiting, the `TxnStatusManager` checks to ensure it’s still the leader. If not, it stops the task. Additionally, once a `TxnStatusManager` becomes leader, as it reads the existing transaction states from disk, it begins tasks for any transaction that is in a non-`OPEN`, non-terminal (i.e. not `COMMITTED`, not `ABORTED`) state.
Clients send heartbeat messages to a `TxnStatusManager` to let it know that a transaction is not abandoned; otherwise, the leader `TxnStatusManager` automatically aborts abandoned transactions. The leader `TxnStatusManager` keeps track of the last heartbeat for each transaction in memory only, making heartbeating a relatively lightweight operation.

Each tablet server has a background thread that periodically goes through each `TxnStatusManager` hosted on the server and aborts all transactions that have not been heartbeated to within a configured interval. Only transactions in the `OPEN` state are automatically aborted.
Transaction participants keep track of local transaction state, ensuring that transactional writes are only accepted if the transaction has not yet been committed or aborted. To do this, participants persistently (i.e. via Raft replication) keep track of the transaction state, described below:
- `kInitializing`: the transaction has yet to be fully initialized. Replication may be in progress, but we cannot consider the transaction fully open yet.
- `kOpen`: the transaction is available for writes.
- `kCommitInProgress`: the `TxnStatusManager` has signaled to the participant that the transaction should begin committing. The transaction can no longer accept new writes.
- `kCommitted`: the `TxnStatusManager` has finalized the commit. Transactional rows should be visible to clients.
- `kAborted`: the `TxnStatusManager` has begun aborting the transaction.
The following state transitions are expected:
```
              BEGIN_TXN         BEGIN_COMMIT                    FINALIZE_COMMIT
kInitializing ---------> kOpen ------------> kCommitInProgress --------------> kCommitted

              BEGIN_TXN         ABORT_TXN
kInitializing ---------> kOpen ------------> kAborted
```
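The participant-side transitions above can be sketched as op replay over a transition table (state and op names as used in this document; the function is a toy, not Kudu’s code):

```python
# Toy sketch: each replicated orchestration op moves the participant's local
# transaction state through the allowed transitions.
PARTICIPANT_TRANSITIONS = {
    ("kInitializing", "BEGIN_TXN"): "kOpen",
    ("kOpen", "BEGIN_COMMIT"): "kCommitInProgress",
    ("kCommitInProgress", "FINALIZE_COMMIT"): "kCommitted",
    ("kOpen", "ABORT_TXN"): "kAborted",
}

def apply_ops(ops):
    """Replay replicated ops from the initial state; raise on illegal ops."""
    state = "kInitializing"
    for op in ops:
        state = PARTICIPANT_TRANSITIONS[(state, op)]
    return state
```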
As orchestration RPCs are replicated on the participant, these transitions are applied to the underlying tablet, updating an in-memory registry of `TxnMetadata` objects per transaction ID and persisting the states with tablet metadata.
Before a participant is able to participate in a transaction, it must register itself with the `TxnStatusManager`, and replicate an op that demarcates the beginning of the transaction on all replicas of the participant.

To ensure this happens, when a transactional write is first received by a leader participant, the participant checks whether it has already completed these preparatory steps. It does so by keeping an in-memory map of “dispatchers”, one per active transaction. A participant’s dispatcher caches whether or not the participant has completed the steps, and if so, simply lets the write through to the prepare pool.
Otherwise, it temporarily keeps the write request in a queue, using the `TxnSystemClient` to asynchronously register itself as a participant with the `TxnStatusManager`, and then replicate the op to begin the transaction. Once these steps complete, the queued write requests are submitted to the prepare threadpool. If either step returns an error, the error is passed back to the writer, ensuring that retriable errors (e.g. leadership changes) lead to the transactional write op being retried, and non-retriable errors (e.g. invalid transaction ID) are shown to users.
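The dispatcher’s queue-then-drain behavior can be sketched as follows (a toy, synchronous model with assumed names; the real dispatcher performs the registration asynchronously):

```python
# Toy sketch of a per-transaction dispatcher: writes queue up until the
# participant has registered and replicated BEGIN_TXN, then drain in order
# to the prepare pool.
class ToyDispatcher:
    def __init__(self):
        self.ready = False    # have the preparatory steps completed?
        self.queued = []      # writes held until registration completes
        self.prepared = []    # stand-in for the prepare threadpool

    def dispatch(self, write):
        if self.ready:
            self.prepared.append(write)   # steps done: pass straight through
        else:
            self.queued.append(write)     # hold until registration completes

    def on_registration_complete(self):
        self.ready = True
        self.prepared.extend(self.queued)  # drain queued writes in order
        self.queued.clear()

d = ToyDispatcher()
d.dispatch("w1")
d.dispatch("w2")
d.on_registration_complete()
d.dispatch("w3")
```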
Once the transaction has begun committing or aborting on the participant, the transaction’s dispatcher is unregistered. Further attempts to write to the transaction may instantiate a new dispatcher, but the attempt to register the participant will fail, given the registration will be rejected by the `TxnStatusManager`.
The “commit” condition is different for transactions than for regular write ops, which only need to be considered “applied” to be visible to users. The goals for commit are:

- Stop accepting writes once a `BEGIN_COMMIT` op has been replicated on a participant.
- Only show results that have been committed, as indicated by the replication of a `FINALIZE_COMMIT` op on a participant, which contains a commit timestamp selected by the `TxnStatusManager` after receiving all `BEGIN_COMMIT` op responses.
To accomplish this, when finishing the replication of a `BEGIN_COMMIT` op, rather than marking the associated MVCC op as completed, the `Txn` keeps track of the `BEGIN_COMMIT` MVCC op timestamp, allowing a `FINALIZE_COMMIT` op to complete replicating and only then mark the `BEGIN_COMMIT` MVCC op as applied.
The commit timestamp is selected by the `TxnStatusManager` to be the highest of all `BEGIN_COMMIT` op timestamps. Since the commit timestamp of a transaction is therefore greater than or equal to the `BEGIN_COMMIT` timestamps of all participants, when serving a scan at time t, it is sufficient to wait until all ops before t are applied.
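In miniature, the commit-timestamp rule is just a maximum over the participants’ responses (toy numbers for illustration):

```python
# Toy sketch of commit-timestamp selection: each participant's BEGIN_COMMIT
# timestamp is higher than all of its op timestamps for the transaction, so
# the maximum bounds every op in the transaction.
def select_commit_timestamp(begin_commit_timestamps):
    return max(begin_commit_timestamps)

# e.g. three participants returned these BEGIN_COMMIT timestamps:
commit_ts = select_commit_timestamp([10, 17, 12])
```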
When reading rows, the commit condition changes depending on the kind of snapshot we are using:
- Timestamp (as in `READ_AT_SNAPSHOT`, `READ_YOUR_WRITES`, diff scan): the transaction is considered committed if the `TxnMetadata` has both a `BEGIN_COMMIT` op that has been applied and a commit timestamp that falls within the range of the snapshot.
- Latest (as in `READ_LATEST`): the transaction is considered committed if the `TxnMetadata` has a `BEGIN_COMMIT` op that has been applied, since we only apply the `BEGIN_COMMIT` op after replicating the `FINALIZE_COMMIT` op.
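The two commit conditions above can be sketched as a single predicate (the dict shape and function name are assumptions for illustration, not Kudu’s actual `TxnMetadata` type):

```python
# Toy sketch of the per-snapshot commit condition for transactional rows.
def is_txn_committed(txn_metadata, snapshot_timestamp=None):
    if snapshot_timestamp is None:
        # "Latest" reads: an applied BEGIN_COMMIT implies FINALIZE_COMMIT has
        # already replicated, so the transaction counts as committed.
        return txn_metadata["begin_commit_applied"]
    # Timestamp-based reads: additionally require the commit timestamp to
    # fall within the snapshot.
    return (txn_metadata["begin_commit_applied"]
            and txn_metadata["commit_timestamp"] is not None
            and txn_metadata["commit_timestamp"] <= snapshot_timestamp)
```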
In addition to the single MRS that tablets traditionally have, tablets now maintain a map of uncommitted transaction ID to dirty MRS, and a set of committed MRSs that were inserted as a part of transactions. Each such MRS has a shared reference to a `TxnMetadata` instance that is maintained as a part of `TabletMetadata`.

When a transaction is begun on a participant, a transactional MRS is created for it. Transactional write ops first check all DRSs for row presence, then the main, non-transactional MRS, and finally attempt to insert into the transactional MRS. As `FINALIZE_COMMIT` ops are applied, uncommitted MRSs are moved to the set of committed MRSs.
When scanning through a transactional MRS, to evaluate whether a base insert is relevant to a given scan, Kudu checks whether the MRS’s `TxnMetadata` should be considered committed with the given MVCC snapshot. Updates to the base inserts are evaluated as normal, wherein the visibility condition is based on whether the updates have been applied (updates are not yet supported).
Transactional MemRowSets are not flushed to disk until they are fully committed, at which point the memory usage of all committed MRSs is lumped together when considering whether to flush. When flushing, all committed MRSs are taken as the flush input, similar to a merge compaction, and DRSs are written as though we were compacting multiple MRSs. When flushed, on-disk timestamps are written as normal, using the rows’ commit timestamps, circumventing the need to consult transaction metadata on further scans.
Currently, per-transaction partition-level locking is supported, in tandem with per-op row-level locking. Each tablet may participate in at most one transaction at a time.
To avoid deadlocks, if a transaction attempts to acquire a partition lock that is held by a transaction with a lower transaction ID, the later (higher-ID) transaction aborts itself: the tablet server sends a best-effort request to the `TxnStatusManager` to abort it, i.e. the caller transaction “dies”. If instead the caller transaction has the lower transaction ID, the op is retried, and the caller transaction “waits”.
To ensure rows are properly locked in the presence of both transactional and non-transactional workloads, all non-transactional write ops also take the partition lock with the maximum transaction ID. This means that all transactional write ops will wait for non-transactional writes to finish, and non-transactional writes will abort in the presence of a multi-row transaction inserting rows into the same tablet.
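The wait-die policy above reduces to a tiny decision function; a sketch (illustrative names, not Kudu’s code). Note how modeling non-transactional writes as holding the lock with the maximum transaction ID makes them wait for nobody but die to any real transaction:

```python
# Toy sketch of the wait-die policy for the partition lock.
MAX_TXN_ID = 2**63 - 1  # assumed stand-in for non-transactional writes

def partition_lock_decision(requester_txn_id, holder_txn_id):
    if requester_txn_id > holder_txn_id:
        return "die"   # higher (younger) ID: abort the requesting transaction
    return "wait"      # lower (older) ID: retry the op until the lock frees
```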
Both the C++ and Java clients leverage the existing session-based API that users have come to know. With transactions, however, there is also the concept of transaction handles, which serve as the initiators of transactions, the vehicle with which to create transactional sessions, the object with which to orchestrate the commit or rollback of a transaction, and a means to signal activity on a transaction via automatic heartbeating, preventing the `TxnStatusManager` from culling stale transactions.
In C++:

```cpp
shared_ptr<KuduTransaction> txn;
KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
shared_ptr<KuduSession> session;
KUDU_RETURN_NOT_OK(txn->CreateSession(&session));
// ... insert rows to 'session'
KUDU_RETURN_NOT_OK(session->Flush());
KUDU_RETURN_NOT_OK(txn->Commit());
```
In Java:

```java
KuduTransaction txn = client.newTransaction();
KuduSession session = txn.newKuduSession();
// ... insert rows to 'session'
session.flush();
txn.commit();
```
Under the hood, each newly-created transaction handle, while kept in scope, also heartbeats to the `TxnStatusManager`s to signal activity for the transaction, preventing it from being culled by the transaction staleness checkers. Client applications do not need to heartbeat explicitly.
Given we expect there to be distributed actors that participate in a given transaction, Kudu also exposes a way to transmit transaction handles across processes. Rather than exposing internal details like the transaction ID, we serialize and deserialize a `TxnTokenPB` that contains metadata about the transaction.
In C++:

```cpp
string txn_token;
shared_ptr<KuduTransaction> txn;
KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
KUDU_RETURN_NOT_OK(txn->Serialize(&txn_token));

shared_ptr<KuduTransaction> same_txn;
KUDU_RETURN_NOT_OK(KuduTransaction::Deserialize(client, txn_token, &same_txn));
```

In Java:

```java
KuduTransaction txn = client.newTransaction();
byte[] txnToken = txn.serialize();
KuduTransaction sameTxn = KuduTransaction.deserialize(txnToken, asyncClient);
```
Since we typically expect there to be a single driver of a transaction and multiple actors as participants of the transaction, deserialized transaction handles do not heartbeat by default. The expectation is that the driver will continue heartbeating until the transaction is complete. This can be toggled by passing customized `SerializationOptions`.