diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index 6583c656c114..416ff8ad40f0 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -70,10 +70,11 @@ const ( // StatementResult represents a mocked result on the test server. The result is // either of: a ResultSet, an update count or an error. type StatementResult struct { - Type StatementResultType - Err error - ResultSet *spannerpb.ResultSet - UpdateCount int64 + Type StatementResultType + Err error + ResultSet *spannerpb.ResultSet + UpdateCount int64 + SameResumeToken bool } // PartialResultSetExecutionTime represents execution times and errors that @@ -85,10 +86,10 @@ type PartialResultSetExecutionTime struct { Err error } -// Converts a ResultSet to a PartialResultSet. This method is used to convert -// a mocked result to a PartialResultSet when one of the streaming methods are -// called. -func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) { +// ToPartialResultSets converts a ResultSet to a PartialResultSet. This method +// is used to convert a mocked result to a PartialResultSet when one of the +// streaming methods are called. +func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spannerpb.PartialResultSet, err error) { var startIndex uint64 if len(resumeToken) > 0 { if startIndex, err = DecodeResumeToken(resumeToken); err != nil { @@ -109,10 +110,14 @@ func (s *StatementResult) toPartialResultSets(resumeToken []byte) (result []*spa idx++ } } + rt := EncodeResumeToken(startIndex + rowCount) + if s.SameResumeToken { + rt = resumeToken + } result = append(result, &spannerpb.PartialResultSet{ Metadata: s.ResultSet.Metadata, Values: values, - ResumeToken: EncodeResumeToken(startIndex + rowCount), + ResumeToken: rt, }) startIndex += rowCount if startIndex == totalRows { @@ -796,7 +801,7 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques case StatementResultError: return statementResult.Err case StatementResultResultSet: - parts, err := statementResult.toPartialResultSets(req.ResumeToken) + parts, err := statementResult.ToPartialResultSets(req.ResumeToken) if err != nil { return err } @@ -808,7 +813,7 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques s.partialResultSetErrors[req.Sql] = pErrors[1:] } s.mu.Unlock() - for _, part := range parts { + for i, part := range parts { if nextPartialResultSetError != nil && bytes.Equal(part.ResumeToken, nextPartialResultSetError.ResumeToken) { if nextPartialResultSetError.ExecutionTime > 0 { <-time.After(nextPartialResultSetError.ExecutionTime) @@ -820,7 +825,9 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques if err := stream.Send(part); err != nil { return err } + fmt.Println("Send i:", i) } + fmt.Println("Send an empty result set") return nil case StatementResultUpdateCount: part := statementResult.updateCountToPartialResultSet(!isPartitionedDml) diff --git a/spanner/internal/testutil/mockclient.go b/spanner/internal/testutil/mockclient.go deleted file mode 100644 index ed912b7dab3d..000000000000 --- a/spanner/internal/testutil/mockclient.go +++ /dev/null @@ -1,308 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testutil - -import ( - "context" - "errors" - "fmt" - "sync" - "testing" - "time" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes/empty" - proto3 "github.com/golang/protobuf/ptypes/struct" - pbt "github.com/golang/protobuf/ptypes/timestamp" - pbs "google.golang.org/genproto/googleapis/rpc/status" - sppb "google.golang.org/genproto/googleapis/spanner/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// MockCloudSpannerClient is a mock implementation of sppb.SpannerClient. -type MockCloudSpannerClient struct { - sppb.SpannerClient - - mu sync.Mutex - t *testing.T - // Live sessions on the client. - sessions map[string]bool - // Session ping history. - pings []string - // Client will stall on any requests. - freezed chan struct{} - - // Expected set of actions that have been executed by the client. These - // interfaces should be type reflected against with *Request types in sppb, - // such as sppb.GetSessionRequest. Buffered to a large degree. - ReceivedRequests chan interface{} -} - -// NewMockCloudSpannerClient creates new MockCloudSpannerClient instance. -func NewMockCloudSpannerClient(t *testing.T) *MockCloudSpannerClient { - mc := &MockCloudSpannerClient{ - t: t, - sessions: map[string]bool{}, - ReceivedRequests: make(chan interface{}, 100000), - } - - // Produce a closed channel, so the default action of ready is to not block. - mc.Freeze() - mc.Unfreeze() - - return mc -} - -// DumpPings dumps the ping history. -func (m *MockCloudSpannerClient) DumpPings() []string { - m.mu.Lock() - defer m.mu.Unlock() - return append([]string(nil), m.pings...) -} - -// DumpSessions dumps the internal session table. -func (m *MockCloudSpannerClient) DumpSessions() map[string]bool { - m.mu.Lock() - defer m.mu.Unlock() - st := map[string]bool{} - for s, v := range m.sessions { - st[s] = v - } - return st -} - -// CreateSession is a placeholder for SpannerClient.CreateSession. -func (m *MockCloudSpannerClient) CreateSession(ctx context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - s := &sppb.Session{} - if r.Database != "mockdb" { - // Reject other databases - return s, status.Errorf(codes.NotFound, fmt.Sprintf("database not found: %v", r.Database)) - } - // Generate & record session name. - s.Name = fmt.Sprintf("mockdb-%v", time.Now().UnixNano()) - m.sessions[s.Name] = true - return s, nil -} - -// GetSession is a placeholder for SpannerClient.GetSession. -func (m *MockCloudSpannerClient) GetSession(ctx context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - m.pings = append(m.pings, r.Name) - if _, ok := m.sessions[r.Name]; !ok { - return nil, newSessionNotFoundError(r.Name) - } - return &sppb.Session{Name: r.Name}, nil -} - -// DeleteSession is a placeholder for SpannerClient.DeleteSession. -func (m *MockCloudSpannerClient) DeleteSession(ctx context.Context, r *sppb.DeleteSessionRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - if _, ok := m.sessions[r.Name]; !ok { - // Session not found. - return &empty.Empty{}, newSessionNotFoundError(r.Name) - } - // Delete session from in-memory table. - delete(m.sessions, r.Name) - return &empty.Empty{}, nil -} - -// ExecuteSql is a placeholder for SpannerClient.ExecuteSql. -func (m *MockCloudSpannerClient) ExecuteSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (*sppb.ResultSet, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.ResultSet{Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountExact{7}}}, nil -} - -// ExecuteBatchDml is a placeholder for SpannerClient.ExecuteBatchDml. -func (m *MockCloudSpannerClient) ExecuteBatchDml(ctx context.Context, r *sppb.ExecuteBatchDmlRequest, opts ...grpc.CallOption) (*sppb.ExecuteBatchDmlResponse, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.ExecuteBatchDmlResponse{Status: &pbs.Status{Code: 0}, ResultSets: []*sppb.ResultSet{}}, nil -} - -// ExecuteStreamingSql is a mock implementation of SpannerClient.ExecuteStreamingSql. -func (m *MockCloudSpannerClient) ExecuteStreamingSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (sppb.Spanner_ExecuteStreamingSqlClient, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - wantReq := &sppb.ExecuteSqlRequest{ - Session: "mocksession", - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_SingleUse{ - SingleUse: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadOnly_{ - ReadOnly: &sppb.TransactionOptions_ReadOnly{ - TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{ - Strong: true, - }, - ReturnReadTimestamp: false, - }, - }, - }, - }, - }, - Sql: "mockquery", - Params: &proto3.Struct{ - Fields: map[string]*proto3.Value{"var1": {Kind: &proto3.Value_StringValue{StringValue: "abc"}}}, - }, - ParamTypes: map[string]*sppb.Type{"var1": {Code: sppb.TypeCode_STRING}}, - } - if !proto.Equal(r, wantReq) { - return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq) - } - return nil, errors.New("query never succeeds on mock client") -} - -// StreamingRead is a placeholder for SpannerClient.StreamingRead. -func (m *MockCloudSpannerClient) StreamingRead(ctx context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (sppb.Spanner_StreamingReadClient, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - wantReq := &sppb.ReadRequest{ - Session: "mocksession", - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_SingleUse{ - SingleUse: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadOnly_{ - ReadOnly: &sppb.TransactionOptions_ReadOnly{ - TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{ - Strong: true, - }, - ReturnReadTimestamp: false, - }, - }, - }, - }, - }, - Table: "t_mock", - Columns: []string{"col1", "col2"}, - KeySet: &sppb.KeySet{ - Keys: []*proto3.ListValue{ - { - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: "foo"}}, - }, - }, - }, - Ranges: []*sppb.KeyRange{}, - All: false, - }, - } - if !proto.Equal(r, wantReq) { - return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq) - } - return nil, errors.New("read never succeeds on mock client") -} - -// BeginTransaction is a placeholder for SpannerClient.BeginTransaction. -func (m *MockCloudSpannerClient) BeginTransaction(ctx context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - resp := &sppb.Transaction{Id: []byte("transaction-1")} - if _, ok := r.Options.Mode.(*sppb.TransactionOptions_ReadOnly_); ok { - resp.ReadTimestamp = &pbt.Timestamp{Seconds: 3, Nanos: 4} - } - return resp, nil -} - -// Commit is a placeholder for SpannerClient.Commit. -func (m *MockCloudSpannerClient) Commit(ctx context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return &sppb.CommitResponse{CommitTimestamp: &pbt.Timestamp{Seconds: 1, Nanos: 2}}, nil -} - -// Rollback is a placeholder for SpannerClient.Rollback. -func (m *MockCloudSpannerClient) Rollback(ctx context.Context, r *sppb.RollbackRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - m.ready() - m.ReceivedRequests <- r - - m.mu.Lock() - defer m.mu.Unlock() - return nil, nil -} - -// PartitionQuery is a placeholder for SpannerServer.PartitionQuery. -func (m *MockCloudSpannerClient) PartitionQuery(ctx context.Context, r *sppb.PartitionQueryRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) { - m.ready() - m.ReceivedRequests <- r - - return nil, errors.New("Unimplemented") -} - -// PartitionRead is a placeholder for SpannerServer.PartitionRead. -func (m *MockCloudSpannerClient) PartitionRead(ctx context.Context, r *sppb.PartitionReadRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) { - m.ready() - m.ReceivedRequests <- r - - return nil, errors.New("Unimplemented") -} - -// Freeze stalls all requests. -func (m *MockCloudSpannerClient) Freeze() { - m.mu.Lock() - defer m.mu.Unlock() - m.freezed = make(chan struct{}) -} - -// Unfreeze restores processing requests. -func (m *MockCloudSpannerClient) Unfreeze() { - m.mu.Lock() - defer m.mu.Unlock() - close(m.freezed) -} - -// ready checks conditions before executing requests -// TODO: add checks for injected errors, actions -func (m *MockCloudSpannerClient) ready() { - m.mu.Lock() - freezed := m.freezed - m.mu.Unlock() - // check if client should be freezed - <-freezed -} diff --git a/spanner/oc_test.go b/spanner/oc_test.go index 6ab39fa70e3e..ff2de2b3abbd 100644 --- a/spanner/oc_test.go +++ b/spanner/oc_test.go @@ -20,9 +20,6 @@ import ( "time" "cloud.google.com/go/internal/testutil" - stestutil "cloud.google.com/go/spanner/internal/testutil" - "google.golang.org/api/option" - "google.golang.org/grpc" ) // Check that stats are being exported. @@ -32,22 +29,10 @@ func TestOCStats(t *testing.T) { te := testutil.NewTestExporter() defer te.Unregister() - ms := stestutil.NewMockCloudSpanner(t, trxTs) - ms.Serve() - ctx := context.Background() - c, err := NewClientWithConfig(ctx, "projects/P/instances/I/databases/D", - ClientConfig{SessionPoolConfig: SessionPoolConfig{ - MinOpened: 0, - }}, - option.WithEndpoint(ms.Addr()), - option.WithGRPCDialOption(grpc.WithInsecure()), - option.WithoutAuthentication()) - if err != nil { - t.Fatal(err) - } - defer c.Close() + _, c, teardown := setupMockedTestServer(t) + defer teardown() - c.Single().ReadRow(ctx, "Users", Key{"alice"}, []string{"email"}) + c.Single().ReadRow(context.Background(), "Users", Key{"alice"}, []string{"email"}) // Wait until we see data from the view. select { case <-te.Stats: diff --git a/spanner/read.go b/spanner/read.go index f0d16345565f..f152649b3679 100644 --- a/spanner/read.go +++ b/spanner/read.go @@ -19,6 +19,7 @@ package spanner import ( "bytes" "context" + "fmt" "io" "log" "sync/atomic" @@ -383,6 +384,8 @@ func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) if d.state == queueingRetryable && d.state != target { // Reset bytesBetweenResumeTokens because it is only meaningful/changed // under queueingRetryable state. + fmt.Println("target:", target) + fmt.Println("d.bytesBetweenResumeTokens is reset to 0:", d.bytesBetweenResumeTokens) d.bytesBetweenResumeTokens = 0 } d.state = target @@ -552,15 +555,20 @@ func (d *resumableStreamDecoder) next() bool { func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { var res *sppb.PartialResultSet res, d.err = d.stream.Recv() + fmt.Println("res:", res, "d.err:", d.err) if d.err == nil { d.q.push(res) + fmt.Println("d.state == queueingRetryable:", d.state == queueingRetryable) + fmt.Println("!d.isNewResumeToken(res.ResumeToken)", !d.isNewResumeToken(res.ResumeToken)) if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) { d.bytesBetweenResumeTokens += int32(proto.Size(res)) } + fmt.Println("d.bytesBetweenResumeTokens:", d.bytesBetweenResumeTokens) d.changeState(d.state) return } if d.err == io.EOF { + fmt.Println("d.err is io.EOF") d.err = nil d.changeState(finished) return diff --git a/spanner/read_test.go b/spanner/read_test.go index 74c4122daeda..c1754c447a36 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -28,6 +28,7 @@ import ( . "cloud.google.com/go/spanner/internal/testutil" "github.com/golang/protobuf/proto" proto3 "github.com/golang/protobuf/ptypes/struct" + structpb "github.com/golang/protobuf/ptypes/struct" "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" sppb "google.golang.org/genproto/googleapis/spanner/v1" @@ -1199,16 +1200,30 @@ func (sr *sReceiver) waitn(n int) error { func TestQueueBytes(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) + + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + // Let server send maxBuffers / 2 rows. + wantQueueBytes := setupStatementResult(t, server, maxBuffers/2, true) + fmt.Println("wantQueueBytes:", wantQueueBytes) + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") + } + sr := &sReceiver{ c: make(chan int, 1000), // will never block in this test } - wantQueueBytes := 0 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() r := newResumableStreamDecoder( @@ -1216,6 +1231,7 @@ func TestQueueBytes(t *testing.T) { nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) @@ -1224,62 +1240,54 @@ func TestQueueBytes(t *testing.T) { }, nil, ) - go func() { - for r.next() { - } - }() - // Let server send maxBuffers / 2 rows. - for i := 0; i < maxBuffers/2; i++ { - wantQueueBytes += proto.Size(&sppb.PartialResultSet{ - Metadata: kvMeta, - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, - }, - }) - ms.AddMsg(nil, false) - } + + r.next() + if err := sr.waitn(maxBuffers/2 + 1); err != nil { - t.Fatalf("failed to wait for the first %v recv() calls: %v", maxBuffers, err) + t.Fatalf("failed to wait for the first %v recv() calls: %v", maxBuffers/2, err) } if int32(wantQueueBytes) != r.bytesBetweenResumeTokens { t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", r.bytesBetweenResumeTokens, wantQueueBytes) } - // Now send a resume token to drain the queue. - ms.AddMsg(nil, true) - // Wait for all rows to be processes. - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for rows to be processed: %v", err) - } - if r.bytesBetweenResumeTokens != 0 { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) - } - // Let server send maxBuffers - 1 rows. - wantQueueBytes = 0 - for i := 0; i < maxBuffers-1; i++ { - wantQueueBytes += proto.Size(&sppb.PartialResultSet{ - Metadata: kvMeta, - Values: []*proto3.Value{ - {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, - {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, - }, - }) - ms.AddMsg(nil, false) - } - if err := sr.waitn(maxBuffers - 1); err != nil { - t.Fatalf("failed to wait for %v rows to be processed: %v", maxBuffers-1, err) - } - if int32(wantQueueBytes) != r.bytesBetweenResumeTokens { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) - } - // Trigger a state transition: queueingRetryable -> queueingUnretryable. - ms.AddMsg(nil, false) - if err := sr.waitn(1); err != nil { - t.Fatalf("failed to wait for state transition: %v", err) - } - if r.bytesBetweenResumeTokens != 0 { - t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) - } + + // // Now send a resume token to drain the queue. + // // ms.AddMsg(nil, true) + // wantQueueBytes = setupStatementResult(t, server, 1, true) + // fmt.Println("wantQueueBytes:", wantQueueBytes) + // // Wait for all rows to be processes. + // if err := sr.waitn(1); err != nil { + // t.Fatalf("failed to wait for rows to be processed: %v", err) + // } + // if r.bytesBetweenResumeTokens != 0 { + // t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) + // } + + // // Let server send maxBuffers - 1 rows. + // wantQueueBytes = 0 + // for i := 0; i < maxBuffers-1; i++ { + // wantQueueBytes += proto.Size(&sppb.PartialResultSet{ + // Metadata: kvMeta, + // Values: []*proto3.Value{ + // {Kind: &proto3.Value_StringValue{StringValue: keyStr(i)}}, + // {Kind: &proto3.Value_StringValue{StringValue: valStr(i)}}, + // }, + // }) + // // ms.AddMsg(nil, false) + // } + // if err := sr.waitn(maxBuffers - 1); err != nil { + // t.Fatalf("failed to wait for %v rows to be processed: %v", maxBuffers-1, err) + // } + // if int32(wantQueueBytes) != r.bytesBetweenResumeTokens { + // t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) + // } + // // Trigger a state transition: queueingRetryable -> queueingUnretryable. + // // ms.AddMsg(nil, false) + // if err := sr.waitn(1); err != nil { + // t.Fatalf("failed to wait for state transition: %v", err) + // } + // if r.bytesBetweenResumeTokens != 0 { + // t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", r.bytesBetweenResumeTokens) + // } } // Verify that client can deal with resume token correctly @@ -1483,66 +1491,63 @@ func TestResumeToken(t *testing.T) { func TestGrpcReconnect(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - retry := make(chan int) - row := make(chan int) - var err error - go func() { - r := 0 - // Establish a stream to mock cloud spanner server. - iter := stream(context.Background(), nil, - func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { - if r > 0 { - // This RPC attempt is a retry, signal it. - retry <- r - } - r++ - return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ - Sql: "SELECT t.key key, t.value value FROM t_mock t", - ResumeToken: resumeToken, - }) - }, - nil, - func(error) {}) - defer iter.Stop() - for { - _, err = iter.Next() - if err == iterator.Done { - err = nil - break - } - if err != nil { - break - } - row <- 0 - } - }() - // Add a message and wait for the receipt. - ms.AddMsg(nil, true) - select { - case <-row: - case <-time.After(10 * time.Second): - t.Fatalf("expect stream to be established within 10 seconds, but it didn't") + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") } - // Error injection: force server to close all connections. - ms.Stop() - // Test to see if client respond to the real RPC failure correctly by - // retrying RPC. - select { - case r, ok := <-retry: - if ok && r == 1 { + + setupStatementResult(t, server, 3, false) + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") + } + + // Simulate an unavailable error to interrupt the stream of PartialResultSet + // in order to test the grpc retrying mechanism. + server.TestSpanner.AddPartialResultSetError( + "SELECT t.key key, t.value value FROM t_mock t", + PartialResultSetExecutionTime{ + ResumeToken: EncodeResumeToken(2), + Err: status.Errorf(codes.Unavailable, "server is unavailable"), + }, + ) + + // The retry is counted from the second call. + r := -1 + // Establish a stream to mock cloud spanner server. + iter := stream(context.Background(), nil, + func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { + r++ + return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, + Sql: "SELECT t.key key, t.value value FROM t_mock t", + ResumeToken: resumeToken, + }) + + }, + nil, + func(error) {}) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + err = nil break } + if err != nil { + break + } + } + if r != 1 { t.Errorf("retry count = %v, want 1", r) - case <-time.After(10 * time.Second): - t.Errorf("client library failed to respond after 10 seconds, aborting") - return } } @@ -1550,26 +1555,31 @@ func TestGrpcReconnect(t *testing.T) { func TestCancelTimeout(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) + _, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") + } done := make(chan int) - go func() { - for { - ms.AddMsg(nil, true) - } - }() + // Test cancelling query. ctx, cancel := context.WithCancel(context.Background()) - var err error go func() { // Establish a stream to mock cloud spanner server. iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) @@ -1605,6 +1615,7 @@ func TestCancelTimeout(t *testing.T) { iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) @@ -1634,31 +1645,90 @@ func TestCancelTimeout(t *testing.T) { } } +func setupStatementResult(t *testing.T, server *MockedSpannerInMemTestServer, rowCount int, sameResumeToken bool) int { + cols := []string{"key", "value"} + selectValues := make([][]string, rowCount) + for i := 0; i < rowCount; i++ { + selectValues[i] = []string{keyStr(i + 1), valStr(i + 1)} + } + + fields := make([]*sppb.StructType_Field, len(cols)) + for i, col := range cols { + fields[i] = &sppb.StructType_Field{ + Name: col, + Type: &sppb.Type{Code: sppb.TypeCode_STRING}, + } + } + rowType := &sppb.StructType{ + Fields: fields, + } + metadata := &sppb.ResultSetMetadata{ + RowType: rowType, + } + rows := make([]*structpb.ListValue, len(selectValues)) + for i, values := range selectValues { + rowValues := make([]*structpb.Value, len(cols)) + for j, value := range values { + rowValues[j] = &structpb.Value{ + Kind: &structpb.Value_StringValue{StringValue: value}, + } + } + rows[i] = &structpb.ListValue{ + Values: rowValues, + } + } + resultSet := &sppb.ResultSet{ + Metadata: metadata, + Rows: rows, + } + result := &StatementResult{Type: StatementResultResultSet, ResultSet: resultSet, SameResumeToken: sameResumeToken} + server.TestSpanner.PutStatementResult("SELECT t.key key, t.value value FROM t_mock t", result) + + prs, err := result.ToPartialResultSets([]byte{}) + if err != nil { + t.Fatalf("fail to convert ResultSet to an array of artialResultSet: %v", err) + } + bytes := 0 + for _, p := range prs { + bytes += proto.Size(p) + } + return bytes +} + func TestRowIteratorDo(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - for i := 0; i < 3; i++ { - ms.AddMsg(nil, false) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + setupStatementResult(t, server, 3, false) + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") } - ms.AddMsg(io.EOF, true) + nRows := 0 iter := stream(context.Background(), nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) }, nil, func(error) {}) - err := iter.Do(func(r *Row) error { nRows++; return nil }) + err = iter.Do(func(r *Row) error { nRows++; return nil }) if err != nil { t.Errorf("Using Do: %v", err) } @@ -1670,20 +1740,29 @@ func TestRowIteratorDo(t *testing.T) { func TestRowIteratorDoWithError(t *testing.T) { restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - for i := 0; i < 3; i++ { - ms.AddMsg(nil, false) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + setupStatementResult(t, server, 3, false) + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") } - ms.AddMsg(io.EOF, true) + iter := stream(context.Background(), nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) @@ -1691,7 +1770,7 @@ func TestRowIteratorDoWithError(t *testing.T) { nil, func(error) {}) injected := errors.New("Failed iterator") - err := iter.Do(func(r *Row) error { return injected }) + err = iter.Do(func(r *Row) error { return injected }) if err != injected { t.Errorf("got <%v>, want <%v>", err, injected) } @@ -1701,27 +1780,36 @@ func TestIteratorStopEarly(t *testing.T) { ctx := context.Background() restore := setMaxBytesBetweenResumeTokens() defer restore() - ms := NewMockCloudSpanner(t, trxTs) - ms.Serve() - defer ms.Stop() - cc := dialMock(t, ms) - defer cc.Close() - mc := sppb.NewSpannerClient(cc) - ms.AddMsg(nil, false) - ms.AddMsg(nil, false) - ms.AddMsg(io.EOF, true) + server, c, teardown := setupMockedTestServer(t) + defer teardown() + mc, err := c.sc.nextClient() + if err != nil { + t.Fatalf("failed to create a grpc client") + } + + setupStatementResult(t, server, 3, false) + + var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") + var request = &sppb.CreateSessionRequest{ + Database: formattedDatabase, + } + session, err := mc.CreateSession(context.Background(), request) + if err != nil { + t.Fatalf("failed to create a session") + } iter := stream(ctx, nil, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ + Session: session.Name, Sql: "SELECT t.key key, t.value value FROM t_mock t", ResumeToken: resumeToken, }) }, nil, func(error) {}) - _, err := iter.Next() + _, err = iter.Next() if err != nil { t.Fatalf("before Stop: %v", err) }