Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a flag EncodeType for response to use Arrow/Default format to decode. #12536

Merged
merged 12 commits into from Oct 9, 2019
15 changes: 6 additions & 9 deletions distsql/select_result.go
Expand Up @@ -147,18 +147,15 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
// Check the returned data is default/arrow format.
if r.selectResp == nil || (len(r.selectResp.RowBatchData) == 0 && r.respChkIdx == len(r.selectResp.Chunks)) {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return err
}
// TODO(Shenghui Wu): add metrics
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved
if len(r.selectResp.RowBatchData) == 0 {
r.encodeType = tipb.EncodeType_TypeDefault
}
}

switch r.encodeType {
switch r.selectResp.EncodeType {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeArrow:
Expand Down Expand Up @@ -187,10 +184,10 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
}

func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
rowBatchData := r.selectResp.RowBatchData
rowBatchData := r.selectResp.Chunks[r.respChkIdx].RowsData
codec := chunk.NewCodec(r.fieldTypes)
remained := codec.DecodeToChunk(rowBatchData, chk)
r.selectResp.RowBatchData = remained
_ = codec.DecodeToChunk(rowBatchData, chk)
r.respChkIdx++
return nil
}

Expand Down Expand Up @@ -227,7 +224,7 @@ func (r *selectResult) getSelectResp() error {
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
if len(r.selectResp.Chunks) == 0 && len(r.selectResp.RowBatchData) == 0 {
if len(r.selectResp.Chunks) == 0 {
continue
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -43,7 +43,7 @@ require (
github.com/pingcap/parser v0.0.0-20190923031704-33636bc5e5d6
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e
github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -170,8 +170,8 @@ github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bc
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e h1:H7meq8QPmWGImOkHTQYAWw82zwIqndJaCDPVUknOHbM=
github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414 h1:1HaTk+HEzn0rVcsAbVz9GLB9Ft3/KeSl2Sy452tDkHk=
github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
14 changes: 10 additions & 4 deletions store/mockstore/mocktikv/cop_handler_dag.go
Expand Up @@ -609,10 +609,11 @@ func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte
chunks = appendRow(chunks, requestedRow, i)
}
selResp.Chunks = chunks
selResp.EncodeType = tipb.EncodeType_TypeDefault
}

func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
rowBatchData := make([]byte, 0, 1024)
var chunks []tipb.Chunk
respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
for _, ordinal := range colOrdinal {
respColTypes = append(respColTypes, colTypes[ordinal])
Expand All @@ -628,15 +629,20 @@ func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte,
}
}
if i%rowsPerChunk == rowsPerChunk-1 {
rowBatchData = append(rowBatchData, encoder.Encode(chk)...)
chunks = append(chunks, tipb.Chunk{})
cur := &chunks[len(chunks)-1]
cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
chk.Reset()
}
}
if chk.NumRows() > 0 {
rowBatchData = append(rowBatchData, encoder.Encode(chk)...)
chunks = append(chunks, tipb.Chunk{})
cur := &chunks[len(chunks)-1]
cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
chk.Reset()
}
selResp.RowBatchData = rowBatchData
selResp.Chunks = chunks
selResp.EncodeType = tipb.EncodeType_TypeArrow
return nil
}

Expand Down