Skip to content

Commit

Permalink
client: Pass []roachpb.RequestUnion around internally
Browse files Browse the repository at this point in the history
Like the previous commit, this is in preparation for using a BatchRequest.
  • Loading branch information
tbg committed May 9, 2016
1 parent f4548d2 commit 078f91f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 38 deletions.
51 changes: 31 additions & 20 deletions client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Batch struct {
// // string(b.Results[0].Rows[0].Key) == "a"
// // string(b.Results[1].Rows[0].Key) == "b"
Results []Result
reqs []roachpb.Request
reqs []roachpb.RequestUnion
// If nonzero, limits the total amount of key/values returned by all Scan/ReverseScan operations
// in the batch. This can only be used if all requests are of the same type, and that type is
// Scan or ReverseScan.
Expand Down Expand Up @@ -112,7 +112,7 @@ func (b *Batch) fillResults(br *roachpb.BatchResponse, pErr *roachpb.Error) erro
result := &b.Results[i]

for k := 0; k < result.calls; k++ {
args := b.reqs[offset+k]
args := b.reqs[offset+k].GetInner()

var reply roachpb.Response
if result.PErr == nil {
Expand Down Expand Up @@ -207,7 +207,8 @@ func (b *Batch) fillResults(br *roachpb.BatchResponse, pErr *roachpb.Error) erro

default:
if result.PErr == nil {
result.PErr = roachpb.NewErrorf("unsupported reply: %T", reply)
result.PErr = roachpb.NewErrorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
Expand Down Expand Up @@ -240,6 +241,16 @@ func (b *Batch) fillResults(br *roachpb.BatchResponse, pErr *roachpb.Error) erro
return nil
}

func (b *Batch) appendReqs(args ...roachpb.Request) {
rus := make([]roachpb.RequestUnion, 0, len(args))
for _, arg := range args {
var ru roachpb.RequestUnion
ru.MustSetInner(arg)
rus = append(rus, ru)
}
b.reqs = append(b.reqs, rus...)
}

// InternalAddRequest adds the specified requests to the batch. It is intended
// for internal use only.
func (b *Batch) InternalAddRequest(reqs ...roachpb.Request) {
Expand All @@ -253,7 +264,7 @@ func (b *Batch) InternalAddRequest(reqs ...roachpb.Request) {
*roachpb.DeleteRequest:
numRows = 1
}
b.reqs = append(b.reqs, args)
b.appendReqs(args)
b.initResult(1 /* calls */, numRows, nil)
}
}
Expand All @@ -271,7 +282,7 @@ func (b *Batch) Get(key interface{}) {
b.initResult(0, 1, err)
return
}
b.reqs = append(b.reqs, roachpb.NewGet(k))
b.appendReqs(roachpb.NewGet(k))
b.initResult(1, 1, nil)
}

Expand All @@ -287,9 +298,9 @@ func (b *Batch) put(key, value interface{}, inline bool) {
return
}
if inline {
b.reqs = append(b.reqs, roachpb.NewPutInline(k, v))
b.appendReqs(roachpb.NewPutInline(k, v))
} else {
b.reqs = append(b.reqs, roachpb.NewPut(k, v))
b.appendReqs(roachpb.NewPut(k, v))
}
b.initResult(1, 1, nil)
}
Expand Down Expand Up @@ -345,7 +356,7 @@ func (b *Batch) CPut(key, value, expValue interface{}) {
b.initResult(0, 1, err)
return
}
b.reqs = append(b.reqs, roachpb.NewConditionalPut(k, v, ev))
b.appendReqs(roachpb.NewConditionalPut(k, v, ev))
b.initResult(1, 1, nil)
}

Expand All @@ -366,7 +377,7 @@ func (b *Batch) InitPut(key, value interface{}) {
b.initResult(0, 1, err)
return
}
b.reqs = append(b.reqs, roachpb.NewInitPut(k, v))
b.appendReqs(roachpb.NewInitPut(k, v))
b.initResult(1, 1, nil)
}

Expand All @@ -384,7 +395,7 @@ func (b *Batch) Inc(key interface{}, value int64) {
b.initResult(0, 1, err)
return
}
b.reqs = append(b.reqs, roachpb.NewIncrement(k, value))
b.appendReqs(roachpb.NewIncrement(k, value))
b.initResult(1, 1, nil)
}

