Skip to content

Commit

Permalink
feat(spanner): Executor framework server and worker proxy (#8714)
Browse files Browse the repository at this point in the history
* feat(spanner): add executor code

* feat(spanner): add license headers

* feat(spanner): add proto and autogenerated code

* feat(spanner): add cloud_executor file which has helper methods

* feat(spanner): make code modular

* feat(spanner): move autogenerated protos to a different PR

* feat(spanner): rename file

* feat(spanner): rename

* feat(spanner): use string.join

* feat(spanner): add file responsibility

* feat(spanner): coder refactoring

* feat(spanner): coder refactoring

* feat(spanner): update go.mod

* feat(spanner): lint fixes

* feat(spanner): lint fixes
  • Loading branch information
harshachinta authored Nov 2, 2023
1 parent e22e70f commit 6b931ee
Show file tree
Hide file tree
Showing 7 changed files with 645 additions and 1 deletion.
2 changes: 1 addition & 1 deletion spanner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/googleapis/gax-go/v2 v2.12.0
go.opencensus.io v0.24.0
golang.org/x/oauth2 v0.13.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
google.golang.org/api v0.149.0
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b
Expand All @@ -33,7 +34,6 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
48 changes: 48 additions & 0 deletions spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

// executor_proxy_server_impl.go contains the implementation of the executor proxy RPC.
// This RPC gets invoked through the gRPC stream exposed via proxy port by worker_proxy.go file.

import (
"context"

"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/inputstream"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/api/option"
)

// CloudProxyServer holds the cloud executor server.
type CloudProxyServer struct {
serverContext context.Context
options []option.ClientOption
}

// NewCloudProxyServer initializes and returns a new CloudProxyServer instance.
func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) {
return &CloudProxyServer{serverContext: ctx, options: opts}, nil
}

// ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a
// streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes.
func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error {
handler := &inputstream.CloudStreamHandler{
Stream: inputStream,
ServerContext: s.serverContext,
Options: s.options,
}
return handler.Execute()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inputstream

// input_stream_handler.go is responsible for handling input requests to the server and
// handles mapping from executor actions (SpannerAsyncActionRequest) to client library code.

import (
"context"
"sync"

executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/api/option"
)

// CloudStreamHandler handles a single streaming ExecuteActions request by performing incoming
// actions. It maintains a state associated with the request, such as current transaction.
//
// CloudStreamHandler uses contexts (context.Context) to coordinate execution of asynchronous
// actions. The Stubby stream's context becomes a parent for all individual actions' contexts. This
// is done so that we don't leak anything when the stream is closed.
//
// startTxnHandler is a bit different from other actions. Read-write transactions that it
// starts outlive the action itself, so the Stubby stream's context is used for transactions
// instead of the action's context.
//
// For more info about contexts in Go, read golang.org/pkg/context
type CloudStreamHandler struct {
// members below should be set by the caller
Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer
ServerContext context.Context
Options []option.ClientOption
// members below represent internal state
mu sync.Mutex // protects mutable internal state
}

// Execute executes the given ExecuteActions request, blocking until it's done. It takes care of
// properly closing the request stream in the end.
func (h *CloudStreamHandler) Execute() error {
return nil
}
213 changes: 213 additions & 0 deletions spanner/test/cloudexecutor/executor/internal/outputstream/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package outputstream

import (
"log"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// if OutcomeSender.rowCount exceed maxRowsPerBatch value, we should send rows back to the client in batch.
const maxRowsPerBatch = 100

// OutcomeSender is a utility class used for sending action outcomes back to the client. For read
// actions, it buffers rows and sends partial read results in batches.
type OutcomeSender struct {
actionID int32
stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer

// partialOutcome accumulates rows and other relevant information
partialOutcome *executorpb.SpannerActionOutcome
readResult *executorpb.ReadResult
queryResult *executorpb.QueryResult

// All the relevant variables below should be set before first outcome is sent back,
// and unused variables should leave null.
timestamp *timestamppb.Timestamp
hasReadResult bool
hasQueryResult bool
hasChangeStreamRecords bool
table string // name of the table being read
index *string // name of the secondary index used for read
requestIndex *int32 // request index (for multireads)
rowType *spannerpb.StructType

// Current row count in read/query result
rowCount int64
// modified row count in dml result
rowsModified []int64
}

// NewOutcomeSender returns an OutcomeSender with default fields set.
func NewOutcomeSender(actionID int32, stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) *OutcomeSender {
return &OutcomeSender{
actionID: actionID,
stream: stream,
hasReadResult: false,
hasQueryResult: false,
}
}

// SetTimestamp sets the timestamp for commit.
func (s *OutcomeSender) SetTimestamp(timestamp *timestamppb.Timestamp) {
s.timestamp = timestamp
}

// SetRowType sets the rowType for appending row.
func (s *OutcomeSender) SetRowType(rowType *spannerpb.StructType) {
s.rowType = rowType
}

// InitForRead init the sender for read action, then set the table and index if there exists.
func (s *OutcomeSender) InitForRead(table string, index *string) {
s.hasReadResult = true
s.table = table
if index != nil {
s.index = index
}
}

// InitForQuery init the sender for query action
func (s *OutcomeSender) InitForQuery() {
s.hasQueryResult = true
}

// InitForBatchRead init the sender for batch read action, then set the table and index if there exists.
func (s *OutcomeSender) InitForBatchRead(table string, index *string) {
s.InitForRead(table, index)
// Cloud API supports only simple batch reads (not multi reads), so request index is always 0.
requestIndex := int32(0)
s.requestIndex = &requestIndex
}

// AppendDmlRowsModified add rows modified in dml to result
func (s *OutcomeSender) AppendDmlRowsModified(rowsModified int64) {
s.rowsModified = append(s.rowsModified, rowsModified)
}

// FinishSuccessfully sends the last outcome with OK status.
func (s *OutcomeSender) FinishSuccessfully() error {
s.buildOutcome()
s.partialOutcome.Status = &spb.Status{Code: int32(codes.OK)}
return s.flush()
}

// FinishWithTransactionRestarted sends the last outcome with aborted error,
// this will set the TransactionRestarted to true
func (s *OutcomeSender) FinishWithTransactionRestarted() error {
s.buildOutcome()
transactionRestarted := true
s.partialOutcome.TransactionRestarted = &transactionRestarted
s.partialOutcome.Status = &spb.Status{Code: int32(codes.OK)}
return s.flush()
}

// FinishWithError sends the last outcome with given error status.
func (s *OutcomeSender) FinishWithError(err error) error {
s.buildOutcome()
//TODO(harsha:oct10) uncomment below line and comment s.partialOutcome.Status = errToStatus(err)
//s.partialOutcome.Status = &status.Status{Code: int32(gstatus.Code(err)), Message: err.Error()}
s.partialOutcome.Status = utility.ErrToStatus(err)
return s.flush()
}

// AppendRow adds another row to buffer. If buffer hits its size limit, the buffered rows will be sent back.
func (s *OutcomeSender) AppendRow(row *executorpb.ValueList) error {
if !s.hasReadResult && !s.hasQueryResult {
return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "either hasReadResult or hasQueryResult should be true"))
}
if s.rowType == nil {
return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "rowType should be set first"))
}
s.buildOutcome()
if s.hasReadResult {
s.readResult.Row = append(s.readResult.Row, row)
s.rowCount++
} else if s.hasQueryResult {
s.queryResult.Row = append(s.queryResult.Row, row)
s.rowCount++
}
if s.rowCount >= maxRowsPerBatch {
return s.flush()
}
return nil
}

