[CLIENT-1457] Support scan pagination through ScanPartitions() with PartitionFilter
khaf committed Feb 25, 2021
1 parent ed319ec commit bc1764e
Showing 6 changed files with 175 additions and 32 deletions.
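
In outline, the pagination pattern this commit introduces: treat ScanPolicy.MaxRecords as a page size, create one PartitionFilter with NewPartitionFilterAll(), and pass that same filter to every ScanPartitions() call; the filter carries per-partition progress between calls. A minimal sketch of the core calls, assuming an already-connected *as.Client named client, with placeholder namespace and set names (the full runnable example is examples/scan_paginate/scan_paginate.go below):

	policy := as.NewScanPolicy()
	policy.MaxRecords = 30 // page size: at most this many records per call

	pf := as.NewPartitionFilterAll() // one filter instance shared by all pages

	// Each call resumes where the previous page stopped.
	recordset, err := client.ScanPartitions(policy, pf, "test", "demo")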
64 changes: 64 additions & 0 deletions examples/scan_paginate/scan_paginate.go
@@ -0,0 +1,64 @@
/*
* Copyright 2014-2021 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package main

import (
"log"
"time"

as "github.com/aerospike/aerospike-client-go"
shared "github.com/aerospike/aerospike-client-go/examples/shared"
)

func main() {
runExample(shared.Client)

log.Println("Example finished successfully.")
}

func runExample(client *as.Client) {
log.Printf("Scan parallel: namespace=" + *shared.Namespace + " set=" + *shared.Set)
recordCount := 0
begin := time.Now()
policy := as.NewScanPolicy()
policy.MaxRecords = 30

pf := as.NewPartitionFilterAll()

receivedRecords := 1
for receivedRecords > 0 {
receivedRecords = 0

log.Println("Scanning Page:", recordCount/int(policy.MaxRecords))
recordset, err := client.ScanPartitions(policy, pf, *shared.Namespace, *shared.Set)
shared.PanicOnError(err)

for rec := range recordset.Results() {
if rec.Err != nil {
// if there was an error, stop
shared.PanicOnError(rec.Err)
}

recordCount++
receivedRecords++
}
}

log.Println("Total records returned: ", recordCount)
log.Println("Elapsed time: ", time.Since(begin), " seconds")
}
22 changes: 9 additions & 13 deletions examples/scan_parallel/scan_parallel.go
@@ -33,28 +33,24 @@ func main() {

func runExample(client *as.Client) {
log.Printf("Scan parallel: namespace=" + *shared.Namespace + " set=" + *shared.Set)

recordCount := 0
begin := time.Now()
policy := as.NewScanPolicy()
recordset, err := client.ScanAll(policy, *shared.Namespace, *shared.Set)
shared.PanicOnError(err)

L:
for {
select {
case rec := <-recordset.Records:
if rec == nil {
break L
}
recordCount++

if (recordCount % 10000) == 0 {
log.Println("Records ", recordCount)
}
case err := <-recordset.Errors:
for rec := range recordset.Results() {
if rec.Err != nil {
// if there was an error, stop
shared.PanicOnError(rec.Err)
}

recordCount++

if (recordCount % 100000) == 0 {
log.Println("Records ", recordCount)
}
}

end := time.Now()
20 changes: 17 additions & 3 deletions partition_filter.go
@@ -17,9 +17,17 @@ package aerospike

// PartitionFilter is used in scan/queries.
type PartitionFilter struct {
begin int
count int
digest []byte
begin int
count int
digest []byte
partitions []*partitionStatus // per-partition progress, reused across paginated calls
done bool // set once paginated scans have returned all records
}

// NewPartitionFilterAll creates a partition filter that
// reads all the partitions.
func NewPartitionFilterAll() *PartitionFilter {
return newPartitionFilter(0, _PARTITIONS)
}

// NewPartitionFilterById creates a partition filter by partition id.
@@ -44,3 +52,9 @@ func NewPartitionFilterByKey(key *Key) *PartitionFilter {
func newPartitionFilter(begin, count int) *PartitionFilter {
return &PartitionFilter{begin: begin, count: count}
}

// IsDone reports whether, when using ScanPolicy.MaxRecords or QueryPolicy.MaxRecords,
// the previous paginated scans with this partition filter instance have returned all records.
func (pf *PartitionFilter) IsDone() bool {
return pf.done
}
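
The scan_paginate example above loops until a page comes back empty, which costs one extra scan round over the cluster. The IsDone method added here allows an earlier exit: per the isComplete change in partition_tracker.go below, the filter is marked done once a page that returned records has drained every partition. A minimal sketch of a loop built on it, assuming a connected *as.Client named client, with placeholder namespace and set names; the got == 0 guard covers an empty set, for which the code shown here never sets done:

	pf := as.NewPartitionFilterAll()
	policy := as.NewScanPolicy()
	policy.MaxRecords = 1000 // page size

	for {
		recordset, err := client.ScanPartitions(policy, pf, "test", "demo")
		if err != nil {
			log.Fatal(err)
		}

		got := 0
		for res := range recordset.Results() {
			if res.Err != nil {
				log.Fatal(res.Err)
			}
			got++ // process res.Record here
		}

		// Stop after the final page, or if a page came back empty.
		if pf.IsDone() || got == 0 {
			break
		}
	}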
26 changes: 26 additions & 0 deletions partition_status.go
@@ -0,0 +1,26 @@
// Copyright 2014-2021 Aerospike, Inc.
//
// Portions may be licensed to Aerospike, Inc. under one or more contributor
// license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package aerospike

type partitionStatus struct {
id int // partition id
done bool // whether this partition has been fully scanned
digest []byte // digest of the last record received; the resume point for the next page
}

func newPartitionStatus(id int) *partitionStatus {
return &partitionStatus{id: id}
}
52 changes: 37 additions & 15 deletions partition_tracker.go
@@ -29,6 +29,7 @@ type partitionTracker struct {
partitionBegin int
nodeCapacity int
nodeFilter *Node
partitionFilter *PartitionFilter
nodePartitionsList []*nodePartitions
partitionsCapacity int
maxRecords int64
@@ -52,7 +53,8 @@ func newPartitionTrackerForNodes(policy *MultiPolicy, nodes []*Node) *partitionT
maxRecords: policy.MaxRecords,
}

pt.partitionsAll = pt.initPartitionTracker(policy, _PARTITIONS, nil)
pt.partitionsAll = pt.initPartitions(policy, _PARTITIONS, nil)
pt.initTimeout(policy)
return &pt
}

@@ -65,7 +67,8 @@ func newPartitionTrackerForNode(policy *MultiPolicy, nodeFilter *Node) *partitio
maxRecords: policy.MaxRecords,
}

pt.partitionsAll = pt.initPartitionTracker(policy, _PARTITIONS, nil)
pt.partitionsAll = pt.initPartitions(policy, _PARTITIONS, nil)
pt.initTimeout(policy)
return &pt
}

@@ -94,12 +97,34 @@ func newPartitionTracker(policy *MultiPolicy, filter *PartitionFilter, nodes []*
maxRecords: policy.MaxRecords,
}

pt.partitionsAll = pt.initPartitionTracker(policy, filter.count, filter.digest)
if len(filter.partitions) == 0 {
filter.partitions = pt.initPartitions(policy, filter.count, filter.digest)
} else {
for _, part := range filter.partitions {
part.done = false
}

}

pt.partitionsAll = filter.partitions
pt.partitionFilter = filter
pt.initTimeout(policy)
return pt
}

func (pt *partitionTracker) initPartitionTracker(policy *MultiPolicy, partitionCount int, digest []byte) []*partitionStatus {
func (pt *partitionTracker) initTimeout(policy *MultiPolicy) {
pt.sleepBetweenRetries = policy.SleepBetweenRetries
pt.socketTimeout = policy.SocketTimeout
pt.totalTimeout = policy.TotalTimeout
if pt.totalTimeout > 0 {
pt.deadline = time.Now().Add(pt.totalTimeout)
if pt.socketTimeout == 0 || pt.socketTimeout > pt.totalTimeout {
pt.socketTimeout = pt.totalTimeout
}
}
}

func (pt *partitionTracker) initPartitions(policy *MultiPolicy, partitionCount int, digest []byte) []*partitionStatus {
partsAll := make([]*partitionStatus, partitionCount)

for i := 0; i < partitionCount; i++ {
@@ -227,7 +252,14 @@ func (pt *partitionTracker) isComplete(policy *BasePolicy) (bool, error) {
partsReceived += np.partsReceived
}

if partsReceived >= partsRequested || (pt.maxRecords > 0 && recordCount >= pt.maxRecords) {
if partsReceived >= partsRequested {
if pt.partitionFilter != nil && recordCount > 0 {
pt.partitionFilter.done = true
}
return true, nil
}

if pt.maxRecords > 0 && recordCount >= pt.maxRecords {
return true, nil
}

@@ -320,13 +352,3 @@ func (np *nodePartitions) addPartition(part *partitionStatus) {
}
np.partsRequested++
}

type partitionStatus struct {
id int
done bool
digest []byte
}

func newPartitionStatus(id int) *partitionStatus {
return &partitionStatus{id: id}
}
23 changes: 22 additions & 1 deletion scan_test.go
@@ -43,7 +43,7 @@ var _ = gg.Describe("Scan operations", func() {

// read all records from the channel and make sure all of them are returned
// if cancelCnt is set, it will cancel the scan after specified record count
var checkResults = func(recordset *as.Recordset, cancelCnt int, checkLDT bool) {
var checkResults = func(recordset *as.Recordset, cancelCnt int, checkLDT bool) int {
counter := 0
for res := range recordset.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
@@ -73,6 +73,7 @@ var _ = gg.Describe("Scan operations", func() {
}

gm.Expect(counter).To(gm.BeNumerically(">", 0))
return counter
}

gg.BeforeEach(func() {
@@ -92,6 +93,26 @@ var _ = gg.Describe("Scan operations", func() {
var scanPolicy = as.NewScanPolicy()
scanPolicy.FailOnClusterChange = failOnClusterChange

gg.It("must Scan and paginate to get all records back from all partitions concurrently", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

pf := as.NewPartitionFilterAll()
spolicy := as.NewScanPolicy()
spolicy.MaxRecords = 30

received := 0
for received < keyCount {
recordset, err := client.ScanPartitions(spolicy, pf, ns, set)
gm.Expect(err).ToNot(gm.HaveOccurred())

recs := checkResults(recordset, 0, false)
gm.Expect(recs).To(gm.BeNumerically("<=", int(spolicy.MaxRecords)))
received += recs
}

gm.Expect(len(keys)).To(gm.Equal(0))
})

gg.It("must Scan and get all records back from all partitions concurrently", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