Expand All @@ -400,9 +411,9 @@ func (b *Batch) scan(s, e interface{}, maxRows int64, isReverse bool) {
return
}
if !isReverse {
b.reqs = append(b.reqs, roachpb.NewScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
b.appendReqs(roachpb.NewScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
} else {
b.reqs = append(b.reqs, roachpb.NewReverseScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
b.appendReqs(roachpb.NewReverseScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
}
b.initResult(1, 0, nil)
}
Expand Down Expand Up @@ -445,7 +456,7 @@ func (b *Batch) CheckConsistency(s, e interface{}, withDiff bool) {
b.initResult(0, 0, err)
return
}
b.reqs = append(b.reqs, roachpb.NewCheckConsistency(roachpb.Key(begin), roachpb.Key(end), withDiff))
b.appendReqs(roachpb.NewCheckConsistency(roachpb.Key(begin), roachpb.Key(end), withDiff))
b.initResult(1, 0, nil)
}

Expand All @@ -462,8 +473,8 @@ func (b *Batch) ChangeFrozen(s, e interface{}, mustVersion string, frozen bool)
b.initResult(0, 0, err)
return
}
b.reqs = append(b.reqs,
roachpb.NewChangeFrozen(roachpb.Key(begin), roachpb.Key(end), frozen, mustVersion))
b.appendReqs(roachpb.NewChangeFrozen(
roachpb.Key(begin), roachpb.Key(end), frozen, mustVersion))
b.initResult(1, 1, nil)
}

Expand All @@ -474,7 +485,7 @@ func (b *Batch) ChangeFrozen(s, e interface{}, mustVersion string, frozen bool)
//
// key can be either a byte slice or a string.
func (b *Batch) Del(keys ...interface{}) {
var reqs []roachpb.Request
reqs := make([]roachpb.Request, 0, len(keys))
for _, key := range keys {
k, err := marshalKey(key)
if err != nil {
Expand All @@ -483,7 +494,7 @@ func (b *Batch) Del(keys ...interface{}) {
}
reqs = append(reqs, roachpb.NewDelete(k))
}
b.reqs = append(b.reqs, reqs...)
b.appendReqs(reqs...)
b.initResult(len(reqs), len(reqs), nil)
}

Expand All @@ -504,7 +515,7 @@ func (b *Batch) DelRange(s, e interface{}, returnKeys bool) {
b.initResult(0, 0, err)
return
}
b.reqs = append(b.reqs, roachpb.NewDeleteRange(roachpb.Key(begin), roachpb.Key(end), returnKeys))
b.appendReqs(roachpb.NewDeleteRange(roachpb.Key(begin), roachpb.Key(end), returnKeys))
b.initResult(1, 0, nil)
}

Expand All @@ -521,7 +532,7 @@ func (b *Batch) adminMerge(key interface{}) {
Key: k,
},
}
b.reqs = append(b.reqs, req)
b.appendReqs(req)
b.initResult(1, 0, nil)
}

