/
cmd_put.go
81 lines (72 loc) · 2.51 KB
/
cmd_put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright 2014 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package batcheval
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
// init registers the Put command through RegisterReadWriteCommand, pairing
// the kvpb.Put method with its key-declaration function (declareKeysPut) and
// its evaluation function (Put).
func init() {
	RegisterReadWriteCommand(
		kvpb.Put,
		declareKeysPut,
		Put,
	)
}
// declareKeysPut declares the latch and lock spans for a PutRequest by
// delegating to one of the default declaration helpers, selected by the
// request's Inline flag: inline puts use DefaultDeclareKeys, while all other
// puts use DefaultDeclareIsolatedKeys. The chosen helper's error is returned
// unchanged.
//
// req must be a *kvpb.PutRequest; any other request type makes the type
// assertion below panic.
func declareKeysPut(
	rs ImmutableRangeState,
	header *kvpb.Header,
	req kvpb.Request,
	latchSpans *spanset.SpanSet,
	lockSpans *lockspanset.LockSpanSet,
	maxOffset time.Duration,
) error {
	args := req.(*kvpb.PutRequest)
	if args.Inline {
		// NOTE(review): presumably inline puts skip the "isolated" declaration
		// because they are written without a timestamp (see Put) — confirm
		// against DefaultDeclareIsolatedKeys.
		return DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans, maxOffset)
	}
	return DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset)
}
// Put evaluates a PutRequest: it writes the request's value at the request's
// key through readWriter and returns a result carrying the lock acquisition
// reported by the storage layer.
//
// Inline requests are written at the zero timestamp; all other requests are
// written at the batch header's timestamp. Requests with Blind set are routed
// to storage.MVCCBlindPut, the rest to storage.MVCCPut. If the write fails, an
// empty result is returned together with the error.
//
// cArgs.Args must hold a *kvpb.PutRequest; any other type makes the type
// assertion panic. resp is not used by this command.
func Put(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
	putReq := cArgs.Args.(*kvpb.PutRequest)
	hdr := cArgs.Header

	// Start from the header timestamp and fall back to the zero timestamp for
	// inline writes.
	writeTS := hdr.Timestamp
	if putReq.Inline {
		writeTS = hlc.Timestamp{}
	}

	// Lock-conflict limits are read from the cluster settings.
	maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
	targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

	writeOpts := storage.MVCCWriteOptions{
		Txn:                            hdr.Txn,
		LocalTimestamp:                 cArgs.Now,
		Stats:                          cArgs.Stats,
		ReplayWriteTimestampProtection: hdr.AmbiguousReplayProtection,
		OmitInRangefeeds:               cArgs.OmitInRangefeeds,
		MaxLockConflicts:               maxLockConflicts,
		TargetLockConflictBytes:        targetLockConflictBytes,
		Category:                       fs.BatchEvalReadCategory,
	}

	var (
		lockAcq roachpb.LockAcquisition
		putErr  error
	)
	switch {
	case putReq.Blind:
		lockAcq, putErr = storage.MVCCBlindPut(ctx, readWriter, putReq.Key, writeTS, putReq.Value, writeOpts)
	default:
		lockAcq, putErr = storage.MVCCPut(ctx, readWriter, putReq.Key, writeTS, putReq.Value, writeOpts)
	}
	if putErr != nil {
		return result.Result{}, putErr
	}
	return result.WithAcquiredLocks(lockAcq), nil
}