Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
82 lines (68 sloc) 2.45 KB
// Copyright 2014 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package batcheval
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
// init calls RegisterCommand for the roachpb.HeartbeatTxn method, supplying
// declareKeysHeartbeatTransaction as its key-declaration function and
// HeartbeatTxn as its evaluation function.
func init() {
	RegisterCommand(
		roachpb.HeartbeatTxn,
		declareKeysHeartbeatTransaction,
		HeartbeatTxn,
	)
}
// declareKeysHeartbeatTransaction declares the spans touched by a
// HeartbeatTxn request. It adds nothing of its own: all arguments are
// forwarded unchanged to declareKeysWriteTransaction.
func declareKeysHeartbeatTransaction(
	desc *roachpb.RangeDescriptor,
	header roachpb.Header,
	req roachpb.Request,
	spans *spanset.SpanSet,
) {
	declareKeysWriteTransaction(desc, header, req, spans)
}
// HeartbeatTxn evaluates a HeartbeatTxnRequest sent by a transaction
// coordinator.
//
// The request is first checked with VerifyTransaction (allowing the PENDING
// and STAGING statuses) and must carry a non-empty Now timestamp. The
// transaction record is then read from the key derived from the header's
// transaction; if no record exists, the header's transaction is used instead,
// provided CanCreateTxnRecord permits it. Unless the transaction is already
// finalized, its LastHeartbeat is forwarded to Now and the record is written
// back. The resulting transaction is returned in the response's Txn field.
func HeartbeatTxn(
	ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
	hbReq := cArgs.Args.(*roachpb.HeartbeatTxnRequest)
	hdr := cArgs.Header
	hbResp := resp.(*roachpb.HeartbeatTxnResponse)

	// Reject requests that fail transaction verification or lack a timestamp.
	if err := VerifyTransaction(hdr, hbReq, roachpb.PENDING, roachpb.STAGING); err != nil {
		return result.Result{}, err
	}
	if hbReq.Now.IsEmpty() {
		return result.Result{}, fmt.Errorf("now not specified for heartbeat")
	}

	recordKey := keys.TransactionKey(hdr.Txn.Key, hdr.Txn.ID)

	// Load the persisted transaction record, if there is one.
	var current roachpb.Transaction
	found, err := engine.MVCCGetProto(
		ctx, batch, recordKey, hlc.Timestamp{}, &current, engine.MVCCGetOptions{},
	)
	if err != nil {
		return result.Result{}, err
	}
	if !found {
		// Nothing on disk yet: start from the transaction in the request
		// header. The record itself is written further down, but only once
		// we have confirmed that creating it is permitted.
		current = *hdr.Txn
		if err := CanCreateTxnRecord(cArgs.EvalCtx, &current); err != nil {
			return result.Result{}, err
		}
	}

	// A finalized transaction is left untouched; otherwise bump the heartbeat
	// and persist the record.
	if !current.Status.IsFinalized() {
		current.LastHeartbeat.Forward(hbReq.Now)
		record := current.AsRecord()
		if err := engine.MVCCPutProto(
			ctx, batch, cArgs.Stats, recordKey, hlc.Timestamp{}, nil, &record,
		); err != nil {
			return result.Result{}, err
		}
	}

	hbResp.Txn = &current
	return result.Result{}, nil
}
You can’t perform that action at this time.