Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 261 additions & 11 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2654,7 +2654,30 @@ func (t *txnContext) validateReadSet(ctx context.Context) error {
return nil
}

func (t *txnContext) commit() error {
// preparedTxnDispatch is the fully-assembled write set + read set + commit
// timestamp for a MULTI/EXEC transaction, ready to be passed to
// coordinator.Dispatch. Split out from commit() so the option-2 dedup
// path (runTransactionWithDedup) can intercept between prepare and
// dispatch — it needs to capture (elems, commitTS, readKeys) for a
// possible retry under PrevCommitTS without otherwise duplicating the
// commit-building logic. The owned ctx is the redisDispatchTimeout-
// bounded context the caller must run Dispatch under and Cancel after.
//
// Values are produced by prepareDispatch and consumed by commit() and
// firstExecAttempt; both defer cancel() immediately after the error check
// and skip the dispatch when elems is empty.
type preparedTxnDispatch struct {
// elems is the combined write set handed to the operation group's Elems.
// A zero length means there is nothing to dispatch.
elems []*kv.Elem[kv.OP]
// commitTS is the commit timestamp chosen during preparation; callers pass
// it as the operation group's CommitTS.
commitTS uint64
// readKeys is the slice of read-set keys collected from the transaction's
// readKeys map; callers pass it as the operation group's ReadKeys.
readKeys [][]byte
// ctx is the context Dispatch is run under. prepareDispatch derives it from
// the server's handlerContext with a redisDispatchTimeout deadline.
// NOTE(review): ctx is left nil on prepareDispatch's error returns, so it
// must only be used after the error check.
ctx context.Context
// cancel releases ctx. On prepareDispatch's error returns it is a no-op
// func, so it is always safe to call.
cancel context.CancelFunc
}

// prepareDispatch builds everything Dispatch needs (elems, commitTS,
// readKeys, ctx) without actually calling Dispatch. Callers must always
// invoke `cancel()` on the returned prepared value once the dispatch
// attempt finishes (commit() does this via defer; the dedup path does it
// per retry iteration). When the transaction has no writes this returns
// a prepared value with empty `elems` and a no-op cancel — callers can
// check len(prepared.elems)==0 and skip the dispatch.
func (t *txnContext) prepareDispatch() (preparedTxnDispatch, error) {
elems := t.buildKeyElems()

// Pre-allocate commitTS so Delta keys can embed it in their bytes before
Expand All @@ -2663,7 +2686,7 @@ func (t *txnContext) commit() error {
listElems := t.buildListElems(commitTS)
zsetElems, err := t.buildZSetElems(commitTS)
if err != nil {
return err
return preparedTxnDispatch{cancel: func() {}}, err
}
// TTL elements: string keys have TTL embedded in value (buildKeyElems handles that),
// non-string keys get a !redis|ttl| element written in the same transaction.
Expand All @@ -2675,33 +2698,48 @@ func (t *txnContext) commit() error {
// run on the server-lifetime handlerContext, leaving its scans uncancellable
// from the request side on a slow disk or hot-key pathological commit.
ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout)
defer cancel()

streamElems, err := t.buildStreamDeletionElems(ctx)
if err != nil {
return err
cancel()
return preparedTxnDispatch{cancel: func() {}}, err
}

elems = append(elems, listElems...)
elems = append(elems, zsetElems...)
elems = append(elems, ttlElems...)
elems = append(elems, streamElems...)
if len(elems) == 0 {
return nil
}

readKeys := make([][]byte, 0, len(t.readKeys))
for _, k := range t.readKeys {
readKeys = append(readKeys, k)
}
return preparedTxnDispatch{
elems: elems,
commitTS: commitTS,
readKeys: readKeys,
ctx: ctx,
cancel: cancel,
}, nil
}

func (t *txnContext) commit() error {
prepared, err := t.prepareDispatch()
if err != nil {
return err
}
defer prepared.cancel()
if len(prepared.elems) == 0 {
return nil
}
group := &kv.OperationGroup[kv.OP]{
IsTxn: true,
Elems: elems,
Elems: prepared.elems,
StartTS: t.startTS,
CommitTS: commitTS,
ReadKeys: readKeys,
CommitTS: prepared.commitTS,
ReadKeys: prepared.readKeys,
}
if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil {
if _, err := t.server.coordinator.Dispatch(prepared.ctx, group); err != nil {
return errors.WithStack(err)
}
return nil
Expand Down Expand Up @@ -2978,6 +3016,10 @@ func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] {
}

func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) {
if r.onePhaseTxnDedup {
return r.runTransactionWithDedup(queue)
}

dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
defer cancel()

Expand Down Expand Up @@ -3024,6 +3066,214 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
return results, nil
}

// reusableExecTxn captures a dispatched MULTI/EXEC transaction so a
// subsequent retry can reuse its exact write set under a fresh
// commit_ts (carrying prev_commit_ts) and probe whether the prior
// attempt already landed. This is the EXEC analogue of
// reusableListPush (M3 R1 result reconstruction for MULTI/EXEC).
//
// `results` is computed once from attempt 1's startTS snapshot and is
// invariant across reuse for the same reason RPUSH/LPUSH's `length`
// is: the write set is fixed, so apply-vs-no-op is invisible to the
// client. Reads in the EXEC body returned values from attempt 1's
// snapshot — those values were what the client would have observed if
// attempt 1 hadn't returned an ambiguous error, so caching them is
// the right semantics for a confirmed-or-deduped commit. A
// genuine cross-txn conflict is caught by OCC on readKeys at the FSM
// apply (WriteConflict → drop pending → recompute), so the cached
// results are only returned when reuse actually represents the
// outcome of attempt 1's intent.
//
// Instances are created by firstExecAttempt when its dispatch fails with
// a retryable error, and are consumed (and their commitTS advanced) by
// dispatchExecReuse.
type reusableExecTxn struct {
// elems is the write set from the first attempt; every reuse dispatch
// sends it unchanged.
elems []*kv.Elem[kv.OP]
// startTS is the first attempt's start timestamp; reuse dispatches keep
// using it as StartTS.
startTS uint64
// commitTS is the commit timestamp of the most recent attempt that may
// have landed. It is sent as PrevCommitTS on the next reuse dispatch and
// is overwritten by dispatchExecReuse when a reuse attempt itself may
// have (or is confirmed to have) landed.
commitTS uint64
// readKeys is the first attempt's read set, sent as ReadKeys on every
// reuse dispatch.
readKeys [][]byte
// results holds the per-command replies computed during the first
// attempt; they are returned to the caller when a reuse succeeds.
results []redisResult
}

// dispatchExecReuse performs a single reuse attempt for a previously
// dispatched MULTI/EXEC transaction. It re-sends the captured write set
// (pending.elems) with the original StartTS and ReadKeys, a freshly
// allocated CommitTS, and PrevCommitTS set to pending.commitTS so the
// apply side can tell whether the earlier attempt already landed.
//
// Outcomes:
//   - Dispatch succeeds: the cached pending.results are returned.
//   - Dispatch fails with store.ErrWriteConflict: the first write key is
//     probed at the fresh commit timestamp. If the probe reports that our
//     own write landed there, the conflict is treated as self-inflicted:
//     pending.commitTS is advanced and the cached results are returned.
//     Otherwise (not landed, probe error, or no write key to probe) the
//     conflict is treated as genuine and drop=true tells the caller to
//     discard pending and rebuild from a fresh snapshot.
//   - Any other error: the attempt is ambiguous. pending.commitTS is
//     advanced to the fresh timestamp only when the error is retryable,
//     so the next reuse probes this attempt; the error is returned with
//     drop=false.
//
// Errors are wrapped with errors.WithStack. Unlike the list-push reuse
// path described in the original design notes, there is no re-read
// fallback: the client-visible outcome is exactly the cached results.
func (r *RedisServer) dispatchExecReuse(ctx context.Context, pending *reusableExecTxn) (results []redisResult, drop bool, err error) {
	freshTS := r.coordinator.Clock().Next()
	group := &kv.OperationGroup[kv.OP]{
		IsTxn:        true,
		StartTS:      pending.startTS,
		CommitTS:     freshTS,
		PrevCommitTS: pending.commitTS,
		ReadKeys:     pending.readKeys,
		Elems:        pending.elems,
	}
	_, dispErr := r.coordinator.Dispatch(ctx, group)

	switch {
	case dispErr == nil:
		// Either applied at freshTS or recognised as already applied.
		return pending.results, false, nil

	case errors.Is(dispErr, store.ErrWriteConflict):
		// The conflict may be against our own write: check whether the
		// reused write set is actually committed at freshTS before
		// concluding that another transaction got in the way.
		probeKey := firstWriteKey(pending.elems)
		if len(probeKey) > 0 {
			landed, probeErr := r.store.CommittedVersionAt(ctx, probeKey, freshTS)
			if probeErr == nil && landed {
				pending.commitTS = freshTS
				return pending.results, false, nil
			}
		}
		// Not ours: signal the caller to drop pending and start over.
		return nil, true, errors.WithStack(dispErr)

	default:
		// Ambiguous failure. Remember freshTS only if the retry loop is
		// going to come back; a non-retryable error ends the loop and
		// pending is discarded along with it.
		if isRetryableRedisTxnErr(dispErr) {
			pending.commitTS = freshTS
		}
		return nil, false, errors.WithStack(dispErr)
	}
}

// runTransactionWithDedup executes a queued MULTI/EXEC transaction under
// the option-2 dedup retry scheme.
//
// The retry loop (retryRedisWrite) repeatedly invokes one attempt:
//   - With no captured state, firstExecAttempt builds the transaction
//     from a fresh snapshot and dispatches it. If that fails and returns
//     reuse state, the state is kept for the next iteration.
//   - With captured state, dispatchExecReuse re-sends the same write set
//     under a new commit timestamp with PrevCommitTS set. Each reuse
//     runs under its own redisDispatchTimeout-bounded context derived
//     from the handler context. If the reuse reports drop, the captured
//     state is cleared so the next iteration rebuilds from scratch.
//
// The results of whichever attempt succeeds are returned; if the loop
// gives up, its error is returned and the results are nil.
func (r *RedisServer) runTransactionWithDedup(queue []redcon.Command) ([]redisResult, error) {
	dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
	defer cancel()

	var (
		results []redisResult
		pending *reusableExecTxn
	)

	attempt := func() error {
		if pending == nil {
			// Nothing captured yet (or it was dropped): full attempt.
			res, captured, err := r.firstExecAttempt(dispatchCtx, queue)
			if err == nil {
				results = res
				return nil
			}
			if captured != nil {
				pending = captured
			}
			return err
		}

		// Reuse path: the defer fires when this closure returns, i.e. once
		// per retry iteration.
		reuseCtx, reuseCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
		defer reuseCancel()

		res, drop, err := r.dispatchExecReuse(reuseCtx, pending)
		if drop {
			pending = nil
		}
		if err != nil {
			return err
		}
		results = res
		return nil
	}

	if err := r.retryRedisWrite(dispatchCtx, attempt); err != nil {
		return nil, err
	}
	return results, nil
}

// firstExecAttempt runs one full (non-reuse) EXEC attempt.
//
// It takes a start timestamp, pins reads at it for the duration of the
// call, applies every queued command to a fresh txnContext to collect the
// per-command results, validates the read set, prepares the dispatch and
// sends it to the coordinator.
//
// Returns:
//   - (results, nil, nil) on success, including the case where the
//     transaction produced no writes and nothing was dispatched.
//   - (nil, reuse, err) when Dispatch fails with a retryable error: reuse
//     captures the write set, timestamps, read keys and results so the
//     caller's retry loop can re-dispatch with PrevCommitTS.
//   - (nil, nil, err) for every other failure (command application,
//     read-set validation, preparation, or a non-retryable dispatch
//     error). No reuse state is produced because the retry loop is not
//     expected to come back for these.
//
// Dispatch errors are wrapped with errors.WithStack; earlier errors are
// returned as-is.
func (r *RedisServer) firstExecAttempt(dispatchCtx context.Context, queue []redcon.Command) ([]redisResult, *reusableExecTxn, error) {
	snapshotTS := r.txnStartTS()
	pin := r.pinReadTS(snapshotTS)
	defer pin.Release()

	txn := &txnContext{
		server:          r,
		ctx:             dispatchCtx,
		startTS:         snapshotTS,
		working:         map[string]*txnValue{},
		listStates:      map[string]*listTxnState{},
		zsetStates:      map[string]*zsetTxnState{},
		ttlStates:       map[string]*ttlTxnState{},
		readKeys:        map[string][]byte{},
		streamDeletions: map[string][]byte{},
	}

	// Apply the queued commands in order, recording one result per command.
	replies := make([]redisResult, len(queue))
	for i, cmd := range queue {
		reply, err := txn.apply(cmd)
		if err != nil {
			return nil, nil, err
		}
		replies[i] = reply
	}

	if err := txn.validateReadSet(dispatchCtx); err != nil {
		return nil, nil, err
	}

	prepared, err := txn.prepareDispatch()
	if err != nil {
		return nil, nil, err
	}
	defer prepared.cancel()

	if len(prepared.elems) == 0 {
		// No writes: nothing to dispatch, so there is nothing to dedup.
		return replies, nil, nil
	}

	_, dispErr := r.coordinator.Dispatch(prepared.ctx, &kv.OperationGroup[kv.OP]{
		IsTxn:    true,
		Elems:    prepared.elems,
		StartTS:  txn.startTS,
		CommitTS: prepared.commitTS,
		ReadKeys: prepared.readKeys,
	})
	if dispErr == nil {
		return replies, nil, nil
	}

	// Capture reuse state only when the retry loop will actually run
	// another iteration; for errors that end the loop it would never be
	// consulted.
	if !isRetryableRedisTxnErr(dispErr) {
		return nil, nil, errors.WithStack(dispErr)
	}
	reuse := &reusableExecTxn{
		elems:    prepared.elems,
		startTS:  txn.startTS,
		commitTS: prepared.commitTS,
		readKeys: prepared.readKeys,
		results:  replies,
	}
	return nil, reuse, errors.WithStack(dispErr)
}

func (r *RedisServer) txnStartTS() uint64 {
// store.LastCommitTS() is the authoritative safe-snapshot watermark: it is
// updated atomically only AFTER the corresponding Pebble batch commit, so
Expand Down
Loading
Loading