Skip to content

Commit

Permalink
spanner: WIP - cleanup mockserver and mockclient
Browse files Browse the repository at this point in the history
Change-Id: Ifa5dbd4b292e8139df1822462dd755da1c45539b
  • Loading branch information
hengfengli committed Jun 10, 2020
1 parent f1e66fc commit fe8e59a
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 492 deletions.
29 changes: 18 additions & 11 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -70,10 +70,11 @@ const (
// StatementResult represents a mocked result on the test server. The result is
// either of: a ResultSet, an update count or an error.
type StatementResult struct {
Type StatementResultType
Err error
ResultSet *spannerpb.ResultSet
UpdateCount int64
Type StatementResultType
Err error
ResultSet *spannerpb.ResultSet
UpdateCount int64
SameResumeToken bool
}

// PartialResultSetExecutionTime represents execution times and errors that
Expand All @@ -85,10 +86,10 @@ type PartialResultSetExecutionTime struct {
Err error
}

// Converts a ResultSet to a PartialResultSet. This method is used to convert
// a mocked result to a PartialResultSet when one of the streaming methods are
// called.
func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) {
// ToPartialResultSets converts a ResultSet to a PartialResultSet. This method
// is used to convert a mocked result to a PartialResultSet when one of the
// streaming methods are called.
func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) {
var startIndex uint64
if len(resumeToken) > 0 {
if startIndex, err = DecodeResumeToken(resumeToken); err != nil {
Expand All @@ -109,10 +110,14 @@ func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spa
idx++
}
}
rt := EncodeResumeToken(startIndex + rowCount)
if s.SameResumeToken {
rt = resumeToken
}
result = append(result, &spannerpb.PartialResultSet{
Metadata: s.ResultSet.Metadata,
Values: values,
ResumeToken: EncodeResumeToken(startIndex + rowCount),
ResumeToken: rt,
})
startIndex += rowCount
if startIndex == totalRows {
Expand Down Expand Up @@ -796,7 +801,7 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques
case StatementResultError:
return statementResult.Err
case StatementResultResultSet:
parts, err := statementResult.toPartialResultSets(req.ResumeToken)
parts, err := statementResult.ToPartialResultSets(req.ResumeToken)
if err != nil {
return err
}
Expand All @@ -808,7 +813,7 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques
s.partialResultSetErrors[req.Sql] = pErrors[1:]
}
s.mu.Unlock()
for _, part := range parts {
for i, part := range parts {
if nextPartialResultSetError != nil && bytes.Equal(part.ResumeToken, nextPartialResultSetError.ResumeToken) {
if nextPartialResultSetError.ExecutionTime > 0 {
<-time.After(nextPartialResultSetError.ExecutionTime)
Expand All @@ -820,7 +825,9 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques
if err := stream.Send(part); err != nil {
return err
}
fmt.Println("Send i:", i)
}
fmt.Println("Send an empty result set")
return nil
case StatementResultUpdateCount:
part := statementResult.updateCountToPartialResultSet(!isPartitionedDml)
Expand Down
308 changes: 0 additions & 308 deletions spanner/internal/testutil/mockclient.go

This file was deleted.

0 comments on commit fe8e59a

Please sign in to comment.