// buildOutcome will build the partialOutcome if not exists using relevant variables.
func (s *OutcomeSender) buildOutcome() {
if s.partialOutcome != nil {
return
}
s.partialOutcome = &executorpb.SpannerActionOutcome{
CommitTime: s.timestamp,
}
if s.hasReadResult {
s.readResult = &executorpb.ReadResult{
Table: s.table,
Index: s.index,
RowType: s.rowType,
RequestIndex: s.requestIndex,
}
} else if s.hasQueryResult {
s.queryResult = &executorpb.QueryResult{
RowType: s.rowType,
}
}
}

// flush sends partialOutcome to stream and clear the internal state
func (s *OutcomeSender) flush() error {
if s == nil || s.partialOutcome == nil {
log.Println("outcomeSender.flush() is called when there is no partial outcome to send. This is an internal error that should never happen")
return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "either outcome sender or partial outcome is nil"))
}
s.partialOutcome.DmlRowsModified = s.rowsModified
if s.hasReadResult {
s.partialOutcome.ReadResult = s.readResult
} else if s.hasQueryResult {
s.partialOutcome.QueryResult = s.queryResult
}
err := s.SendOutcome(s.partialOutcome)
s.partialOutcome = nil
s.readResult = nil
s.queryResult = nil
s.rowCount = 0
s.rowsModified = []int64{}
return err
}

// SendOutcome sends the given SpannerActionOutcome.
func (s *OutcomeSender) SendOutcome(outcome *executorpb.SpannerActionOutcome) error {
log.Printf("sending result %v actionId %d", outcome, s.actionID)
resp := &executorpb.SpannerAsyncActionResponse{
ActionId: s.actionID,
Outcome: outcome,
}
err := s.stream.Send(resp)
if err != nil {
log.Printf("Failed to send outcome with error: %s", err.Error())
} else {
log.Printf("Sent result %v actionId %d", outcome, s.actionID)
}
return err
}
Loading

0 comments on commit 6b931ee

Please sign in to comment.