Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Actimermao committed Oct 12, 2020
2 parents 55504aa + e316256 commit 0cf9b82
Show file tree
Hide file tree
Showing 16 changed files with 3,972 additions and 1,132 deletions.
7 changes: 2 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
language: go
go:
- 1.4
- 1.5
- 1.6
- 1.7
- 1.13
install:
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
- go get gopkg.in/check.v1
- go get github.com/golang/protobuf/proto
- go get github.com/stretchr/testify/assert
- go get github.com/stretchr/testify/assert@v1.4.0
script:
- cd tablestore
- travis_wait 30 go test -v -covermode=count -coverprofile=coverage.out -timeout=30m
Expand Down
3 changes: 3 additions & 0 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ func main() {
sample.WriteDataForAggregationAndGroupBy(client, "agg_sample_table")
sample.AggregationSample(client, "agg_sample_table", "agg_sample_index")
sample.GroupBySample(client, "agg_sample_table", "agg_sample_index")

sample.ParallelScanSingleConcurrency(client, "scan_sample_table", "scan_sample_index")
sample.ParallelScanMultiConcurrency(client, "scan_sample_table", "scan_sample_index")
}
106 changes: 105 additions & 1 deletion sample/SearchIndexOperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package sample
import (
"encoding/json"
"fmt"

"sync"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/search"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -1260,4 +1260,108 @@ func GroupBySample(client *tablestore.TableStoreClient, tableName string, indexN
for _, item := range group4.Items { //遍历返回的所有分桶
fmt.Println("\t[", item.From, ", ", item.To, "), rowCount: ", item.RowCount) //打印本次分桶的行数
}
}

func computeSplits(client *tablestore.TableStoreClient, tableName string, indexName string) (*tablestore.ComputeSplitsResponse, error) {
req := &tablestore.ComputeSplitsRequest{}
req.
SetTableName(tableName).
SetSearchIndexSplitsOptions(tablestore.SearchIndexSplitsOptions{IndexName:indexName})
res, err := client.ComputeSplits(req)
if err != nil {
return nil, err
}
return res, nil
}

/**
* ParallelScan单并发
*/
func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
computeSplitsResp, err := computeSplits(client, tableName, indexName)
if err != nil {
fmt.Printf("%#v", err)
return
}

query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2)

req := &tablestore.ParallelScanRequest{}
req.SetTableName(tableName).
SetIndexName(indexName).
SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
SetScanQuery(query).
SetSessionId(computeSplitsResp.SessionId)

res, err := client.ParallelScan(req)
if err != nil {
fmt.Printf("%#v", err)
return
}

total := len(res.Rows)
for res.NextToken != nil {
req.SetScanQuery(query.SetToken(res.NextToken))
res, err = client.ParallelScan(req)
if err != nil {
fmt.Printf("%#v", err)
return
}

total += len(res.Rows) //process rows each loop
}
fmt.Println("total: ", total)
}

/**
* ParallelScan多并发
*/
func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
computeSplitsResp, err := computeSplits(client, tableName, indexName)
if err != nil {
fmt.Printf("%#v", err)
return
}

var wg sync.WaitGroup
wg.Add(int(computeSplitsResp.SplitsSize))

for i := int32(0); i < computeSplitsResp.SplitsSize; i++ {
current := i
go func() {
defer wg.Done()
query := search.NewScanQuery().
SetQuery(&search.MatchAllQuery{}).
SetCurrentParallelID(current).
SetMaxParallel(computeSplitsResp.SplitsSize).
SetLimit(2)

req := &tablestore.ParallelScanRequest{}
req.SetTableName(tableName).
SetIndexName(indexName).
SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
SetScanQuery(query).
SetSessionId(computeSplitsResp.SessionId)

res, err := client.ParallelScan(req)
if err != nil {
fmt.Printf("%#v", err)
return
}

total := len(res.Rows)
for res.NextToken != nil {
req.SetScanQuery(query.SetToken(res.NextToken))
res, err = client.ParallelScan(req)
if err != nil {
fmt.Printf("%#v", err)
return
}

total += len(res.Rows) //process rows each loop
}
fmt.Println("total: ", total)
}()
}
wg.Wait()
}
30 changes: 29 additions & 1 deletion tablestore/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
listSearchIndexUri = "/ListSearchIndex"
deleteSearchIndexUri = "/DeleteSearchIndex"
describeSearchIndexUri = "/DescribeSearchIndex"
computeSplitsUri = "/ComputeSplits"
parallelScanUri = "/ParallelScan"

