From 3e3624941b28b9bf2ed4be00b1b988bfa9dd791c Mon Sep 17 00:00:00 2001 From: Hengfeng Li Date: Wed, 13 May 2020 09:21:05 +1000 Subject: [PATCH] spanner: WIP - cleanup mockserver and mockclient --- .../internal/testutil/inmem_spanner_server.go | 63 +- spanner/internal/testutil/mockclient.go | 308 ------- spanner/internal/testutil/mockserver.go | 280 ------- spanner/read_test.go | 768 +++++++++--------- 4 files changed, 445 insertions(+), 974 deletions(-) delete mode 100644 spanner/internal/testutil/mockclient.go delete mode 100644 spanner/internal/testutil/mockserver.go diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index 6583c656c11..505bbc89503 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -17,6 +17,7 @@ package testutil import ( "bytes" "context" + "encoding/binary" "fmt" "math/rand" "sort" @@ -35,6 +36,24 @@ import ( gstatus "google.golang.org/grpc/status" ) +var ( + // KvMeta is the Metadata for mocked KV table. + KvMeta = spannerpb.ResultSetMetadata{ + RowType: &spannerpb.StructType{ + Fields: []*spannerpb.StructType_Field{ + { + Name: "Key", + Type: &spannerpb.Type{Code: spannerpb.TypeCode_STRING}, + }, + { + Name: "Value", + Type: &spannerpb.Type{Code: spannerpb.TypeCode_STRING}, + }, + }, + }, + } +) + // StatementResultType indicates the type of result returned by a SQL // statement. type StatementResultType int @@ -70,10 +89,11 @@ const ( // StatementResult represents a mocked result on the test server. The result is // either of: a ResultSet, an update count or an error. type StatementResult struct { - Type StatementResultType - Err error - ResultSet *spannerpb.ResultSet - UpdateCount int64 + Type StatementResultType + Err error + ResultSet *spannerpb.ResultSet + UpdateCount int64 + ResumeTokens [][]byte } // PartialResultSetExecutionTime represents execution times and errors that @@ -85,10 +105,10 @@ type PartialResultSetExecutionTime struct { Err error } -// Converts a ResultSet to a PartialResultSet. This method is used to convert -// a mocked result to a PartialResultSet when one of the streaming methods are -// called. -func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) { +// ToPartialResultSets converts a ResultSet to a PartialResultSet. This method +// is used to convert a mocked result to a PartialResultSet when one of the +// streaming methods are called. +func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) { var startIndex uint64 if len(resumeToken) > 0 { if startIndex, err = DecodeResumeToken(resumeToken); err != nil { @@ -109,11 +129,18 @@ func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spa idx++ } } + var rt []byte + if len(s.ResumeTokens) == 0 { + rt = EncodeResumeToken(startIndex + rowCount) + } else { + rt = s.ResumeTokens[startIndex] + } result = append(result, &spannerpb.PartialResultSet{ Metadata: s.ResultSet.Metadata, Values: values, - ResumeToken: EncodeResumeToken(startIndex + rowCount), + ResumeToken: rt, }) + startIndex += rowCount if startIndex == totalRows { break @@ -796,7 +823,7 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques case StatementResultError: return statementResult.Err case StatementResultResultSet: - parts, err := statementResult.toPartialResultSets(req.ResumeToken) + parts, err := statementResult.ToPartialResultSets(req.ResumeToken) if err != nil { return err } @@ -1014,3 +1041,19 @@ func (s *inMemSpannerServer) PartitionRead(ctx context.Context, req *spannerpb.P s.mu.Unlock() return nil, gstatus.Error(codes.Unimplemented, "Method not yet implemented") } + +// EncodeResumeToken return mock resume token encoding for an uint64 integer. +func EncodeResumeToken(t uint64) []byte { + rt := make([]byte, 16) + binary.PutUvarint(rt, t) + return rt +} + +// DecodeResumeToken decodes a mock resume token into an uint64 integer. +func DecodeResumeToken(t []byte) (uint64, error) { + s, n := binary.Uvarint(t) + if n <= 0 { + return 0, fmt.Errorf("invalid resume token: %v", t) + } + return s, nil +} diff --git a/spanner/internal/testutil/mockclient.go b/spanner/internal/testutil/mockclient.go deleted file mode 100644 index ed912b7dab3..00000000000 --- a/spanner/internal/testutil/mockclient.go +++ /dev/null @@ -1,308 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testutil - -import ( - "context" - "errors" - "fmt" - "sync" - "testing" - "time" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes/empty" - proto3 "github.com/golang/protobuf/ptypes/struct" - pbt "github.com/golang/protobuf/ptypes/timestamp" - pbs "google.golang.org/genproto/googleapis/rpc/status" - sppb "google.golang.org/genproto/googleapis/spanner/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// MockCloudSpannerClient is a mock implementation of sppb.SpannerClient. -type MockCloudSpannerClient struct { - sppb.SpannerClient - - mu sync.Mutex - t *testing.T - // Live sessions on the client. - sessions map[string]bool - // Session ping history. - pings []string - // Client will stall on any requests. - freezed chan struct{} - - // Expected set of actions that have been executed by the client. These - // interfaces should be type reflected against with *Request types in sppb, - // such as sppb.GetSessionRequest. Buffered to a large degree. - ReceivedRequests chan interface{} -} - -// NewMockCloudSpannerClient creates new MockCloudSpannerClient instance. -func NewMockCloudSpannerClient(t *testing.T) *MockCloudSpannerClient { - mc := &MockCloudSpannerClient{ - t: t, - sessions: map[string]bool{}, - ReceivedRequests: make(chan interface{}, 100000), - } - - // Produce a closed channel, so the default action of ready is to not block. - mc.Freeze() - mc.Unfreeze() - - return mc -} - -// DumpPings dumps the ping history. -func (m *MockCloudSpannerClient) DumpPings() []string { - m.mu.Lock() - defer m.mu.Unlock() - return append([]string(nil), m.pings...) -} - -// DumpSessions dumps the internal session table. -func (m *MockCloudSpannerClient) DumpSessions() map[string]bool { - m.mu.Lock() - defer m.mu.Unlock() - st := map[string]bool{} - for s, v := range m.sessions { - st[s] = v - } - return st -} - -// CreateSession is a placeholder for SpannerClient.CreateSession. -func (m *MockCloudSpannerClient) CreateSession(ctx context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - s := &sppb.Session{} - if r.Database != "mockdb" { - // Reject other databases - return s, status.Errorf(codes.NotFound, fmt.Sprintf("database not found: %v", r.Database)) - } - // Generate & record session name. - s.Name = fmt.Sprintf("mockdb-%v", time.Now().UnixNano()) - m.sessions[s.Name] = true - return s, nil -} - -// GetSession is a placeholder for SpannerClient.GetSession. -func (m *MockCloudSpannerClient) GetSession(ctx context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - m.pings = append(m.pings, r.Name) - if _, ok := m.sessions[r.Name]; !ok { - return nil, newSessionNotFoundError(r.Name) - } - return &sppb.Session{Name: r.Name}, nil -} - -// DeleteSession is a placeholder for SpannerClient.DeleteSession. -func (m *MockCloudSpannerClient) DeleteSession(ctx context.Context, r *sppb.DeleteSessionRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - if _, ok := m.sessions[r.Name]; !ok { - // Session not found. - return &empty.Empty{}, newSessionNotFoundError(r.Name) - } - // Delete session from in-memory table. - delete(m.sessions, r.Name) - return &empty.Empty{}, nil -} - -// ExecuteSql is a placeholder for SpannerClient.ExecuteSql. -func (m *MockCloudSpannerClient) ExecuteSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (*sppb.ResultSet, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.ResultSet{Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountExact{7}}}, nil -} - -// ExecuteBatchDml is a placeholder for SpannerClient.ExecuteBatchDml. -func (m *MockCloudSpannerClient) ExecuteBatchDml(ctx context.Context, r *sppb.ExecuteBatchDmlRequest, opts ...grpc.CallOption) (*sppb.ExecuteBatchDmlResponse, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.ExecuteBatchDmlResponse{Status: &pbs.Status{Code: 0}, ResultSets: []*sppb.ResultSet{}}, nil -} - -// ExecuteStreamingSql is a mock implementation of SpannerClient.ExecuteStreamingSql. -func (m *MockCloudSpannerClient) ExecuteStreamingSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (sppb.Spanner_ExecuteStreamingSqlClient, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - wantReq := &sppb.ExecuteSqlRequest{ - Session: "mocksession", - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_SingleUse{ - SingleUse: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadOnly_{ - ReadOnly: &sppb.TransactionOptions_ReadOnly{ - TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{ - Strong: true, - }, - ReturnReadTimestamp: false, - }, - }, - }, - }, - }, - Sql: "mockquery", - Params: &proto3.Struct{ - Fields: map[string]*proto3.Value{"var1": {Kind: &proto3.Value_StringValue{StringValue: "abc"}}}, - }, - ParamTypes: map[string]*sppb.Type{"var1": {Code: sppb.TypeCode_STRING}}, - } - if !proto.Equal(r, wantReq) { - return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq) - } - return nil, errors.New("query never succeeds on mock client") -} - -// StreamingRead is a placeholder for SpannerClient.StreamingRead. -func (m *MockCloudSpannerClient) StreamingRead(ctx context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (sppb.Spanner_StreamingReadClient, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - wantReq := &sppb.ReadRequest{ - Session: "mocksession", - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_SingleUse{ - SingleUse: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadOnly_{ - ReadOnly: &sppb.TransactionOptions_ReadOnly{ - TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{ - Strong: true, - }, - ReturnReadTimestamp: false, - }, - }, - }, - }, - }, - Table: "t_mock", - Columns: []string{"col1", "col2"}, - KeySet: &sppb.KeySet{ - Keys: []*proto3.ListValue{ - { - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: "foo"}}, - }, - }, - }, - Ranges: []*sppb.KeyRange{}, - All: false, - }, - } - if !proto.Equal(r, wantReq) { - return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq) - } - return nil, errors.New("read never succeeds on mock client") -} - -// BeginTransaction is a placeholder for SpannerClient.BeginTransaction. -func (m *MockCloudSpannerClient) BeginTransaction(ctx context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - resp := &sppb.Transaction{Id: []byte("transaction-1")} - if _, ok := r.Options.Mode.(*sppb.TransactionOptions_ReadOnly_); ok { - resp.ReadTimestamp = &pbt.Timestamp{Seconds: 3, Nanos: 4} - } - return resp, nil -} - -// Commit is a placeholder for SpannerClient.Commit. -func (m *MockCloudSpannerClient) Commit(ctx context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.CommitResponse{CommitTimestamp: &pbt.Timestamp{Seconds: 1, Nanos: 2}}, nil -} - -// Rollback is a placeholder for SpannerClient.Rollback. -func (m *MockCloudSpannerClient) Rollback(ctx context.Context, r *sppb.RollbackRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return nil, nil -} - -// PartitionQuery is a placeholder for SpannerServer.PartitionQuery. -func (m *MockCloudSpannerClient) PartitionQuery(ctx context.Context, r *sppb.PartitionQueryRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) { - m.ready() - m.ReceivedRequests <- r - - return nil, errors.New("Unimplemented") -} - -// PartitionRead is a placeholder for SpannerServer.PartitionRead. -func (m *MockCloudSpannerClient) PartitionRead(ctx context.Context, r *sppb.PartitionReadRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) { - m.ready() - m.ReceivedRequests <- r - - return nil, errors.New("Unimplemented") -} - -// Freeze stalls all requests. -func (m *MockCloudSpannerClient) Freeze() { - m.mu.Lock() - defer m.mu.Unlock() - m.freezed = make(chan struct{}) -} - -// Unfreeze restores processing requests. -func (m *MockCloudSpannerClient) Unfreeze() { - m.mu.Lock() - defer m.mu.Unlock() - close(m.freezed) -} - -// ready checks conditions before executing requests -// TODO: add checks for injected errors, actions -func (m *MockCloudSpannerClient) ready() { - m.mu.Lock() - freezed := m.freezed - m.mu.Unlock() - // check if client should be freezed - <-freezed -} diff --git a/spanner/internal/testutil/mockserver.go b/spanner/internal/testutil/mockserver.go deleted file mode 100644 index d389059cac7..00000000000 --- a/spanner/internal/testutil/mockserver.go +++ /dev/null @@ -1,280 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testutil - -import ( - "context" - "encoding/binary" - "fmt" - "io" - "net" - "sync" - "testing" - "time" - - "github.com/golang/protobuf/ptypes/empty" - proto3 "github.com/golang/protobuf/ptypes/struct" - pbt "github.com/golang/protobuf/ptypes/timestamp" - sppb "google.golang.org/genproto/googleapis/spanner/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var ( - // KvMeta is the Metadata for mocked KV table. - KvMeta = sppb.ResultSetMetadata{ - RowType: &sppb.StructType{ - Fields: []*sppb.StructType_Field{ - { - Name: "Key", - Type: &sppb.Type{Code: sppb.TypeCode_STRING}, - }, - { - Name: "Value", - Type: &sppb.Type{Code: sppb.TypeCode_STRING}, - }, - }, - }, - } -) - -// MockCtlMsg encapsulates PartialResultSet/error that might be sent to -// client -type MockCtlMsg struct { - // If ResumeToken == true, mock server will generate a row with - // resume token. - ResumeToken bool - // If Err != nil, mock server will return error in RPC response. - Err error -} - -// MockCloudSpanner is a mock implementation of SpannerServer interface. -// TODO: make MockCloudSpanner a full-fleged Cloud Spanner implementation. -type MockCloudSpanner struct { - sppb.SpannerServer - - s *grpc.Server - t *testing.T - addr string - msgs chan MockCtlMsg - readTs time.Time - - mu sync.Mutex - next int - nextSession int - sessions map[string]*sppb.Session -} - -// Addr returns the listening address of mock server. -func (m *MockCloudSpanner) Addr() string { - return m.addr -} - -// AddMsg generates a new mocked row which can be received by client. -func (m *MockCloudSpanner) AddMsg(err error, resumeToken bool) { - msg := MockCtlMsg{ - ResumeToken: resumeToken, - Err: err, - } - if err == io.EOF { - close(m.msgs) - } else { - m.msgs <- msg - } -} - -// Done signals an end to a mocked stream. -func (m *MockCloudSpanner) Done() { - close(m.msgs) -} - -// BatchCreateSessions is a placeholder for SpannerServer.BatchCreateSessions. -func (m *MockCloudSpanner) BatchCreateSessions(c context.Context, r *sppb.BatchCreateSessionsRequest) (*sppb.BatchCreateSessionsResponse, error) { - m.mu.Lock() - defer m.mu.Unlock() - sessions := make([]*sppb.Session, r.SessionCount) - var i int32 - for i = 0; i < r.SessionCount; i++ { - name := fmt.Sprintf("session-%d", m.nextSession) - m.nextSession++ - sessions[i] = &sppb.Session{Name: name} - m.sessions[name] = sessions[i] - } - return &sppb.BatchCreateSessionsResponse{Session: sessions}, nil -} - -// CreateSession is a placeholder for SpannerServer.CreateSession. -func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) { - m.mu.Lock() - defer m.mu.Unlock() - name := fmt.Sprintf("session-%d", m.nextSession) - m.nextSession++ - s := &sppb.Session{Name: name} - m.sessions[name] = s - return s, nil -} - -// GetSession is a placeholder for SpannerServer.GetSession. -func (m *MockCloudSpanner) GetSession(c context.Context, r *sppb.GetSessionRequest) (*sppb.Session, error) { - m.mu.Lock() - defer m.mu.Unlock() - if s, ok := m.sessions[r.Name]; ok { - return s, nil - } - return nil, status.Errorf(codes.NotFound, "not found") -} - -// DeleteSession is a placeholder for SpannerServer.DeleteSession. -func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.sessions, r.Name) - return &empty.Empty{}, nil -} - -// EncodeResumeToken return mock resume token encoding for an uint64 integer. -func EncodeResumeToken(t uint64) []byte { - rt := make([]byte, 16) - binary.PutUvarint(rt, t) - return rt -} - -// DecodeResumeToken decodes a mock resume token into an uint64 integer. -func DecodeResumeToken(t []byte) (uint64, error) { - s, n := binary.Uvarint(t) - if n <= 0 { - return 0, fmt.Errorf("invalid resume token: %v", t) - } - return s, nil -} - -// ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql. -func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error { - switch r.Sql { - case "SELECT * from t_unavailable": - return status.Errorf(codes.Unavailable, "mock table unavailable") - - case "UPDATE t SET x = 2 WHERE x = 1": - err := s.Send(&sppb.PartialResultSet{ - Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountLowerBound{3}}, - }) - if err != nil { - panic(err) - } - return nil - - case "SELECT t.key key, t.value value FROM t_mock t": - if r.ResumeToken != nil { - s, err := DecodeResumeToken(r.ResumeToken) - if err != nil { - return err - } - m.mu.Lock() - m.next = int(s) + 1 - m.mu.Unlock() - } - for { - msg, more := <-m.msgs - if !more { - break - } - if msg.Err == nil { - var rt []byte - if msg.ResumeToken { - m.mu.Lock() - rt = EncodeResumeToken(uint64(m.next)) - m.mu.Unlock() - } - meta := KvMeta - meta.Transaction = &sppb.Transaction{ - ReadTimestamp: &pbt.Timestamp{ - Seconds: m.readTs.Unix(), - Nanos: int32(m.readTs.Nanosecond()), - }, - } - m.mu.Lock() - next := m.next - m.next++ - m.mu.Unlock() - err := s.Send(&sppb.PartialResultSet{ - Metadata: &meta, - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("foo-%02d", next)}}, - {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("bar-%02d", next)}}, - }, - ResumeToken: rt, - }) - if err != nil { - return err - } - continue - } - return msg.Err - } - return nil - default: - return fmt.Errorf("unsupported SQL: %v", r.Sql) - } -} - -// StreamingRead is a placeholder for SpannerServer.StreamingRead. -func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error { - return s.Send(&sppb.PartialResultSet{}) -} - -// Serve runs a MockCloudSpanner listening on a random localhost address. -func (m *MockCloudSpanner) Serve() { - m.s = grpc.NewServer() - if m.addr == "" { - m.addr = "localhost:0" - } - lis, err := net.Listen("tcp", m.addr) - if err != nil { - m.t.Fatalf("Failed to listen: %v", err) - } - _, port, err := net.SplitHostPort(lis.Addr().String()) - if err != nil { - m.t.Fatalf("Failed to parse listener address: %v", err) - } - sppb.RegisterSpannerServer(m.s, m) - m.addr = "localhost:" + port - go m.s.Serve(lis) -} - -// BeginTransaction is a placeholder for SpannerServer.BeginTransaction. -func (m *MockCloudSpanner) BeginTransaction(_ context.Context, r *sppb.BeginTransactionRequest) (*sppb.Transaction, error) { - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.Transaction{}, nil -} - -// Stop terminates MockCloudSpanner and closes the serving port. -func (m *MockCloudSpanner) Stop() { - m.s.Stop() -} - -// NewMockCloudSpanner creates a new MockCloudSpanner instance. -func NewMockCloudSpanner(t *testing.T, ts time.Time) *MockCloudSpanner { - mcs := &MockCloudSpanner{ - t: t, - msgs: make(chan MockCtlMsg, 1000), - readTs: ts, - sessions: map[string]*sppb.Session{}, - } - return mcs -} diff --git a/spanner/read_test.go b/spanner/read_test.go index 74c4122daed..ed33f660ea2 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -20,18 +20,18 @@ import ( "context" "errors" "fmt" - "io" "sync/atomic" "testing" "time" + vkit "cloud.google.com/go/spanner/apiv1" . "cloud.google.com/go/spanner/internal/testutil" "github.com/golang/protobuf/proto" proto3 "github.com/golang/protobuf/ptypes/struct" + structpb "github.com/golang/protobuf/ptypes/struct" "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" sppb "google.golang.org/genproto/googleapis/spanner/v1" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -641,10 +641,11 @@ func TestRsdNonblockingStates(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() tests := []struct { - name string - msgs []MockCtlMsg - rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) - sql string + name string + resumeTokens [][]byte + prsErrors []PartialResultSetExecutionTime + rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) + sql string // Expected values want []*sppb.PartialResultSet // PartialResultSets that should be returned to caller queue []*sppb.PartialResultSet // PartialResultSets that should be buffered @@ -654,13 +655,9 @@ func TestRsdNonblockingStates(t *testing.T) { }{ { // unConnected->queueingRetryable->finished - name: "unConnected->queueingRetryable->finished", - msgs: []MockCtlMsg{ - {}, - {}, - {Err: io.EOF, ResumeToken: false}, - }, - sql: "SELECT t.key key, t.value value FROM t_mock t", + name: "unConnected->queueingRetryable->finished", + resumeTokens: make([][]byte, 2), + sql: "SELECT t.key key, t.value value FROM t_mock t", want: []*sppb.PartialResultSet{ { Metadata: kvMeta, @@ -688,13 +685,12 @@ func TestRsdNonblockingStates(t *testing.T) { }, { // unConnected->queueingRetryable->aborted - name: "unConnected->queueingRetryable->aborted", - msgs: []MockCtlMsg{ - {}, - {Err: nil, ResumeToken: true}, - {}, - {Err: errors.New("I quit"), ResumeToken: false}, - }, + name: "unConnected->queueingRetryable->aborted", + resumeTokens: [][]byte{{}, EncodeResumeToken(1), {}, EncodeResumeToken(2)}, + prsErrors: []PartialResultSetExecutionTime{{ + ResumeToken: EncodeResumeToken(2), + Err: status.Error(codes.Unknown, "I quit"), + }}, sql: "SELECT t.key key, t.value value FROM t_mock t", want: []*sppb.PartialResultSet{ { @@ -725,14 +721,9 @@ func TestRsdNonblockingStates(t *testing.T) { }, { // unConnected->queueingRetryable->queueingUnretryable->queueingUnretryable - name: "unConnected->queueingRetryable->queueingUnretryable->queueingUnretryable", - msgs: func() (m []MockCtlMsg) { - for i := 0; i < maxBuffers+1; i++ { - m = append(m, MockCtlMsg{}) - } - return m - }(), - sql: "SELECT t.key key, t.value value FROM t_mock t", + name: "unConnected->queueingRetryable->queueingUnretryable->queueingUnretryable", + resumeTokens: make([][]byte, maxBuffers+1), + sql: "SELECT t.key key, t.value value FROM t_mock t", want: func() (s []*sppb.PartialResultSet) { for i := 0; i < maxBuffers+1; i++ { s = append(s, &sppb.PartialResultSet{ @@ -760,13 +751,15 @@ func TestRsdNonblockingStates(t *testing.T) { { // unConnected->queueingRetryable->queueingUnretryable->aborted name: "unConnected->queueingRetryable->queueingUnretryable->aborted", - msgs: func() (m []MockCtlMsg) { - for i := 0; i < maxBuffers; i++ { - m = append(m, MockCtlMsg{}) - } - m = append(m, MockCtlMsg{Err: errors.New("Just Abort It"), ResumeToken: false}) - return m + resumeTokens: func() (rts [][]byte) { + rts = make([][]byte, maxBuffers+1) + rts[maxBuffers] = EncodeResumeToken(1) + return rts }(), + prsErrors: []PartialResultSetExecutionTime{{ + ResumeToken: EncodeResumeToken(1), + Err: status.Error(codes.Unknown, "Just Abort It"), + }}, sql: "SELECT t.key key, t.value value FROM t_mock t", want: func() (s []*sppb.PartialResultSet) { for i := 0; i < maxBuffers; i++ { @@ -794,12 +787,22 @@ func TestRsdNonblockingStates(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - mc := sppb.NewSpannerClient(dialMock(t, ms)) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } + if test.rpc == nil { test.rpc = func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: test.sql, ResumeToken: resumeToken, }) @@ -831,8 +834,16 @@ func TestRsdNonblockingStates(t *testing.T) { } } // Let mock server stream given messages to resumableStreamDecoder. - for _, m := range test.msgs { - ms.AddMsg(m.Err, m.ResumeToken) + err = setupStatementResult(t, server, test.sql, len(test.resumeTokens), test.resumeTokens) + if err != nil { + t.Fatalf("failed to set up a result for a statement: %v", err) + } + + for _, et := range test.prsErrors { + server.TestSpanner.AddPartialResultSetError( + test.sql, + et, + ) } var rs []*sppb.PartialResultSet for { @@ -891,10 +902,10 @@ func TestRsdBlockingStates(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() for _, test := range []struct { - name string - msgs []MockCtlMsg - rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) - sql string + name string + resumeTokens [][]byte + rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) + sql string // Expected values want []*sppb.PartialResultSet // PartialResultSets that should be returned to caller queue []*sppb.PartialResultSet // PartialResultSets that should be buffered @@ -920,14 +931,9 @@ func TestRsdBlockingStates(t *testing.T) { }, { // unConnected->queueingRetryable->queueingRetryable - name: "unConnected->queueingRetryable->queueingRetryable", - msgs: []MockCtlMsg{ - {}, - {Err: nil, ResumeToken: true}, - {Err: nil, ResumeToken: true}, - {}, - }, - sql: "SELECT t.key key, t.value value FROM t_mock t", + name: "unConnected->queueingRetryable->queueingRetryable", + resumeTokens: [][]byte{{}, EncodeResumeToken(1), EncodeResumeToken(2), {}}, + sql: "SELECT t.key key, t.value value FROM t_mock t", want: []*sppb.PartialResultSet{ { Metadata: kvMeta, @@ -952,6 +958,15 @@ func TestRsdBlockingStates(t *testing.T) { }, ResumeToken: EncodeResumeToken(2), }, + // The server sends an io.EOF at last and the decoder will + // flush out all messages in the internal queue. + { + Metadata: kvMeta, + Values: []*proto3.Value{ + {Kind: &proto3.Value_StringValue{StringValue: keyStr(3)}}, + {Kind: &proto3.Value_StringValue{StringValue: valStr(3)}}, + }, + }, }, queue: []*sppb.PartialResultSet{ { @@ -976,17 +991,20 @@ func TestRsdBlockingStates(t *testing.T) { { // unConnected->queueingRetryable->queueingUnretryable->queueingRetryable->queueingRetryable name: "unConnected->queueingRetryable->queueingUnretryable->queueingRetryable->queueingRetryable", - msgs: func() (m []MockCtlMsg) { - for i := 0; i < maxBuffers+1; i++ { - m = append(m, MockCtlMsg{}) - } - m = append(m, MockCtlMsg{Err: nil, ResumeToken: true}) - m = append(m, MockCtlMsg{}) - return m + resumeTokens: func() (rts [][]byte) { + rts = make([][]byte, maxBuffers+3) + rts[maxBuffers+1] = EncodeResumeToken(maxBuffers + 1) + return rts }(), sql: "SELECT t.key key, t.value value FROM t_mock t", want: func() (s []*sppb.PartialResultSet) { - for i := 0; i < maxBuffers+2; i++ { + // The server sends an io.EOF at last and the decoder will + // flush out all messages in the internal queue. Although the + // last message is supposed to be queued and the decoder waits + // for the next resume token, an io.EOF leads to a `finished` + // state that the last message will be removed from the queue + // and be read by the client side. + for i := 0; i < maxBuffers+3; i++ { s = append(s, &sppb.PartialResultSet{ Metadata: kvMeta, Values: []*proto3.Value{ @@ -1027,15 +1045,9 @@ func TestRsdBlockingStates(t *testing.T) { }, { // unConnected->queueingRetryable->queueingUnretryable->finished - name: "unConnected->queueingRetryable->queueingUnretryable->finished", - msgs: func() (m []MockCtlMsg) { - for i := 0; i < maxBuffers; i++ { - m = append(m, MockCtlMsg{}) - } - m = append(m, MockCtlMsg{Err: io.EOF, ResumeToken: false}) - return m - }(), - sql: "SELECT t.key key, t.value value FROM t_mock t", + name: "unConnected->queueingRetryable->queueingUnretryable->finished", + resumeTokens: make([][]byte, maxBuffers), + sql: "SELECT t.key key, t.value value FROM t_mock t", want: func() (s []*sppb.PartialResultSet) { for i := 0; i < maxBuffers; i++ { s = append(s, &sppb.PartialResultSet{ @@ -1060,16 +1072,25 @@ func TestRsdBlockingStates(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - cc := dialMock(t, ms) - mc := sppb.NewSpannerClient(cc) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } + if test.rpc == nil { // Avoid using test.sql directly in closure because for loop changes // test. sql := test.sql test.rpc = func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: sql, ResumeToken: resumeToken, }) @@ -1115,8 +1136,9 @@ func TestRsdBlockingStates(t *testing.T) { } } // Let mock server stream given messages to resumableStreamDecoder. - for _, m := range test.msgs { - ms.AddMsg(m.Err, m.ResumeToken) + err = setupStatementResult(t, server, test.sql, len(test.resumeTokens), test.resumeTokens) + if err != nil { + t.Fatalf("failed to set up a result for a statement: %v", err) } var rs []*sppb.PartialResultSet go func() { @@ -1157,10 +1179,6 @@ func TestRsdBlockingStates(t *testing.T) { case <-time.After(1 * time.Second): t.Fatal("Timeout in waiting for state change") } - ms.Stop() - if err := cc.Close(); err != nil { - t.Fatal(err) - } }) } } @@ -1199,23 +1217,39 @@ func (sr *sReceiver) waitn(n int) error { func TestQueueBytes(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) + + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + rt1 := EncodeResumeToken(1) + rt2 := EncodeResumeToken(2) + rt3 := EncodeResumeToken(3) + resumeTokens := [][]byte{rt1, rt1, rt1, rt2, rt2, rt3} + err = setupStatementResult(t, server, "SELECT t.key key, t.value value FROM t_mock t", len(resumeTokens), resumeTokens) + if err != nil { + t.Fatalf("failed to set up a result for a statement: %v", err) + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } + sr := &sReceiver{ c: make(chan int, 1000), // will never block in this test } - wantQueueBytes := 0 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - r := newResumableStreamDecoder( + decoder := newResumableStreamDecoder( ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) @@ -1224,61 +1258,36 @@ func TestQueueBytes(t *testing.T) { }, nil, ) - go func() { - for r.next() { - } - }() - // Let server send maxBuffers / 2 rows. - for i := 0; i < maxBuffers/2; i++ { - wantQueueBytes += proto.Size(&sppb.PartialResultSet{ - Metadata: kvMeta, - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, - }, - }) - ms.AddMsg(nil, false) - } - if err := sr.waitn(maxBuffers/2 + 1); err != nil { - t.Fatalf("failed to wait for the first %v recv() calls: %v", maxBuffers, err) - } - if int32(wantQueueBytes) != r.bytesBetweenResumeTokens { - t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", r.bytesBetweenResumeTokens, wantQueueBytes) - } - // Now send a resume token to drain the queue. - ms.AddMsg(nil, true) - // Wait for all rows to be processes. - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for rows to be processed: %v", err) - } - if r.bytesBetweenResumeTokens != 0 { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) - } - // Let server send maxBuffers - 1 rows. - wantQueueBytes = 0 - for i := 0; i < maxBuffers-1; i++ { - wantQueueBytes += proto.Size(&sppb.PartialResultSet{ - Metadata: kvMeta, - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, - }, - }) - ms.AddMsg(nil, false) - } - if err := sr.waitn(maxBuffers - 1); err != nil { - t.Fatalf("failed to wait for %v rows to be processed: %v", maxBuffers-1, err) + + sizeOfPRS := proto.Size(&sppb.PartialResultSet{ + Metadata: kvMeta, + Values: []*proto3.Value{ + {Kind: &proto3.Value_StringValue{StringValue: keyStr(0)}}, + {Kind: &proto3.Value_StringValue{StringValue: valStr(0)}}, + }, + ResumeToken: rt1, + }) + + decoder.next() + decoder.next() + decoder.next() + if got, want := decoder.bytesBetweenResumeTokens, int32(2*sizeOfPRS); got != want { + t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", got, want) } - if int32(wantQueueBytes) != r.bytesBetweenResumeTokens { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) + + decoder.next() + if decoder.bytesBetweenResumeTokens != 0 { + t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", decoder.bytesBetweenResumeTokens) } - // Trigger a state transition: queueingRetryable -> queueingUnretryable. - ms.AddMsg(nil, false) - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for state transition: %v", err) + + decoder.next() + if got, want := decoder.bytesBetweenResumeTokens, int32(sizeOfPRS); got != want { + t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", got, want) } - if r.bytesBetweenResumeTokens != 0 { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) + + decoder.next() + if decoder.bytesBetweenResumeTokens != 0 { + t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", decoder.bytesBetweenResumeTokens) } } @@ -1286,23 +1295,59 @@ func TestQueueBytes(t *testing.T) { func TestResumeToken(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) + query := "SELECT t.key key, t.value value FROM t_mock t" + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + rt1 := EncodeResumeToken(1) + rt2 := EncodeResumeToken(2) + resumeTokens := make([][]byte, 3+maxBuffers) + resumeTokens[1] = rt1 + resumeTokens[3+maxBuffers-1] = rt2 + err = setupStatementResult(t, server, query, len(resumeTokens), resumeTokens) + if err != nil { + t.Fatalf("failed to set up a result for a statement: %v", err) + } + + // The first error will be retried. + server.TestSpanner.AddPartialResultSetError( + query, + PartialResultSetExecutionTime{ + ResumeToken: rt1, + Err: status.Error(codes.Unavailable, "mock server unavailable"), + }, + ) + // The second error will not be retried because maxBytesBetweenResumeTokens + // is reached and the state of resumableStreamDecoder: + // queueingRetryable -> queueingUnretryable. The query will just fail. + server.TestSpanner.AddPartialResultSetError( + query, + PartialResultSetExecutionTime{ + ResumeToken: rt2, + Err: status.Error(codes.Unavailable, "mock server wants some sleep"), + }, + ) + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } + sr := &sReceiver{ c: make(chan int, 1000), // will never block in this test } rows := []*Row{} - done := make(chan error) - streaming := func() { - // Establish a stream to mock cloud spanner server. - iter := stream(context.Background(), nil, + + streaming := func() *RowIterator { + return stream(context.Background(), nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: query, ResumeToken: resumeToken, }) sr.rpcReceiver = r @@ -1310,38 +1355,22 @@ func TestResumeToken(t *testing.T) { }, nil, func(error) {}) - defer iter.Stop() - var err error - for { - var row *Row - row, err = iter.Next() - if err == iterator.Done { - err = nil - break - } - if err != nil { - break - } - rows = append(rows, row) - } - done <- err } - go streaming() - // Server streaming row 0 - 2, only row 1 has resume token. - // Client will receive row 0 - 2, so it will try receiving for - // 4 times (the last recv will block), and only row 0 - 1 will - // be yielded. + + // Establish a stream to mock cloud spanner server. + iter := streaming() + defer iter.Stop() + var row *Row + + // Read first two rows. for i := 0; i < 3; i++ { - if i == 1 { - ms.AddMsg(nil, true) - } else { - ms.AddMsg(nil, false) + row, err = iter.Next() + if err != nil { + t.Fatalf("failed to get next value: %v", err) } + rows = append(rows, row) } - // Wait for 4 receive attempts, as explained above. - if err := sr.waitn(4); err != nil { - t.Fatalf("failed to wait for row 0 - 2: %v", err) - } + want := []*Row{ { fields: kvMeta.RowType.Fields, @@ -1357,119 +1386,69 @@ func TestResumeToken(t *testing.T) { {Kind: &proto3.Value_StringValue{StringValue: valStr(1)}}, }, }, - } - if !testEqual(rows, want) { - t.Errorf("received rows: \n%v\n; but want\n%v\n", rows, want) - } - // Inject resumable failure. - ms.AddMsg( - status.Errorf(codes.Unavailable, "mock server unavailable"), - false, - ) - // Test if client detects the resumable failure and retries. - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for client to retry: %v", err) - } - // Client has resumed the query, now server resend row 2. - ms.AddMsg(nil, true) - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for resending row 2: %v", err) - } - // Now client should have received row 0 - 2. - want = append(want, &Row{ - fields: kvMeta.RowType.Fields, - vals: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(2)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(2)}}, - }, - }) - if !testEqual(rows, want) { - t.Errorf("received rows: \n%v\n, want\n%v\n", rows, want) - } - // Sending 3rd - (maxBuffers+1)th rows without resume tokens, client should buffer them. - for i := 3; i < maxBuffers+2; i++ { - ms.AddMsg(nil, false) - } - if err := sr.waitn(maxBuffers - 1); err != nil { - t.Fatalf("failed to wait for row 3-%v: %v", maxBuffers+1, err) - } - // Received rows should be unchanged. - if !testEqual(rows, want) { - t.Errorf("receive rows: \n%v\n, want\n%v\n", rows, want) - } - // Send (maxBuffers+2)th row to trigger state change of resumableStreamDecoder: - // queueingRetryable -> queueingUnretryable - ms.AddMsg(nil, false) - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for row %v: %v", maxBuffers+2, err) - } - // Client should yield row 3rd - (maxBuffers+2)th to application. Therefore, - // application should see row 0 - (maxBuffers+2)th so far. - for i := 3; i < maxBuffers+3; i++ { - want = append(want, &Row{ + { fields: kvMeta.RowType.Fields, vals: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, + {Kind: &proto3.Value_StringValue{StringValue: keyStr(2)}}, + {Kind: &proto3.Value_StringValue{StringValue: valStr(2)}}, }, - }) + }, } if !testEqual(rows, want) { - t.Errorf("received rows: \n%v\n; want\n%v\n", rows, want) + t.Errorf("received rows: \n%v\n; but want\n%v\n", rows, want) } - // Inject resumable error, but since resumableStreamDecoder is already at - // queueingUnretryable state, query will just fail. - ms.AddMsg( - status.Errorf(codes.Unavailable, "mock server wants some sleep"), - false, - ) - var gotErr error - select { - case gotErr = <-done: - case <-time.After(10 * time.Second): - t.Fatalf("timeout in waiting for failed query to return.") + + // Trigger state change of resumableStreamDecoder: + // queueingRetryable -> queueingUnretryable + for i := 0; i < maxBuffers-1; i++ { + row, err = iter.Next() + if err != nil { + t.Fatalf("failed to get next value: %v", err) + } + rows = append(rows, row) } - if wantErr := spannerErrorf(codes.Unavailable, "mock server wants some sleep"); !testEqual(gotErr, wantErr) { - t.Fatalf("stream() returns error: %v, but want error: %v", gotErr, wantErr) + + // Since resumableStreamDecoder is already at queueingUnretryable state, + // query will just fail. + _, err = iter.Next() + if wantErr := spannerErrorf(codes.Unavailable, "mock server wants some sleep"); !testEqual(err, wantErr) { + t.Fatalf("stream() returns error: %v, but want error: %v", err, wantErr) } - // Reconnect to mock Cloud Spanner. - rows = []*Row{} - go streaming() // Let server send two rows without resume token. - for i := maxBuffers + 3; i < maxBuffers+5; i++ { - ms.AddMsg(nil, false) - } - if err := sr.waitn(3); err != nil { - t.Fatalf("failed to wait for row %v - %v: %v", maxBuffers+3, maxBuffers+5, err) - } - if len(rows) > 0 { - t.Errorf("client received some rows unexpectedly: %v, want nothing", rows) - } - // Let server end the query. - ms.AddMsg(io.EOF, false) - select { - case gotErr = <-done: - case <-time.After(10 * time.Second): - t.Fatalf("timeout in waiting for failed query to return") + resumeTokens = make([][]byte, 2) + err = setupStatementResult(t, server, query, len(resumeTokens), resumeTokens) + if err != nil { + t.Fatalf("failed to set up a result for a statement: %v", err) } - if gotErr != nil { - t.Fatalf("stream() returns unexpected error: %v, but want no error", gotErr) + + // Reconnect to mock Cloud Spanner. + rows = []*Row{} + iter = streaming() + defer iter.Stop() + + for i := 0; i < 2; i++ { + row, err = iter.Next() + if err != nil { + t.Fatalf("failed to get next value: %v", err) + } + rows = append(rows, row) } + // Verify if a normal server side EOF flushes all queued rows. want = []*Row{ { fields: kvMeta.RowType.Fields, vals: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(maxBuffers + 3)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(maxBuffers + 3)}}, + {Kind: &proto3.Value_StringValue{StringValue: keyStr(0)}}, + {Kind: &proto3.Value_StringValue{StringValue: valStr(0)}}, }, }, { fields: kvMeta.RowType.Fields, vals: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(maxBuffers + 4)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(maxBuffers + 4)}}, + {Kind: &proto3.Value_StringValue{StringValue: keyStr(1)}}, + {Kind: &proto3.Value_StringValue{StringValue: valStr(1)}}, }, }, } @@ -1483,66 +1462,57 @@ func TestResumeToken(t *testing.T) { func TestGrpcReconnect(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - retry := make(chan int) - row := make(chan int) - var err error - go func() { - r := 0 - // Establish a stream to mock cloud spanner server. - iter := stream(context.Background(), nil, - func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { - if r > 0 { - // This RPC attempt is a retry, signal it. - retry <- r - } - r++ - return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", - ResumeToken: resumeToken, - }) - }, - nil, - func(error) {}) - defer iter.Stop() - for { - _, err = iter.Next() - if err == iterator.Done { - err = nil - break - } - if err != nil { - break - } - row <- 0 - } - }() - // Add a message and wait for the receipt. - ms.AddMsg(nil, true) - select { - case <-row: - case <-time.After(10 * time.Second): - t.Fatalf("expect stream to be established within 10 seconds, but it didn't") + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") } - // Error injection: force server to close all connections. - ms.Stop() - // Test to see if client respond to the real RPC failure correctly by - // retrying RPC. - select { - case r, ok := <-retry: - if ok && r == 1 { + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } + + // Simulate an unavailable error to interrupt the stream of PartialResultSet + // in order to test the grpc retrying mechanism. + server.TestSpanner.AddPartialResultSetError( + SelectSingerIDAlbumIDAlbumTitleFromAlbums, + PartialResultSetExecutionTime{ + ResumeToken: EncodeResumeToken(2), + Err: status.Errorf(codes.Unavailable, "server is unavailable"), + }, + ) + + // The retry is counted from the second call. + r := -1 + // Establish a stream to mock cloud spanner server. + iter := stream(context.Background(), nil, + func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { + r++ + return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, + ResumeToken: resumeToken, + }) + + }, + nil, + func(error) {}) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + err = nil + break + } + if err != nil { break } + } + if r != 1 { t.Errorf("retry count = %v, want 1", r) - case <-time.After(10 * time.Second): - t.Errorf("client library failed to respond after 10 seconds, aborting") - return } } @@ -1550,27 +1520,32 @@ func TestGrpcReconnect(t *testing.T) { func TestCancelTimeout(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodExecuteStreamingSql, + SimulatedExecutionTime{MinimumExecutionTime: 1 * time.Second}, + ) + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } done := make(chan int) - go func() { - for { - ms.AddMsg(nil, true) - } - }() + // Test cancelling query. ctx, cancel := context.WithCancel(context.Background()) - var err error go func() { // Establish a stream to mock cloud spanner server. iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, ResumeToken: resumeToken, }) }, @@ -1597,15 +1572,17 @@ func TestCancelTimeout(t *testing.T) { case <-time.After(1 * time.Second): t.Errorf("query doesn't exit timely after being cancelled") } + // Test query timeout. - ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() go func() { // Establish a stream to mock cloud spanner server. iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, ResumeToken: resumeToken, }) }, @@ -1634,31 +1611,64 @@ func TestCancelTimeout(t *testing.T) { } } +func setupStatementResult(t *testing.T, server *MockedSpannerInMemTestServer, stmt string, rowCount int, resumeTokens [][]byte) error { + selectValues := make([][]string, rowCount) + for i := 0; i < rowCount; i++ { + selectValues[i] = []string{keyStr(i), valStr(i)} + } + + rows := make([]*structpb.ListValue, len(selectValues)) + for i, values := range selectValues { + rowValues := make([]*structpb.Value, len(kvMeta.RowType.Fields)) + for j, value := range values { + rowValues[j] = &structpb.Value{ + Kind: &structpb.Value_StringValue{StringValue: value}, + } + } + rows[i] = &structpb.ListValue{ + Values: rowValues, + } + } + resultSet := &sppb.ResultSet{ + Metadata: kvMeta, + Rows: rows, + } + result := &StatementResult{ + Type: StatementResultResultSet, + ResultSet: resultSet, + ResumeTokens: resumeTokens, + } + return server.TestSpanner.PutStatementResult(stmt, result) +} + func TestRowIteratorDo(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - for i := 0; i < 3; i++ { - ms.AddMsg(nil, false) + _, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") } - ms.AddMsg(io.EOF, true) + nRows := 0 iter := stream(context.Background(), nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, ResumeToken: resumeToken, }) }, nil, func(error) {}) - err := iter.Do(func(r *Row) error { nRows++; return nil }) + err = iter.Do(func(r *Row) error { nRows++; return nil }) if err != nil { t.Errorf("Using Do: %v", err) } @@ -1670,28 +1680,31 @@ func TestRowIteratorDo(t *testing.T) { func TestRowIteratorDoWithError(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - for i := 0; i < 3; i++ { - ms.AddMsg(nil, false) + _, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") } - ms.AddMsg(io.EOF, true) + iter := stream(context.Background(), nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, ResumeToken: resumeToken, }) }, nil, func(error) {}) injected := errors.New("Failed iterator") - err := iter.Do(func(r *Row) error { return injected }) + err = iter.Do(func(r *Row) error { return injected }) if err != injected { t.Errorf("got <%v>, want <%v>", err, injected) } @@ -1701,27 +1714,30 @@ func TestIteratorStopEarly(t *testing.T) { ctx := context.Background() restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - ms.AddMsg(nil, false) - ms.AddMsg(nil, false) - ms.AddMsg(io.EOF, true) + _, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + session, err := createSession(mc) + if err != nil { + t.Fatalf("failed to create a session") + } iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", + Session: session.Name, + Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums, ResumeToken: resumeToken, }) }, nil, func(error) {}) - _, err := iter.Next() + _, err = iter.Next() if err != nil { t.Fatalf("before Stop: %v", err) } @@ -1742,10 +1758,10 @@ func TestIteratorWithError(t *testing.T) { } } -func dialMock(t *testing.T, ms *MockCloudSpanner) *grpc.ClientConn { - cc, err := grpc.Dial(ms.Addr(), grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - t.Fatalf("Dial(%q) = %v", ms.Addr(), err) +func createSession(client *vkit.Client) (*sppb.Session, error) { + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, } - return cc + return client.CreateSession(context.Background(), request) }