From db6757afe1bf674e5200e9dcdd6fbc8af0dec83f Mon Sep 17 00:00:00 2001 From: Da Zhou Date: Tue, 9 Jan 2018 19:58:11 +0800 Subject: [PATCH 1/2] fix "OTSParameterInvalid Expiration time is invalid when enableStream is false" bug --- tablestore/api.go | 226 ++++++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 110 deletions(-) diff --git a/tablestore/api.go b/tablestore/api.go index 20c1305..5eb4531 100644 --- a/tablestore/api.go +++ b/tablestore/api.go @@ -1,37 +1,38 @@ package tablestore import ( - "fmt" - "time" "bytes" - "net/http" "crypto/md5" "encoding/base64" - "net" + "fmt" "math/rand" - "github.com/golang/protobuf/proto" + "net" + "net/http" + "time" + "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" + "github.com/golang/protobuf/proto" ) const ( userAgent = "aliyun-tablestore-sdk-golang/4.0.2" - createTableUri = "/CreateTable" - listTableUri = "/ListTable" - deleteTableUri = "/DeleteTable" - describeTableUri = "/DescribeTable" - updateTableUri = "/UpdateTable" - putRowUri = "/PutRow" - deleteRowUri = "/DeleteRow" - getRowUri = "/GetRow" - updateRowUri = "/UpdateRow" - batchGetRowUri = "/BatchGetRow" - batchWriteRowUri = "/BatchWriteRow" - getRangeUri = "/GetRange" - listStreamUri = "/ListStream" - describeStreamUri = "/DescribeStream" - getShardIteratorUri = "/GetShardIterator" - getStreamRecordUri = "/GetStreamRecord" + createTableUri = "/CreateTable" + listTableUri = "/ListTable" + deleteTableUri = "/DeleteTable" + describeTableUri = "/DescribeTable" + updateTableUri = "/UpdateTable" + putRowUri = "/PutRow" + deleteRowUri = "/DeleteRow" + getRowUri = "/GetRow" + updateRowUri = "/UpdateRow" + batchGetRowUri = "/BatchGetRow" + batchWriteRowUri = "/BatchWriteRow" + getRangeUri = "/GetRange" + listStreamUri = "/ListStream" + describeStreamUri = "/DescribeStream" + getShardIteratorUri = "/GetShardIterator" + getStreamRecordUri = "/GetStreamRecord" computeSplitPointsBySizeRequestUri = "/ComputeSplitPointsBySize" ) @@ -82,8 +83,8 @@ func NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret st tableStoreClient.httpClient = currentGetHttpClientFunc() httpClient := &http.Client{ - Transport:tableStoreTransportProxy, - Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout, + Transport: tableStoreTransportProxy, + Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout, } tableStoreClient.httpClient.New(httpClient) @@ -105,7 +106,7 @@ func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, re return err } } else { - body = nil; + body = nil } var value int64 @@ -133,7 +134,7 @@ func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, re if errn != nil { return fmt.Errorf("decode resp failed: %s: %s: %s %s", errn, err, string(respBody), requestId) } else { - return fmt.Errorf("%s %s %s", *e.Code, *e.Message ,requestId) + return fmt.Errorf("%s %s %s", *e.Code, *e.Message, requestId) } } @@ -159,7 +160,7 @@ func getNextPause(tableStoreClient *TableStoreClient, err error, serverError *ot } else if err == nil && !shouldRetry(*serverError.Code, *serverError.Message, action, statusCode) { return 0 } else { - value := lastInterval * 2 + tableStoreClient.random.Int63n(DefaultRetryInterval - 1) + 1 + value := lastInterval*2 + tableStoreClient.random.Int63n(DefaultRetryInterval-1) + 1 if value > MaxRetryInterval { return MaxRetryInterval } @@ -170,34 +171,34 @@ func getNextPause(tableStoreClient *TableStoreClient, err error, serverError *ot func shouldRetry(errorCode string, errorMsg string, action string, httpStatus int) bool { if retryNotMatterActions(errorCode, errorMsg) == true { - return true; + return true } - serverError := httpStatus >= 500 && httpStatus <= 599; - if (isIdempotent(action) && - ( errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE || serverError)) { - return true; + serverError := httpStatus >= 500 && httpStatus <= 599 + if isIdempotent(action) && + (errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE || serverError) { + return true } - return false; + return false } func retryNotMatterActions(errorCode string, errorMsg string) bool { - if (errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT || + if errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT || errorCode == TABLE_NOT_READY || errorCode == PARTITION_UNAVAILABLE || - errorCode == SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.")) { - return true; + errorCode == SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.") { + return true } else { - return false; + return false } } func isIdempotent(action string) bool { - if (action == batchGetRowUri || action == describeTableUri || + if action == batchGetRowUri || action == describeTableUri || action == getRangeUri || action == getRowUri || - action == listTableUri) { - return true; + action == listTableUri { + return true } else { - return false; + return false } } @@ -232,7 +233,9 @@ func (tableStoreClient *TableStoreClient) doRequest(url string, uri string, body otshead.set(xOtsInstanceName, tableStoreClient.instanceName) sign, err := otshead.signature(uri, "POST", tableStoreClient.accessKeySecret) - if err != nil { return nil, err, 0 , ""} + if err != nil { + return nil, err, 0, "" + } hreq.Header.Set(xOtsSignature, sign) /* end set headers */ @@ -263,11 +266,11 @@ func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableReques req.TableMeta = new(otsprotocol.TableMeta) req.TableMeta.TableName = proto.String(request.TableMeta.TableName) - for _, key := range (request.TableMeta.SchemaEntry) { + for _, key := range request.TableMeta.SchemaEntry { keyType := otsprotocol.PrimaryKeyType(*key.Type) if key.Option != nil { keyOption := otsprotocol.PrimaryKeyOption(*key.Option) - req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption }) + req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption}) } else { req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType}) } @@ -284,7 +287,7 @@ func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableReques if request.StreamSpec != nil { ss := otsprotocol.StreamSpecification{ - EnableStream: &request.StreamSpec.EnableStream, + EnableStream: &request.StreamSpec.EnableStream, ExpirationTime: &request.StreamSpec.ExpirationTime} req.StreamSpec = &ss } @@ -349,23 +352,23 @@ func (tableStoreClient *TableStoreClient) DescribeTable(request *DescribeTableRe responseTableMeta := new(TableMeta) responseTableMeta.TableName = *resp.TableMeta.TableName - for _, key := range (resp.TableMeta.PrimaryKey) { + for _, key := range resp.TableMeta.PrimaryKey { keyType := PrimaryKeyType(*key.Type) // enable it when we support kep option in describe table if key.Option != nil { keyOption := PrimaryKeyOption(*key.Option) - responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption }) + responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption}) } else { - responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType }) + responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType}) } } response.TableMeta = responseTableMeta response.TableOption = &TableOption{TimeToAlive: int(*resp.TableOptions.TimeToLive), MaxVersion: int(*resp.TableOptions.MaxVersions)} if resp.StreamDetails != nil && *resp.StreamDetails.EnableStream { response.StreamDetails = &StreamDetails{ - EnableStream: *resp.StreamDetails.EnableStream, - StreamId: (*StreamId)(resp.StreamDetails.StreamId), + EnableStream: *resp.StreamDetails.EnableStream, + StreamId: (*StreamId)(resp.StreamDetails.StreamId), ExpirationTime: *resp.StreamDetails.ExpirationTime, LastEnableTime: *resp.StreamDetails.LastEnableTime} } else { @@ -383,23 +386,28 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques req := new(otsprotocol.UpdateTableRequest) req.TableName = proto.String(request.TableName) - if (request.ReservedThroughput != nil) { + if request.ReservedThroughput != nil { req.ReservedThroughput = new(otsprotocol.ReservedThroughput) req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit) req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap)) req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap)) } - if (request.TableOption != nil) { + if request.TableOption != nil { req.TableOptions = new(otsprotocol.TableOptions) req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive)) req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion)) } if request.StreamSpec != nil { + var expirationTime *int32 + if request.StreamSpec.EnableStream { + expirationTime = &request.StreamSpec.ExpirationTime + } req.StreamSpec = &otsprotocol.StreamSpecification{ - EnableStream: &request.StreamSpec.EnableStream, - ExpirationTime: &request.StreamSpec.ExpirationTime} + EnableStream: &request.StreamSpec.EnableStream, + ExpirationTime: expirationTime, + } } resp := new(otsprotocol.UpdateTableResponse) @@ -410,15 +418,15 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques response := new(UpdateTableResponse) response.ReservedThroughput = &ReservedThroughput{ - Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), + Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))} response.TableOption = &TableOption{ TimeToAlive: int(*resp.TableOptions.TimeToLive), - MaxVersion: int(*resp.TableOptions.MaxVersions)} + MaxVersion: int(*resp.TableOptions.MaxVersions)} if *resp.StreamDetails.EnableStream { response.StreamDetails = &StreamDetails{ - EnableStream: *resp.StreamDetails.EnableStream, - StreamId: (*StreamId)(resp.StreamDetails.StreamId), + EnableStream: *resp.StreamDetails.EnableStream, + StreamId: (*StreamId)(resp.StreamDetails.StreamId), ExpirationTime: *resp.StreamDetails.ExpirationTime, LastEnableTime: *resp.StreamDetails.LastEnableTime} } else { @@ -455,7 +463,7 @@ func (tableStoreClient *TableStoreClient) PutRow(request *PutRowRequest) (*PutRo } if request.PutRowChange.ReturnType == ReturnType_RT_PK { - content := otsprotocol.ReturnContent{ ReturnType: otsprotocol.ReturnType_RT_PK.Enum() } + content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_PK.Enum()} req.ReturnContent = &content } @@ -525,13 +533,13 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo req.MaxVersions = proto.Int32(int32(request.SingleRowQueryCriteria.MaxVersion)) } - if request.SingleRowQueryCriteria.StartColumn !=nil { + if request.SingleRowQueryCriteria.StartColumn != nil { req.StartColumn = request.SingleRowQueryCriteria.StartColumn } if request.SingleRowQueryCriteria.TimeRange != nil { - if (request.SingleRowQueryCriteria.TimeRange.Specific != 0) { - req.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)} + if request.SingleRowQueryCriteria.TimeRange.Specific != 0 { + req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)} } else { req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.End)} } @@ -547,8 +555,7 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo return nil, err } - - response := &GetRowResponse{ConsumedCapacityUnit:&ConsumedCapacityUnit{}} + response := &GetRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} if len(resp.Row) == 0 { return response, nil } @@ -558,13 +565,13 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo return nil, err } - for _, pk := range (rows[0].primaryKey) { + for _, pk := range rows[0].primaryKey { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn) } - for _, cell := range (rows[0].cells) { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} + for _, cell := range rows[0].cells { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} response.Columns = append(response.Columns, dataColumn) } @@ -587,7 +594,7 @@ func (tableStoreClient *TableStoreClient) UpdateRow(request *UpdateRowRequest) ( return nil, err } - response := &UpdateRowResponse{ConsumedCapacityUnit : &ConsumedCapacityUnit{}} + response := &UpdateRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write return response, nil @@ -600,7 +607,7 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques var tablesInBatch []*otsprotocol.TableInBatchGetRowRequest - for _, Criteria := range (request.MultiRowQueryCriteria) { + for _, Criteria := range request.MultiRowQueryCriteria { table := new(otsprotocol.TableInBatchGetRowRequest) table.TableName = proto.String(Criteria.TableName) table.ColumnsToGet = Criteria.ColumnsToGet @@ -608,13 +615,13 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques if Criteria.Filter != nil { table.Filter = Criteria.Filter.Serialize() } - if (Criteria.MaxVersion != 0) { + if Criteria.MaxVersion != 0 { table.MaxVersions = proto.Int32(int32(Criteria.MaxVersion)) } if Criteria.TimeRange != nil { - if (Criteria.TimeRange.Specific != 0) { - table.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(Criteria.TimeRange.Specific)} + if Criteria.TimeRange.Specific != 0 { + table.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(Criteria.TimeRange.Specific)} } else { table.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(Criteria.TimeRange.Start), EndTime: proto.Int64(Criteria.TimeRange.End)} } @@ -622,7 +629,7 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques return nil, errInvalidInput } - for _, pk := range (Criteria.PrimaryKey) { + for _, pk := range Criteria.PrimaryKey { pkWithBytes := pk.Build(false) table.PrimaryKey = append(table.PrimaryKey, pkWithBytes) } @@ -637,30 +644,30 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques return nil, err } - response := &BatchGetRowResponse{TableToRowsResult:make(map[string][]RowResult) } + response := &BatchGetRowResponse{TableToRowsResult: make(map[string][]RowResult)} - for _, table := range (resp.Tables) { + for _, table := range resp.Tables { index := int32(0) - for _, row := range (table.Rows) { - rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit : &ConsumedCapacityUnit{}, Index: index} + for _, row := range table.Rows { + rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index} index++ if *row.IsOk == false { - rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message } + rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message} } else { // len == 0 means row not exist - if (len(row.Row) > 0) { + if len(row.Row) > 0 { rows, err := readRowsWithHeader(bytes.NewReader(row.Row)) if err != nil { return nil, err } - for _, pk := range (rows[0].primaryKey) { + for _, pk := range rows[0].primaryKey { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn) } - for _, cell := range (rows[0].cells) { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} + for _, cell := range rows[0].cells { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} rowResult.Columns = append(rowResult.Columns, dataColumn) } } @@ -683,11 +690,11 @@ func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRe var tablesInBatch []*otsprotocol.TableInBatchWriteRowRequest - for key, value := range (request.RowChangesGroupByTable) { + for key, value := range request.RowChangesGroupByTable { table := new(otsprotocol.TableInBatchWriteRowRequest) table.TableName = proto.String(key) - for _, row := range (value) { + for _, row := range value { rowInBatch := &otsprotocol.RowInBatchWriteRowRequest{} rowInBatch.Condition = row.getCondition() rowInBatch.RowChange = row.Serialize() @@ -706,15 +713,15 @@ func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRe return nil, err } - response := &BatchWriteRowResponse{TableToRowsResult:make(map[string][]RowResult) } + response := &BatchWriteRowResponse{TableToRowsResult: make(map[string][]RowResult)} - for _, table := range (resp.Tables) { + for _, table := range resp.Tables { index := int32(0) - for _, row := range (table.Rows) { - rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit : &ConsumedCapacityUnit{}, Index: index} + for _, row := range table.Rows { + rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index} index++ if *row.IsOk == false { - rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message } + rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message} } else { rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write @@ -756,8 +763,8 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G } if request.RangeRowQueryCriteria.TimeRange != nil { - if (request.RangeRowQueryCriteria.TimeRange.Specific != 0) { - req.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)} + if request.RangeRowQueryCriteria.TimeRange.Specific != 0 { + req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)} } else { req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.End)} } @@ -786,7 +793,7 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G return nil, err } - response := &GetRangeResponse{ConsumedCapacityUnit:&ConsumedCapacityUnit{}} + response := &GetRangeResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write if len(resp.NextStartPrimaryKey) != 0 { @@ -796,7 +803,7 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G } response.NextStartPrimaryKey = &PrimaryKey{} - for _, pk := range (currentRows[0].primaryKey) { + for _, pk := range currentRows[0].primaryKey { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} response.NextStartPrimaryKey.PrimaryKeys = append(response.NextStartPrimaryKey.PrimaryKeys, pkColumn) } @@ -814,15 +821,15 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G for _, row := range rows { currentRow := &Row{} currentpk := new(PrimaryKey) - for _, pk := range (row.primaryKey) { + for _, pk := range row.primaryKey { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} currentpk.PrimaryKeys = append(currentpk.PrimaryKeys, pkColumn) } currentRow.PrimaryKey = currentpk - for _, cell := range (row.cells) { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} + for _, cell := range row.cells { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} currentRow.Columns = append(currentRow.Columns, dataColumn) } @@ -844,10 +851,10 @@ func (client *TableStoreClient) ListStream(req *ListStreamRequest) (*ListStreamR resp := ListStreamResponse{} streams := make([]Stream, len(pbResp.Streams)) - for i, pbStream := range pbResp.Streams { + for i, pbStream := range pbResp.Streams { streams[i] = Stream{ - Id: (*StreamId)(pbStream.StreamId), - TableName: pbStream.TableName, + Id: (*StreamId)(pbStream.StreamId), + TableName: pbStream.TableName, CreationTime: *pbStream.CreationTime} } resp.Streams = streams[:] @@ -861,7 +868,7 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des pbReq.InclusiveStartShardId = (*string)(req.InclusiveStartShardId) pbReq.ShardLimit = req.ShardLimit } - pbResp:= otsprotocol.DescribeStreamResponse{} + pbResp := otsprotocol.DescribeStreamResponse{} if err := client.doRequestWithRetry(describeStreamUri, pbReq, &pbResp); err != nil { return nil, err } @@ -882,7 +889,7 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des shards := make([]*StreamShard, len(pbResp.Shards)) for i, pbShard := range pbResp.Shards { shards[i] = &StreamShard{ - SelfShard: (*ShardId)(pbShard.ShardId), + SelfShard: (*ShardId)(pbShard.ShardId), FatherShard: (*ShardId)(pbShard.ParentId), MotherShard: (*ShardId)(pbShard.ParentSiblingId)} } @@ -893,9 +900,9 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des func (client *TableStoreClient) GetShardIterator(req *GetShardIteratorRequest) (*GetShardIteratorResponse, error) { pbReq := &otsprotocol.GetShardIteratorRequest{ StreamId: (*string)(req.StreamId), - ShardId: (*string)(req.ShardId)} + ShardId: (*string)(req.ShardId)} - pbResp:= otsprotocol.GetShardIteratorResponse{} + pbResp := otsprotocol.GetShardIteratorResponse{} if err := client.doRequestWithRetry(getShardIteratorUri, pbReq, &pbResp); err != nil { return nil, err } @@ -912,7 +919,7 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge pbReq.Limit = req.Limit } - pbResp:= otsprotocol.GetStreamRecordResponse{} + pbResp := otsprotocol.GetStreamRecordResponse{} if err := client.doRequestWithRetry(getStreamRecordUri, pbReq, &pbResp); err != nil { return nil, err } @@ -948,7 +955,7 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge for i, pk := range plainRow.primaryKey { pkc := PrimaryKeyColumn{ ColumnName: string(pk.cellName), - Value: pk.cellValue.Value} + Value: pk.cellValue.Value} pkey.PrimaryKeys[i] = &pkc } Assert(plainRow.extension != nil, @@ -991,7 +998,7 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB SplitSize: &(req.SplitSize), } - pbResp:= otsprotocol.ComputeSplitPointsBySizeResponse{} + pbResp := otsprotocol.ComputeSplitPointsBySizeResponse{} if err := client.doRequestWithRetry(computeSplitPointsBySizeRequestUri, pbReq, &pbResp); err != nil { return nil, err } @@ -1011,12 +1018,12 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB for _, pbRecord := range pbResp.SplitPoints { plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord)) - if err !=nil { + if err != nil { return nil, err } nowPk = &PrimaryKey{} - for _, pk := range (plainRows[0].primaryKey) { + for _, pk := range plainRows[0].primaryKey { nowPk.AddPrimaryKeyColumn(string(pk.cellName), pk.cellValue.Value) } @@ -1029,16 +1036,15 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk} resp.Splits = append(resp.Splits, newSplit) - index:=0 + index := 0 for _, pbLocation := range pbResp.Locations { count := *pbLocation.Repeat value := *pbLocation.Location - for i := int64(0) ; i < count; i++{ + for i := int64(0); i < count; i++ { resp.Splits[index].Location = value index++ } } return &resp, nil } - From e0edec567fa993be6715a9867e4ae53e12899142 Mon Sep 17 00:00:00 2001 From: Da Zhou Date: Wed, 10 Jan 2018 10:57:59 +0800 Subject: [PATCH 2/2] remove the changes made by gofmt --- tablestore/api.go | 219 +++++++++++++++++++++++----------------------- 1 file changed, 109 insertions(+), 110 deletions(-) diff --git a/tablestore/api.go b/tablestore/api.go index 5eb4531..55019a2 100644 --- a/tablestore/api.go +++ b/tablestore/api.go @@ -1,38 +1,37 @@ package tablestore import ( + "fmt" + "time" "bytes" + "net/http" "crypto/md5" "encoding/base64" - "fmt" - "math/rand" "net" - "net/http" - "time" - - "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" + "math/rand" "github.com/golang/protobuf/proto" + "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" ) const ( userAgent = "aliyun-tablestore-sdk-golang/4.0.2" - createTableUri = "/CreateTable" - listTableUri = "/ListTable" - deleteTableUri = "/DeleteTable" - describeTableUri = "/DescribeTable" - updateTableUri = "/UpdateTable" - putRowUri = "/PutRow" - deleteRowUri = "/DeleteRow" - getRowUri = "/GetRow" - updateRowUri = "/UpdateRow" - batchGetRowUri = "/BatchGetRow" - batchWriteRowUri = "/BatchWriteRow" - getRangeUri = "/GetRange" - listStreamUri = "/ListStream" - describeStreamUri = "/DescribeStream" - getShardIteratorUri = "/GetShardIterator" - getStreamRecordUri = "/GetStreamRecord" + createTableUri = "/CreateTable" + listTableUri = "/ListTable" + deleteTableUri = "/DeleteTable" + describeTableUri = "/DescribeTable" + updateTableUri = "/UpdateTable" + putRowUri = "/PutRow" + deleteRowUri = "/DeleteRow" + getRowUri = "/GetRow" + updateRowUri = "/UpdateRow" + batchGetRowUri = "/BatchGetRow" + batchWriteRowUri = "/BatchWriteRow" + getRangeUri = "/GetRange" + listStreamUri = "/ListStream" + describeStreamUri = "/DescribeStream" + getShardIteratorUri = "/GetShardIterator" + getStreamRecordUri = "/GetStreamRecord" computeSplitPointsBySizeRequestUri = "/ComputeSplitPointsBySize" ) @@ -83,8 +82,8 @@ func NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret st tableStoreClient.httpClient = currentGetHttpClientFunc() httpClient := &http.Client{ - Transport: tableStoreTransportProxy, - Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout, + Transport:tableStoreTransportProxy, + Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout, } tableStoreClient.httpClient.New(httpClient) @@ -106,7 +105,7 @@ func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, re return err } } else { - body = nil + body = nil; } var value int64 @@ -134,7 +133,7 @@ func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, re if errn != nil { return fmt.Errorf("decode resp failed: %s: %s: %s %s", errn, err, string(respBody), requestId) } else { - return fmt.Errorf("%s %s %s", *e.Code, *e.Message, requestId) + return fmt.Errorf("%s %s %s", *e.Code, *e.Message ,requestId) } } @@ -160,7 +159,7 @@ func getNextPause(tableStoreClient *TableStoreClient, err error, serverError *ot } else if err == nil && !shouldRetry(*serverError.Code, *serverError.Message, action, statusCode) { return 0 } else { - value := lastInterval*2 + tableStoreClient.random.Int63n(DefaultRetryInterval-1) + 1 + value := lastInterval * 2 + tableStoreClient.random.Int63n(DefaultRetryInterval - 1) + 1 if value > MaxRetryInterval { return MaxRetryInterval } @@ -171,34 +170,34 @@ func getNextPause(tableStoreClient *TableStoreClient, err error, serverError *ot func shouldRetry(errorCode string, errorMsg string, action string, httpStatus int) bool { if retryNotMatterActions(errorCode, errorMsg) == true { - return true + return true; } - serverError := httpStatus >= 500 && httpStatus <= 599 - if isIdempotent(action) && - (errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE || serverError) { - return true + serverError := httpStatus >= 500 && httpStatus <= 599; + if (isIdempotent(action) && + ( errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE || serverError)) { + return true; } - return false + return false; } func retryNotMatterActions(errorCode string, errorMsg string) bool { - if errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT || + if (errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT || errorCode == TABLE_NOT_READY || errorCode == PARTITION_UNAVAILABLE || - errorCode == SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.") { - return true + errorCode == SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.")) { + return true; } else { - return false + return false; } } func isIdempotent(action string) bool { - if action == batchGetRowUri || action == describeTableUri || + if (action == batchGetRowUri || action == describeTableUri || action == getRangeUri || action == getRowUri || - action == listTableUri { - return true + action == listTableUri) { + return true; } else { - return false + return false; } } @@ -233,9 +232,7 @@ func (tableStoreClient *TableStoreClient) doRequest(url string, uri string, body otshead.set(xOtsInstanceName, tableStoreClient.instanceName) sign, err := otshead.signature(uri, "POST", tableStoreClient.accessKeySecret) - if err != nil { - return nil, err, 0, "" - } + if err != nil { return nil, err, 0 , ""} hreq.Header.Set(xOtsSignature, sign) /* end set headers */ @@ -266,11 +263,11 @@ func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableReques req.TableMeta = new(otsprotocol.TableMeta) req.TableMeta.TableName = proto.String(request.TableMeta.TableName) - for _, key := range request.TableMeta.SchemaEntry { + for _, key := range (request.TableMeta.SchemaEntry) { keyType := otsprotocol.PrimaryKeyType(*key.Type) if key.Option != nil { keyOption := otsprotocol.PrimaryKeyOption(*key.Option) - req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption}) + req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption }) } else { req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType}) } @@ -287,7 +284,7 @@ func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableReques if request.StreamSpec != nil { ss := otsprotocol.StreamSpecification{ - EnableStream: &request.StreamSpec.EnableStream, + EnableStream: &request.StreamSpec.EnableStream, ExpirationTime: &request.StreamSpec.ExpirationTime} req.StreamSpec = &ss } @@ -352,23 +349,23 @@ func (tableStoreClient *TableStoreClient) DescribeTable(request *DescribeTableRe responseTableMeta := new(TableMeta) responseTableMeta.TableName = *resp.TableMeta.TableName - for _, key := range resp.TableMeta.PrimaryKey { + for _, key := range (resp.TableMeta.PrimaryKey) { keyType := PrimaryKeyType(*key.Type) // enable it when we support kep option in describe table if key.Option != nil { keyOption := PrimaryKeyOption(*key.Option) - responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption}) + responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption }) } else { - responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType}) + responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType }) } } response.TableMeta = responseTableMeta response.TableOption = &TableOption{TimeToAlive: int(*resp.TableOptions.TimeToLive), MaxVersion: int(*resp.TableOptions.MaxVersions)} if resp.StreamDetails != nil && *resp.StreamDetails.EnableStream { response.StreamDetails = &StreamDetails{ - EnableStream: *resp.StreamDetails.EnableStream, - StreamId: (*StreamId)(resp.StreamDetails.StreamId), + EnableStream: *resp.StreamDetails.EnableStream, + StreamId: (*StreamId)(resp.StreamDetails.StreamId), ExpirationTime: *resp.StreamDetails.ExpirationTime, LastEnableTime: *resp.StreamDetails.LastEnableTime} } else { @@ -386,14 +383,14 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques req := new(otsprotocol.UpdateTableRequest) req.TableName = proto.String(request.TableName) - if request.ReservedThroughput != nil { + if (request.ReservedThroughput != nil) { req.ReservedThroughput = new(otsprotocol.ReservedThroughput) req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit) req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap)) req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap)) } - if request.TableOption != nil { + if (request.TableOption != nil) { req.TableOptions = new(otsprotocol.TableOptions) req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive)) req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion)) @@ -405,7 +402,7 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques expirationTime = &request.StreamSpec.ExpirationTime } req.StreamSpec = &otsprotocol.StreamSpecification{ - EnableStream: &request.StreamSpec.EnableStream, + EnableStream: &request.StreamSpec.EnableStream, ExpirationTime: expirationTime, } } @@ -418,15 +415,15 @@ func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableReques response := new(UpdateTableResponse) response.ReservedThroughput = &ReservedThroughput{ - Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), + Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))} response.TableOption = &TableOption{ TimeToAlive: int(*resp.TableOptions.TimeToLive), - MaxVersion: int(*resp.TableOptions.MaxVersions)} + MaxVersion: int(*resp.TableOptions.MaxVersions)} if *resp.StreamDetails.EnableStream { response.StreamDetails = &StreamDetails{ - EnableStream: *resp.StreamDetails.EnableStream, - StreamId: (*StreamId)(resp.StreamDetails.StreamId), + EnableStream: *resp.StreamDetails.EnableStream, + StreamId: (*StreamId)(resp.StreamDetails.StreamId), ExpirationTime: *resp.StreamDetails.ExpirationTime, LastEnableTime: *resp.StreamDetails.LastEnableTime} } else { @@ -463,7 +460,7 @@ func (tableStoreClient *TableStoreClient) PutRow(request *PutRowRequest) (*PutRo } if request.PutRowChange.ReturnType == ReturnType_RT_PK { - content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_PK.Enum()} + content := otsprotocol.ReturnContent{ ReturnType: otsprotocol.ReturnType_RT_PK.Enum() } req.ReturnContent = &content } @@ -533,13 +530,13 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo req.MaxVersions = proto.Int32(int32(request.SingleRowQueryCriteria.MaxVersion)) } - if request.SingleRowQueryCriteria.StartColumn != nil { + if request.SingleRowQueryCriteria.StartColumn !=nil { req.StartColumn = request.SingleRowQueryCriteria.StartColumn } if request.SingleRowQueryCriteria.TimeRange != nil { - if request.SingleRowQueryCriteria.TimeRange.Specific != 0 { - req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)} + if (request.SingleRowQueryCriteria.TimeRange.Specific != 0) { + req.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)} } else { req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.End)} } @@ -555,7 +552,8 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo return nil, err } - response := &GetRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} + + response := &GetRowResponse{ConsumedCapacityUnit:&ConsumedCapacityUnit{}} if len(resp.Row) == 0 { return response, nil } @@ -565,13 +563,13 @@ func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRo return nil, err } - for _, pk := range rows[0].primaryKey { + for _, pk := range (rows[0].primaryKey) { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn) } - for _, cell := range rows[0].cells { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} + for _, cell := range (rows[0].cells) { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} response.Columns = append(response.Columns, dataColumn) } @@ -594,7 +592,7 @@ func (tableStoreClient *TableStoreClient) UpdateRow(request *UpdateRowRequest) ( return nil, err } - response := &UpdateRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} + response := &UpdateRowResponse{ConsumedCapacityUnit : &ConsumedCapacityUnit{}} response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write return response, nil @@ -607,7 +605,7 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques var tablesInBatch []*otsprotocol.TableInBatchGetRowRequest - for _, Criteria := range request.MultiRowQueryCriteria { + for _, Criteria := range (request.MultiRowQueryCriteria) { table := new(otsprotocol.TableInBatchGetRowRequest) table.TableName = proto.String(Criteria.TableName) table.ColumnsToGet = Criteria.ColumnsToGet @@ -615,13 +613,13 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques if Criteria.Filter != nil { table.Filter = Criteria.Filter.Serialize() } - if Criteria.MaxVersion != 0 { + if (Criteria.MaxVersion != 0) { table.MaxVersions = proto.Int32(int32(Criteria.MaxVersion)) } if Criteria.TimeRange != nil { - if Criteria.TimeRange.Specific != 0 { - table.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(Criteria.TimeRange.Specific)} + if (Criteria.TimeRange.Specific != 0) { + table.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(Criteria.TimeRange.Specific)} } else { table.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(Criteria.TimeRange.Start), EndTime: proto.Int64(Criteria.TimeRange.End)} } @@ -629,7 +627,7 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques return nil, errInvalidInput } - for _, pk := range Criteria.PrimaryKey { + for _, pk := range (Criteria.PrimaryKey) { pkWithBytes := pk.Build(false) table.PrimaryKey = append(table.PrimaryKey, pkWithBytes) } @@ -644,30 +642,30 @@ func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowReques return nil, err } - response := &BatchGetRowResponse{TableToRowsResult: make(map[string][]RowResult)} + response := &BatchGetRowResponse{TableToRowsResult:make(map[string][]RowResult) } - for _, table := range resp.Tables { + for _, table := range (resp.Tables) { index := int32(0) - for _, row := range table.Rows { - rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index} + for _, row := range (table.Rows) { + rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit : &ConsumedCapacityUnit{}, Index: index} index++ if *row.IsOk == false { - rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message} + rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message } } else { // len == 0 means row not exist - if len(row.Row) > 0 { + if (len(row.Row) > 0) { rows, err := readRowsWithHeader(bytes.NewReader(row.Row)) if err != nil { return nil, err } - for _, pk := range rows[0].primaryKey { + for _, pk := range (rows[0].primaryKey) { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn) } - for _, cell := range rows[0].cells { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} + for _, cell := range (rows[0].cells) { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} rowResult.Columns = append(rowResult.Columns, dataColumn) } } @@ -690,11 +688,11 @@ func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRe var tablesInBatch []*otsprotocol.TableInBatchWriteRowRequest - for key, value := range request.RowChangesGroupByTable { + for key, value := range (request.RowChangesGroupByTable) { table := new(otsprotocol.TableInBatchWriteRowRequest) table.TableName = proto.String(key) - for _, row := range value { + for _, row := range (value) { rowInBatch := &otsprotocol.RowInBatchWriteRowRequest{} rowInBatch.Condition = row.getCondition() rowInBatch.RowChange = row.Serialize() @@ -713,15 +711,15 @@ func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRe return nil, err } - response := &BatchWriteRowResponse{TableToRowsResult: make(map[string][]RowResult)} + response := &BatchWriteRowResponse{TableToRowsResult:make(map[string][]RowResult) } - for _, table := range resp.Tables { + for _, table := range (resp.Tables) { index := int32(0) - for _, row := range table.Rows { - rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index} + for _, row := range (table.Rows) { + rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit : &ConsumedCapacityUnit{}, Index: index} index++ if *row.IsOk == false { - rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message} + rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message } } else { rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write @@ -763,8 +761,8 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G } if request.RangeRowQueryCriteria.TimeRange != nil { - if request.RangeRowQueryCriteria.TimeRange.Specific != 0 { - req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)} + if (request.RangeRowQueryCriteria.TimeRange.Specific != 0) { + req.TimeRange = &otsprotocol.TimeRange{SpecificTime : proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)} } else { req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.End)} } @@ -793,7 +791,7 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G return nil, err } - response := &GetRangeResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}} + response := &GetRangeResponse{ConsumedCapacityUnit:&ConsumedCapacityUnit{}} response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write if len(resp.NextStartPrimaryKey) != 0 { @@ -803,7 +801,7 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G } response.NextStartPrimaryKey = &PrimaryKey{} - for _, pk := range currentRows[0].primaryKey { + for _, pk := range (currentRows[0].primaryKey) { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} response.NextStartPrimaryKey.PrimaryKeys = append(response.NextStartPrimaryKey.PrimaryKeys, pkColumn) } @@ -821,15 +819,15 @@ func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*G for _, row := range rows { currentRow := &Row{} currentpk := new(PrimaryKey) - for _, pk := range row.primaryKey { + for _, pk := range (row.primaryKey) { pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value} currentpk.PrimaryKeys = append(currentpk.PrimaryKeys, pkColumn) } currentRow.PrimaryKey = currentpk - for _, cell := range row.cells { - dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp} + for _, cell := range (row.cells) { + dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp:cell.cellTimestamp} currentRow.Columns = append(currentRow.Columns, dataColumn) } @@ -851,10 +849,10 @@ func (client *TableStoreClient) ListStream(req *ListStreamRequest) (*ListStreamR resp := ListStreamResponse{} streams := make([]Stream, len(pbResp.Streams)) - for i, pbStream := range pbResp.Streams { + for i, pbStream := range pbResp.Streams { streams[i] = Stream{ - Id: (*StreamId)(pbStream.StreamId), - TableName: pbStream.TableName, + Id: (*StreamId)(pbStream.StreamId), + TableName: pbStream.TableName, CreationTime: *pbStream.CreationTime} } resp.Streams = streams[:] @@ -868,7 +866,7 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des pbReq.InclusiveStartShardId = (*string)(req.InclusiveStartShardId) pbReq.ShardLimit = req.ShardLimit } - pbResp := otsprotocol.DescribeStreamResponse{} + pbResp:= otsprotocol.DescribeStreamResponse{} if err := client.doRequestWithRetry(describeStreamUri, pbReq, &pbResp); err != nil { return nil, err } @@ -889,7 +887,7 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des shards := make([]*StreamShard, len(pbResp.Shards)) for i, pbShard := range pbResp.Shards { shards[i] = &StreamShard{ - SelfShard: (*ShardId)(pbShard.ShardId), + SelfShard: (*ShardId)(pbShard.ShardId), FatherShard: (*ShardId)(pbShard.ParentId), MotherShard: (*ShardId)(pbShard.ParentSiblingId)} } @@ -900,9 +898,9 @@ func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*Des func (client *TableStoreClient) GetShardIterator(req *GetShardIteratorRequest) (*GetShardIteratorResponse, error) { pbReq := &otsprotocol.GetShardIteratorRequest{ StreamId: (*string)(req.StreamId), - ShardId: (*string)(req.ShardId)} + ShardId: (*string)(req.ShardId)} - pbResp := otsprotocol.GetShardIteratorResponse{} + pbResp:= otsprotocol.GetShardIteratorResponse{} if err := client.doRequestWithRetry(getShardIteratorUri, pbReq, &pbResp); err != nil { return nil, err } @@ -919,7 +917,7 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge pbReq.Limit = req.Limit } - pbResp := otsprotocol.GetStreamRecordResponse{} + pbResp:= otsprotocol.GetStreamRecordResponse{} if err := client.doRequestWithRetry(getStreamRecordUri, pbReq, &pbResp); err != nil { return nil, err } @@ -955,7 +953,7 @@ func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*Ge for i, pk := range plainRow.primaryKey { pkc := PrimaryKeyColumn{ ColumnName: string(pk.cellName), - Value: pk.cellValue.Value} + Value: pk.cellValue.Value} pkey.PrimaryKeys[i] = &pkc } Assert(plainRow.extension != nil, @@ -998,7 +996,7 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB SplitSize: &(req.SplitSize), } - pbResp := otsprotocol.ComputeSplitPointsBySizeResponse{} + pbResp:= otsprotocol.ComputeSplitPointsBySizeResponse{} if err := client.doRequestWithRetry(computeSplitPointsBySizeRequestUri, pbReq, &pbResp); err != nil { return nil, err } @@ -1018,12 +1016,12 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB for _, pbRecord := range pbResp.SplitPoints { plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord)) - if err != nil { + if err !=nil { return nil, err } nowPk = &PrimaryKey{} - for _, pk := range plainRows[0].primaryKey { + for _, pk := range (plainRows[0].primaryKey) { nowPk.AddPrimaryKeyColumn(string(pk.cellName), pk.cellValue.Value) } @@ -1036,15 +1034,16 @@ func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsB newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk} resp.Splits = append(resp.Splits, newSplit) - index := 0 + index:=0 for _, pbLocation := range pbResp.Locations { count := *pbLocation.Repeat value := *pbLocation.Location - for i := int64(0); i < count; i++ { + for i := int64(0) ; i < count; i++{ resp.Splits[index].Location = value index++ } } return &resp, nil } +