createIndexUri = "/CreateIndex"
dropIndexUri = "/DropIndex"
Expand Down Expand Up @@ -239,7 +241,8 @@ func isIdempotent(action string) bool {
if action == batchGetRowUri || action == describeTableUri ||
action == getRangeUri || action == getRowUri ||
action == listTableUri || action == listStreamUri ||
action == getStreamRecordUri || action == describeStreamUri {
action == getStreamRecordUri || action == describeStreamUri ||
action == computeSplitsUri || action == parallelScanUri {
return true
} else {
return false
Expand Down Expand Up @@ -1390,3 +1393,28 @@ func (client *TableStoreClient) DescribeDeliveryTask(request *DescribeDeliveryTa
response.TaskSyncStat = toTaskSyncStat(pbResp.TaskSyncStat)
return response, nil
}

func (client *TableStoreClient) ComputeSplits(request *ComputeSplitsRequest) (*ComputeSplitsResponse, error) {
req := new(otsprotocol.ComputeSplitsRequest)
resp := new(otsprotocol.ComputeSplitsResponse)

req.TableName = proto.String(request.TableName)
req.SearchIndexSplitsOptions = new(otsprotocol.SearchIndexSplitsOptions)
if request.searchIndexSplitsOptions != nil {
req.SearchIndexSplitsOptions.IndexName = proto.String(request.searchIndexSplitsOptions.IndexName)
}

response := &ComputeSplitsResponse{}
if err := client.doRequestWithRetry(computeSplitsUri, req, resp, &response.ResponseInfo); err != nil {
return nil, err
}

if resp.SessionId != nil && len(resp.SessionId) > 0 {
response.SessionId = resp.SessionId
}
if resp.SplitsSize != nil {
response.SplitsSize = *resp.SplitsSize
}

return response, nil
}
3 changes: 3 additions & 0 deletions tablestore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ type TableStoreApi interface {
ListSearchIndex(request *ListSearchIndexRequest) (*ListSearchIndexResponse, error)
DescribeSearchIndex(request *DescribeSearchIndexRequest) (*DescribeSearchIndexResponse, error)
Search(request *SearchRequest) (*SearchResponse, error)

ComputeSplits(request *ComputeSplitsRequest) (*ComputeSplitsResponse, error)
ParallelScan(request *ParallelScanRequest) (*ParallelScanResponse, error)
}
26 changes: 26 additions & 0 deletions tablestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,4 +875,30 @@ type AbortTransactionRequest struct {

type AbortTransactionResponse struct {
ResponseInfo
}

// compute splits
type SearchIndexSplitsOptions struct {
IndexName string
}

type ComputeSplitsRequest struct {
TableName string
searchIndexSplitsOptions *SearchIndexSplitsOptions
}

type ComputeSplitsResponse struct {
SessionId []byte
SplitsSize int32
ResponseInfo
}

func (r *ComputeSplitsRequest) SetTableName(tableName string) *ComputeSplitsRequest {
r.TableName = tableName
return r
}

func (r *ComputeSplitsRequest) SetSearchIndexSplitsOptions(options SearchIndexSplitsOptions) *ComputeSplitsRequest {
r.searchIndexSplitsOptions = &options
return r
}

0 comments on commit 0cf9b82

Please sign in to comment.