Skip to content

Commit

Permalink
Measure transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
juliannguyen4 committed Jan 3, 2024
1 parent 9c10c4a commit 35c8bd1
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 0 deletions.
24 changes: 24 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, Error

// pass nil to make sure it will be cloned and prepared
cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray)

clnt.cluster.addTran()

filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if filteredOut > 0 {
err = chainErrors(ErrFilteredOut.err(), err)
Expand Down Expand Up @@ -518,6 +521,9 @@ func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...strin
}

cmd := newBatchCommandGet(nil, nil, policy, keys, binNames, nil, records, _INFO1_READ, false)

clnt.cluster.addTran()

filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -548,6 +554,9 @@ func (clnt *Client) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Op
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, ops, records, _INFO1_READ, true)

clnt.cluster.addTran()

filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand All @@ -571,6 +580,8 @@ func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) E

cmd := newBatchIndexCommandGet(nil, policy, records, true)

clnt.cluster.addTran()

batchNodes, err := newBatchIndexNodeList(clnt.cluster, policy, records)
if err != nil {
return err
Expand Down Expand Up @@ -606,6 +617,9 @@ func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record,
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, nil, records, _INFO1_READ|_INFO1_NOBINDATA, false)

clnt.cluster.addTran()

filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -642,6 +656,9 @@ func (clnt *Client) BatchDelete(policy *BatchPolicy, deletePolicy *BatchDeletePo
}

cmd := newBatchCommandDelete(nil, nil, policy, keys, records, attr)

clnt.cluster.addTran()

_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand All @@ -662,6 +679,9 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
}

cmd := newBatchCommandOperate(nil, nil, policy, records)

clnt.cluster.addTran()

_, err = clnt.batchExecute(policy, batchNodes, cmd)
return err
}
Expand Down Expand Up @@ -693,6 +713,9 @@ func (clnt *Client) BatchExecute(policy *BatchPolicy, udfPolicy *BatchUDFPolicy,
}

cmd := newBatchCommandUDF(nil, nil, policy, keys, packageName, functionName, args, records, attr)

clnt.cluster.addTran()

_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand Down Expand Up @@ -750,6 +773,7 @@ func (clnt *Client) ScanPartitions(apolicy *ScanPolicy, partitionFilter *Partiti

// result recordset
res := newRecordset(policy.RecordQueueSize, 1)
clnt.cluster.addTran()
go clnt.scanPartitions(&policy, tracker, namespace, setName, res, binNames...)

return res, nil
Expand Down
6 changes: 6 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (clstr *Cluster) enableMetrics(policy *MetricsPolicy) {
}
}

func (clstr *Cluster) addTran() {
if clstr.metricsEnabled {
clstr.tranCount.GetAndIncrement()
}
}

// String implements the stringer interface
func (clstr *Cluster) String() string {
return fmt.Sprintf("%v", clstr.GetNodes())
Expand Down
2 changes: 2 additions & 0 deletions delete_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newDeleteCommand(cluster *Cluster, policy *WritePolicy, key *Key) (*deleteC
policy: policy,
}

cluster.addTran()

return newDeleteCmd, nil
}

Expand Down
2 changes: 2 additions & 0 deletions exists_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func newExistsCommand(cluster *Cluster, policy *BasePolicy, key *Key) (*existsCo
}
}

cluster.addTran()

return &existsCommand{
singleCommand: newSingleCommand(cluster, key, partition),
policy: policy,
Expand Down
2 changes: 2 additions & 0 deletions operate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type operateCommand struct {
}

func newOperateCommand(cluster *Cluster, policy *WritePolicy, key *Key, args operateArgs) (operateCommand, Error) {
cluster.addTran()

rdCommand, err := newReadCommand(cluster, &policy.BasePolicy, key, nil, args.partition)
if err != nil {
return operateCommand{}, err
Expand Down
2 changes: 2 additions & 0 deletions read_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func newReadCommand(cluster *Cluster, policy *BasePolicy, key *Key, binNames []s
}
}

cluster.addTran()

return readCommand{
singleCommand: newSingleCommand(cluster, key, partition),
binNames: binNames,
Expand Down
2 changes: 2 additions & 0 deletions read_header_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func newReadHeaderCommand(cluster *Cluster, policy *BasePolicy, key *Key) (readH
policy: policy,
}

cluster.addTran()

return newReadHeaderCmd, nil
}

Expand Down
3 changes: 3 additions & 0 deletions scan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (clnt *Client) scanPartitions(policy *ScanPolicy, tracker *partitionTracker
interval := policy.SleepBetweenRetries

var errs Error

clnt.cluster.addTran()

for {
list, err := tracker.assignPartitionsToNodes(clnt.Cluster(), namespace)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions touch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func newTouchCommand(cluster *Cluster, policy *WritePolicy, key *Key) (touchComm
policy: policy,
}

cluster.addTran()

return newTouchCmd, nil
}

Expand Down
2 changes: 2 additions & 0 deletions write_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func newWriteCommand(cluster *Cluster,
operation: operation,
}

cluster.addTran()

return newWriteCmd, nil
}

Expand Down

0 comments on commit 35c8bd1

Please sign in to comment.