Expand All @@ -539,6 +550,6 @@ func (b *Batch) adminSplit(splitKey interface{}) {
},
}
req.SplitKey = k
b.reqs = append(b.reqs, req)
b.appendReqs(req)
b.initResult(1, 0, nil)
}
15 changes: 10 additions & 5 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (db *DB) CheckConsistency(begin, end interface{}, withDiff bool) error {
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
func sendAndFill(
send func(roachpb.Header, ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.Error),
send func(roachpb.Header, []roachpb.RequestUnion) (*roachpb.BatchResponse, *roachpb.Error),
b *Batch,
) (*roachpb.BatchResponse, error) {
// Errors here will be attached to the results, so we will get them from
Expand All @@ -427,7 +427,11 @@ func sendAndFill(
var ba roachpb.BatchRequest // TODO
ba.MaxScanResults = b.MaxScanResults
ba.ReadConsistency = b.ReadConsistency
br, pErr := send(ba.Header, b.reqs...)
// TODO(tschottdorf): this nonsensical copy is required since at the time
// of writing, the chunking and masking code in DistSender operates on the
// original data (as can readily be seen by a whole bunch of test failures.
reqs := append([]roachpb.RequestUnion(nil), b.reqs...) // HACK
br, pErr := send(ba.Header, reqs)
if pErr != nil {
// Discard errors from fillResults.
_ = b.fillResults(nil, pErr)
Expand Down Expand Up @@ -484,13 +488,14 @@ func (db *DB) Txn(retryable func(txn *Txn) error) error {
// send runs the specified calls synchronously in a single batch and returns
// any errors. Returns a nil response for empty input (no requests).
func (db *DB) send(h roachpb.Header,
reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.Error) {
reqs []roachpb.RequestUnion) (*roachpb.BatchResponse, *roachpb.Error) {
if len(reqs) == 0 {
return nil, nil
}

if h.ReadConsistency == roachpb.INCONSISTENT {
for _, req := range reqs {
for _, ru := range reqs {
req := ru.GetInner()
if req.Method() != roachpb.Get && req.Method() != roachpb.Scan &&
req.Method() != roachpb.ReverseScan {
return nil, roachpb.NewErrorf("method %s not allowed with INCONSISTENT batch", req.Method)
Expand All @@ -501,7 +506,7 @@ func (db *DB) send(h roachpb.Header,
ba := roachpb.BatchRequest{
Header: h,
}
ba.Add(reqs...)
ba.Requests = reqs

if db.userPriority != 1 {
ba.UserPriority = db.userPriority
Expand Down
28 changes: 17 additions & 11 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (txn *Txn) CommitInBatchWithResponse(b *Batch) (*roachpb.BatchResponse, err
if txn != b.txn {
return nil, util.Errorf("a batch b can only be committed by b.txn")
}
b.reqs = append(b.reqs, endTxnReq(true /* commit */, txn.deadline, txn.SystemConfigTrigger()))
b.appendReqs(endTxnReq(true /* commit */, txn.deadline, txn.SystemConfigTrigger()))
b.initResult(1, 0, nil)
resp, err := txn.RunWithResponse(b)
if err == nil {
Expand Down Expand Up @@ -451,7 +451,9 @@ func (txn *Txn) Rollback() error {
}

func (txn *Txn) sendEndTxnReq(commit bool, deadline *roachpb.Timestamp) error {
_, err := txn.send(roachpb.Header{}, endTxnReq(commit, deadline, txn.SystemConfigTrigger()))
var ru roachpb.RequestUnion
ru.MustSetInner(endTxnReq(commit, deadline, txn.SystemConfigTrigger()))
_, err := txn.send(roachpb.Header{}, []roachpb.RequestUnion{ru})
return err.GoError()
}

Expand Down Expand Up @@ -594,7 +596,7 @@ RetryLoop:
// EndTransaction call is silently dropped, allowing the caller to
// always commit or clean-up explicitly even when that may not be
// required (or even erroneous).
func (txn *Txn) send(h roachpb.Header, reqs ...roachpb.Request) (
func (txn *Txn) send(h roachpb.Header, reqs []roachpb.RequestUnion) (
*roachpb.BatchResponse, *roachpb.Error) {

if txn.Proto.Status != roachpb.PENDING || txn.IsFinalized() {
Expand Down Expand Up @@ -622,7 +624,8 @@ func (txn *Txn) send(h roachpb.Header, reqs ...roachpb.Request) (
firstWriteIndex := -1
var firstWriteKey roachpb.Key

for i, args := range reqs {
for i, ru := range reqs {
args := ru.GetInner()
if i < lastIndex {
if _, ok := args.(*roachpb.EndTransactionRequest); ok {
return nil, roachpb.NewErrorf("%s sent as non-terminal call", args.Method())
Expand All @@ -635,32 +638,35 @@ func (txn *Txn) send(h roachpb.Header, reqs ...roachpb.Request) (
}

haveTxnWrite := firstWriteIndex != -1
endTxnRequest, haveEndTxn := reqs[lastIndex].(*roachpb.EndTransactionRequest)
endTxnRequest, haveEndTxn := reqs[lastIndex].GetInner().(*roachpb.EndTransactionRequest)
needBeginTxn := !txn.Proto.Writing && haveTxnWrite
needEndTxn := txn.Proto.Writing || haveTxnWrite
elideEndTxn := haveEndTxn && !needEndTxn

// If we're not yet writing in this txn, but intend to, insert a
// begin transaction request before the first write command.
if needBeginTxn {
bt := &roachpb.BeginTransactionRequest{
// If the transaction already has a key (we're in a restart), make
// sure we set the key in the begin transaction request to the original.
req := &roachpb.BeginTransactionRequest{
Span: roachpb.Span{
Key: firstWriteKey,
},
}
// If the transaction already has a key (we're in a restart), make
// sure we set the key in the begin transaction request to the original.
if txn.Proto.Key != nil {
bt.Key = txn.Proto.Key
req.Key = txn.Proto.Key
}
reqs = append(append(append([]roachpb.Request(nil), reqs[:firstWriteIndex]...), bt), reqs[firstWriteIndex:]...)

var bt roachpb.RequestUnion
bt.MustSetInner(req)
reqs = append(append(append([]roachpb.RequestUnion(nil), reqs[:firstWriteIndex]...), bt), reqs[firstWriteIndex:]...)
}

if elideEndTxn {
reqs = reqs[:lastIndex]
}

br, pErr := txn.db.send(h, reqs...)
br, pErr := txn.db.send(h, reqs)
if elideEndTxn && pErr == nil {
// Check that read only transactions do not violate their deadline.
if endTxnRequest.Deadline != nil {
Expand Down
6 changes: 4 additions & 2 deletions roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,17 @@ func (br *BatchResponse) Combine(otherBatch *BatchResponse) error {
return nil
}

// Add adds a request to the batch request.
// Add adds a request to the batch request. It's a convenience method;
// requests may also be added directly into the slice.
func (ba *BatchRequest) Add(requests ...Request) {
for _, args := range requests {
ba.Requests = append(ba.Requests, RequestUnion{})
ba.Requests[len(ba.Requests)-1].MustSetInner(args)
}
}

// Add adds a response to the batch response.
// Add adds a response to the batch response. It's a convenience method;
// responses may also be added directly.
func (br *BatchResponse) Add(reply Response) {
br.Responses = append(br.Responses, ResponseUnion{})
br.Responses[len(br.Responses)-1].MustSetInner(reply)
Expand Down

0 comments on commit 078f91f

Please sign in to comment.