Skip to content

Commit

Permalink
support computesplitpointsbysize
Browse files Browse the repository at this point in the history
  • Loading branch information
danielxiaoran committed Jan 5, 2018
1 parent 69b98eb commit 734340e
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 3 deletions.
15 changes: 15 additions & 0 deletions sample/TableOperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,18 @@ func DescribeTableSample(client *tablestore.TableStoreClient, tableName string)
fmt.Println("DescribeTableSample finished. Table meta:", describ.TableOption.MaxVersion, describ.TableOption.TimeToAlive)
}
}

func ComputeSplitPointsBySize(client *tablestore.TableStoreClient, tableName string){
req := &tablestore.ComputeSplitPointsBySizeRequest{TableName: "table1", SplitSize: int64(1)}
va, err := client.ComputeSplitPointsBySize(req)
if err != nil {
fmt.Println(err)
}

for _, val := range (va.Splits) {
fmt.Println(val.Location)
fmt.Println(*val.LowerBound)
fmt.Println(*val.UpperBound)
}
return
}
65 changes: 62 additions & 3 deletions tablestore/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
describeStreamUri = "/DescribeStream"
getShardIteratorUri = "/GetShardIterator"
getStreamRecordUri = "/GetStreamRecord"
computeSplitPointsBySizeRequestUri = "/ComputeSplitPointsBySize"
)

// Constructor: to create the client of TableStore service.
Expand Down Expand Up @@ -287,7 +288,7 @@ func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableReques
ExpirationTime: &request.StreamSpec.ExpirationTime}
req.StreamSpec = &ss
}

resp := new(otsprotocol.CreateTableResponse)
response := &CreateTableResponse{}
if err := tableStoreClient.doRequestWithRetry(createTableUri, req, resp); err != nil {
Expand Down Expand Up @@ -841,7 +842,7 @@ func (client *TableStoreClient) ListStream(req *ListStreamRequest) (*ListStreamR
if err := client.doRequestWithRetry(listStreamUri, pbReq, &pbResp); err != nil {
return nil, err
}

resp := ListStreamResponse{}
streams := make([]Stream, len(pbResp.Streams))
for i, pbStream := range pbResp.Streams {
Expand Down Expand Up @@ -894,7 +895,7 @@ func (client *TableStoreClient) GetShardIterator(req *GetShardIteratorRequest) (
pbReq := &otsprotocol.GetShardIteratorRequest{
StreamId: (*string)(req.StreamId),
ShardId: (*string)(req.ShardId)}

pbResp:= otsprotocol.GetShardIteratorResponse{}
if err := client.doRequestWithRetry(getShardIteratorUri, pbReq, &pbResp); err != nil {
return nil, err
Expand Down Expand Up @@ -984,3 +985,61 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge
resp.Records = records
return &resp, nil
}

func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsBySizeRequest) (*ComputeSplitPointsBySizeResponse, error) {
pbReq := &otsprotocol.ComputeSplitPointsBySizeRequest{
TableName: &(req.TableName),
SplitSize: &(req.SplitSize),
}

pbResp:= otsprotocol.ComputeSplitPointsBySizeResponse{}
if err := client.doRequestWithRetry(computeSplitPointsBySizeRequestUri, pbReq, &pbResp); err != nil {
return nil, err
}

resp := ComputeSplitPointsBySizeResponse{}
fmt.Println(len(pbResp.SplitPoints))
fmt.Println(len(pbResp.Locations))

beginPk := &PrimaryKey{}
endPk := &PrimaryKey{}
for _, pkSchema := range pbResp.Schema {
beginPk.AddPrimaryKeyColumnWithMinValue(*pkSchema.Name)
endPk.AddPrimaryKeyColumnWithMaxValue(*pkSchema.Name)
}
lastPk := beginPk
nowPk := endPk

for _, pbRecord := range pbResp.SplitPoints {
plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord))
if err !=nil {
return nil, err
}

nowPk = &PrimaryKey{}
for _, pk := range (plainRows[0].primaryKey) {
nowPk.AddPrimaryKeyColumn(string(pk.cellName), pk.cellValue.Value)
}

newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk}
resp.Splits = append(resp.Splits, newSplit)
lastPk = nowPk

}

newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk}
resp.Splits = append(resp.Splits, newSplit)

index:=0
for _, pbLocation := range pbResp.Locations {
count := *pbLocation.Repeat
value := *pbLocation.Location

for i := int64(0) ; i < count; i++{
resp.Splits[index].Location = value
index++
}
}
return &resp, nil
}

17 changes: 17 additions & 0 deletions tablestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,23 @@ type GetStreamRecordResponse struct {
NextShardIterator *ShardIterator // optional. an indicator to be used to read more records in this shard
}


type ComputeSplitPointsBySizeRequest struct {
TableName string
SplitSize int64
}

type ComputeSplitPointsBySizeResponse struct {
SchemaEntry []*PrimaryKeySchema
Splits []*Split
}

type Split struct {
LowerBound *PrimaryKey
UpperBound *PrimaryKey
Location string
}

type StreamId string
type ShardId string
type ShardIterator string
Expand Down

0 comments on commit 734340e

Please sign in to comment.