diff --git a/examples/scan_paginate/scan_paginate.go b/examples/scan_paginate/scan_paginate.go new file mode 100644 index 00000000..fd79c1c3 --- /dev/null +++ b/examples/scan_paginate/scan_paginate.go @@ -0,0 +1,64 @@ +/* + * Copyright 2014-2021 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package main + +import ( + "log" + "time" + + as "github.com/aerospike/aerospike-client-go" + shared "github.com/aerospike/aerospike-client-go/examples/shared" +) + +func main() { + runExample(shared.Client) + + log.Println("Example finished successfully.") +} + +func runExample(client *as.Client) { + log.Printf("Scan paginate: namespace=" + *shared.Namespace + " set=" + *shared.Set) + recordCount := 0 + begin := time.Now() + policy := as.NewScanPolicy() + policy.MaxRecords = 30 + + pf := as.NewPartitionFilterAll() + + receivedRecords := 1 + for receivedRecords > 0 { + receivedRecords = 0 + + log.Println("Scanning Page:", recordCount/int(policy.MaxRecords)) + recordset, err := client.ScanPartitions(policy, pf, *shared.Namespace, *shared.Set) + shared.PanicOnError(err) + + for rec := range recordset.Results() { + if rec.Err != nil { + // if there was an error, stop + shared.PanicOnError(rec.Err) + } + + recordCount++ + receivedRecords++ + } + } + + log.Println("Total records returned: ", recordCount) + log.Println("Elapsed time: ", time.Since(begin), " 
seconds") +} diff --git a/examples/scan_parallel/scan_parallel.go b/examples/scan_parallel/scan_parallel.go index df2d2e31..779dd1a6 100644 --- a/examples/scan_parallel/scan_parallel.go +++ b/examples/scan_parallel/scan_parallel.go @@ -33,28 +33,24 @@ func main() { func runExample(client *as.Client) { log.Printf("Scan parallel: namespace=" + *shared.Namespace + " set=" + *shared.Set) + recordCount := 0 begin := time.Now() policy := as.NewScanPolicy() recordset, err := client.ScanAll(policy, *shared.Namespace, *shared.Set) shared.PanicOnError(err) -L: - for { - select { - case rec := <-recordset.Records: - if rec == nil { - break L - } - recordCount++ - - if (recordCount % 10000) == 0 { - log.Println("Records ", recordCount) - } - case err := <-recordset.Errors: + for rec := range recordset.Results() { + if rec.Err != nil { // if there was an error, stop - shared.PanicOnError(err) + shared.PanicOnError(rec.Err) } + + recordCount++ + + if (recordCount % 100000) == 0 { + log.Println("Records ", recordCount) + } } end := time.Now() diff --git a/partition_filter.go b/partition_filter.go index aa88fac8..562b1f1c 100644 --- a/partition_filter.go +++ b/partition_filter.go @@ -17,9 +17,17 @@ package aerospike // PartitionFilter is used in scan/queries. type PartitionFilter struct { - begin int - count int - digest []byte + begin int + count int + digest []byte + partitions []*partitionStatus + done bool +} + +// NewPartitionFilterAll creates a partition filter that +// reads all the partitions. +func NewPartitionFilterAll() *PartitionFilter { + return newPartitionFilter(0, _PARTITIONS) } // NewPartitionFilterById creates a partition filter by partition id. 
@@ -44,3 +52,9 @@ func NewPartitionFilterByKey(key *Key) *PartitionFilter { func newPartitionFilter(begin, count int) *PartitionFilter { return &PartitionFilter{begin: begin, count: count} } + +// IsDone reports - when using ScanPolicy.MaxRecords or QueryPolicy.MaxRecords - +// whether the previous paginated scans with this partition filter instance returned all records. +func (pf *PartitionFilter) IsDone() bool { + return pf.done +} diff --git a/partition_status.go b/partition_status.go new file mode 100644 index 00000000..20d2fe8e --- /dev/null +++ b/partition_status.go @@ -0,0 +1,26 @@ +// Copyright 2014-2021 Aerospike, Inc. +// +// Portions may be licensed to Aerospike, Inc. under one or more contributor +// license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +package aerospike + +type partitionStatus struct { + id int + done bool + digest []byte +} + +func newPartitionStatus(id int) *partitionStatus { + return &partitionStatus{id: id} +} diff --git a/partition_tracker.go b/partition_tracker.go index d9decdbb..76d17e18 100644 --- a/partition_tracker.go +++ b/partition_tracker.go @@ -29,6 +29,7 @@ type partitionTracker struct { partitionBegin int nodeCapacity int nodeFilter *Node + partitionFilter *PartitionFilter nodePartitionsList []*nodePartitions partitionsCapacity int maxRecords int64 @@ -52,7 +53,8 @@ func newPartitionTrackerForNodes(policy *MultiPolicy, nodes []*Node) *partitionT maxRecords: policy.MaxRecords, } - pt.partitionsAll = pt.initPartitionTracker(policy, _PARTITIONS, nil) + pt.partitionsAll = pt.initPartitions(policy, _PARTITIONS, nil) + pt.initTimeout(policy) return &pt } @@ -65,7 +67,8 @@ func newPartitionTrackerForNode(policy *MultiPolicy, nodeFilter *Node) *partitio maxRecords: policy.MaxRecords, } - pt.partitionsAll = pt.initPartitionTracker(policy, _PARTITIONS, nil) + pt.partitionsAll = pt.initPartitions(policy, _PARTITIONS, nil) + pt.initTimeout(policy) return &pt } @@ -94,12 +97,34 @@ func newPartitionTracker(policy *MultiPolicy, filter *PartitionFilter, nodes []* maxRecords: policy.MaxRecords, } - pt.partitionsAll = pt.initPartitionTracker(policy, filter.count, filter.digest) + if len(filter.partitions) == 0 { + filter.partitions = pt.initPartitions(policy, filter.count, filter.digest) + } else { + for _, part := range filter.partitions { + part.done = false + } + + } + pt.partitionsAll = filter.partitions + pt.partitionFilter = filter + pt.initTimeout(policy) return pt } -func (pt *partitionTracker) initPartitionTracker(policy *MultiPolicy, partitionCount int, digest []byte) []*partitionStatus { +func (pt *partitionTracker) initTimeout(policy *MultiPolicy) { + pt.sleepBetweenRetries = policy.SleepBetweenRetries + pt.socketTimeout = policy.SocketTimeout + pt.totalTimeout = policy.TotalTimeout 
+ if pt.totalTimeout > 0 { + pt.deadline = time.Now().Add(pt.totalTimeout) + if pt.socketTimeout == 0 || pt.socketTimeout > pt.totalTimeout { + pt.socketTimeout = pt.totalTimeout + } + } +} + +func (pt *partitionTracker) initPartitions(policy *MultiPolicy, partitionCount int, digest []byte) []*partitionStatus { partsAll := make([]*partitionStatus, partitionCount) for i := 0; i < partitionCount; i++ { @@ -227,7 +252,14 @@ func (pt *partitionTracker) isComplete(policy *BasePolicy) (bool, error) { partsReceived += np.partsReceived } - if partsReceived >= partsRequested || (pt.maxRecords > 0 && recordCount >= pt.maxRecords) { + if partsReceived >= partsRequested { + if pt.partitionFilter != nil && recordCount > 0 { + pt.partitionFilter.done = true + } + return true, nil + } + + if pt.maxRecords > 0 && recordCount >= pt.maxRecords { return true, nil } @@ -320,13 +352,3 @@ func (np *nodePartitions) addPartition(part *partitionStatus) { } np.partsRequested++ } - -type partitionStatus struct { - id int - done bool - digest []byte -} - -func newPartitionStatus(id int) *partitionStatus { - return &partitionStatus{id: id} -} diff --git a/scan_test.go b/scan_test.go index 1e41d716..ae0ab903 100644 --- a/scan_test.go +++ b/scan_test.go @@ -43,7 +43,7 @@ var _ = gg.Describe("Scan operations", func() { // read all records from the channel and make sure all of them are returned // if cancelCnt is set, it will cancel the scan after specified record count - var checkResults = func(recordset *as.Recordset, cancelCnt int, checkLDT bool) { + var checkResults = func(recordset *as.Recordset, cancelCnt int, checkLDT bool) int { counter := 0 for res := range recordset.Results() { gm.Expect(res.Err).ToNot(gm.HaveOccurred()) @@ -73,6 +73,7 @@ var _ = gg.Describe("Scan operations", func() { } gm.Expect(counter).To(gm.BeNumerically(">", 0)) + return counter } gg.BeforeEach(func() { @@ -92,6 +93,26 @@ var _ = gg.Describe("Scan operations", func() { var scanPolicy = as.NewScanPolicy() 
scanPolicy.FailOnClusterChange = failOnClusterChange + gg.It("must Scan and paginate to get all records back from all partitions concurrently", func() { + gm.Expect(len(keys)).To(gm.Equal(keyCount)) + + pf := as.NewPartitionFilterAll() + spolicy := as.NewScanPolicy() + spolicy.MaxRecords = 30 + + received := 0 + for received < keyCount { + recordset, err := client.ScanPartitions(spolicy, pf, ns, set) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + recs := checkResults(recordset, 0, false) + gm.Expect(recs).To(gm.BeNumerically("<=", int(spolicy.MaxRecords))) + received += recs + } + + gm.Expect(len(keys)).To(gm.Equal(0)) + }) + gg.It("must Scan and get all records back from all partitions concurrently", func() { gm.Expect(len(keys)).To(gm.Equal(keyCount))