From ca7667194007394bdcade8058fa84c1fe19c06b1 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Tue, 7 Nov 2023 14:27:33 +0530 Subject: [PATCH] feat(spanner): Add DML, DQL, Mutation, Txn Actions and Utility methods for executor framework (#8976) * feat(spanner): add code for executor framework * feat(spanner): lint fix * feat(spanner): check test flakiness * feat(spanner): revert - check test flakiness * feat(spanner): update context * feat(spanner): add start and finish transaction actions * feat(spanner): remove the fixed TODO * feat(spanner): add input stream handling * feat(spanner): code fixes --- .../cloudexecutor/executor/actions/dml.go | 64 ++++ .../cloudexecutor/executor/actions/dql.go | 166 +++++++++ .../actions/execution_flow_context.go | 71 ++++ .../executor/actions/mutation.go | 176 ++++++++++ .../executor/actions/partition.go | 2 +- .../executor/actions/transaction.go | 204 +++++++++++ .../executor/internal/inputstream/handler.go | 217 +++++++++++- .../executor/internal/outputstream/handler.go | 2 - .../utility/executor_to_spanner_conversion.go | 31 -- .../executor_to_spanner_value_converter.go | 330 ++++++++++++++++++ .../executor/internal/utility/key.go | 162 +++++++++ .../spanner_to_executor_value_converter.go | 311 +++++++++++++++++ 12 files changed, 1690 insertions(+), 46 deletions(-) create mode 100644 spanner/test/cloudexecutor/executor/actions/dml.go create mode 100644 spanner/test/cloudexecutor/executor/actions/mutation.go create mode 100644 spanner/test/cloudexecutor/executor/actions/transaction.go delete mode 100644 spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go create mode 100644 spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_value_converter.go create mode 100644 spanner/test/cloudexecutor/executor/internal/utility/key.go create mode 100644 spanner/test/cloudexecutor/executor/internal/utility/spanner_to_executor_value_converter.go diff --git a/spanner/test/cloudexecutor/executor/actions/dml.go b/spanner/test/cloudexecutor/executor/actions/dml.go new file mode 100644 index 000000000000..f712b5670a5f --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/dml.go @@ -0,0 +1,64 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "log" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// DmlActionHandler holds the necessary components required for DML action. +type DmlActionHandler struct { + Action *executorpb.DmlAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes a DML update action request, store the results in the outputstream.OutcomeSender. +func (h *DmlActionHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + log.Printf("Executing dml update %s\n %v\n", h.FlowContext.transactionSeed, h.Action) + stmt, err := utility.BuildQuery(h.Action.GetUpdate()) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + var iter *spanner.RowIterator + txn, err := h.FlowContext.getTransactionForWrite() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForQuery() + iter = txn.Query(ctx, stmt) + defer iter.Stop() + log.Printf("Parsing DML result %s\n", h.FlowContext.transactionSeed) + err = processResults(iter, 0, h.OutcomeSender, h.FlowContext) + if err != nil { + if status.Code(err) == codes.Aborted { + return h.OutcomeSender.FinishWithTransactionRestarted() + } + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.AppendDmlRowsModified(iter.RowCount) + return h.OutcomeSender.FinishSuccessfully() +} diff --git a/spanner/test/cloudexecutor/executor/actions/dql.go b/spanner/test/cloudexecutor/executor/actions/dql.go index 165a636245ee..021032d78a5c 100644 --- a/spanner/test/cloudexecutor/executor/actions/dql.go +++ b/spanner/test/cloudexecutor/executor/actions/dql.go @@ -15,13 +15,179 @@ package actions import ( + "context" + "fmt" + "log" + "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// ReadActionHandler holds the necessary components required for executorpb.ReadAction. +type ReadActionHandler struct { + Action *executorpb.ReadAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes a read action request, store the results in the OutcomeSender. +func (h *ReadActionHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + log.Printf("Executing read %s:\n %v", h.FlowContext.transactionSeed, h.Action) + action := h.Action + var err error + + var typeList []*spannerpb.Type + if action.Index != nil { + typeList, err = extractTypes(action.GetTable(), action.GetColumn(), h.FlowContext.tableMetadata) + if err != nil { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err))) + } + } else { + typeList, err = h.FlowContext.tableMetadata.GetKeyColumnTypes(action.GetTable()) + if err != nil { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err))) + } + } + + keySet, err := utility.KeySetProtoToCloudKeySet(action.GetKeys(), typeList) + if err != nil { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't convert rowSet: %s", err))) + } + + var iter *spanner.RowIterator + if h.FlowContext.currentActiveTransaction == None { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction")) + } else if h.FlowContext.currentActiveTransaction == Batch { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction")) + } else if h.FlowContext.currentActiveTransaction == Read { + txn, err := h.FlowContext.getTransactionForRead() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForRead(action.GetTable(), action.Index) + h.FlowContext.numPendingReads++ + if action.Index != nil { + iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn()) + } else { + iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn()) + } + } else if h.FlowContext.currentActiveTransaction == ReadWrite { + txn, err := h.FlowContext.getTransactionForWrite() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForRead(action.GetTable(), action.Index) + h.FlowContext.numPendingReads++ + if action.Index != nil { + iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn()) + } else { + iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn()) + } + } + defer iter.Stop() + log.Printf("Parsing read result %s\n", h.FlowContext.transactionSeed) + err = processResults(iter, int64(action.GetLimit()), h.OutcomeSender, h.FlowContext) + if err != nil { + h.FlowContext.finishRead(status.Code(err)) + if status.Code(err) == codes.Aborted { + return h.OutcomeSender.FinishWithTransactionRestarted() + } + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.finishRead(codes.OK) + return h.OutcomeSender.FinishSuccessfully() +} + +// QueryActionHandler holds the necessary components required for executorpb.QueryAction. +type QueryActionHandler struct { + Action *executorpb.QueryAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes a query action request, store the results in the OutcomeSender. +func (h *QueryActionHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + log.Printf("Executing query %s\n %v\n", h.FlowContext.transactionSeed, h.Action) + stmt, err := utility.BuildQuery(h.Action) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + var iter *spanner.RowIterator + if h.FlowContext.currentActiveTransaction == None { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction")) + } else if h.FlowContext.currentActiveTransaction == Batch { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction")) + } else if h.FlowContext.currentActiveTransaction == Read { + txn, err := h.FlowContext.getTransactionForRead() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForQuery() + h.FlowContext.numPendingReads++ + iter = txn.Query(ctx, stmt) + } else if h.FlowContext.currentActiveTransaction == ReadWrite { + txn, err := h.FlowContext.getTransactionForWrite() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForQuery() + h.FlowContext.numPendingReads++ + iter = txn.Query(ctx, stmt) + } + defer iter.Stop() + log.Printf("Parsing query result %s\n", h.FlowContext.transactionSeed) + err = processResults(iter, 0, h.OutcomeSender, h.FlowContext) + if err != nil { + h.FlowContext.finishRead(status.Code(err)) + if status.Code(err) == codes.Aborted { + return h.OutcomeSender.FinishWithTransactionRestarted() + } + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.finishRead(codes.OK) + return h.OutcomeSender.FinishSuccessfully() +} + +// processResults processes a ResultSet from a read/query/dml and store the results in the OutcomeSender. func processResults(iter *spanner.RowIterator, limit int64, outcomeSender *outputstream.OutcomeSender, flowContext *ExecutionFlowContext) error { + var rowCount int64 = 0 + log.Printf("Iterating result set: %s\n", flowContext.transactionSeed) + for { + row, err := iter.Next() + if err == iterator.Done { + return nil + } + if err != nil { + return err + } + spannerRow, rowType, err := utility.ConvertSpannerRow(row) + if err != nil { + return err + } + outcomeSender.SetRowType(rowType) + // outcomeSender.rowType = rowType + err = outcomeSender.AppendRow(spannerRow) + if err != nil { + return err + } + rowCount++ + if limit > 0 && rowCount >= limit { + log.Printf("Stopping at row limit: %d", limit) + break + } + } + log.Printf("Successfully processed result: %s\n", flowContext.transactionSeed) return nil } diff --git a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go index 99c5e2175203..cbd85bdf14f0 100644 --- a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go +++ b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go @@ -17,10 +17,13 @@ package actions import ( "context" "log" + "strings" "sync" + "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -122,3 +125,71 @@ func (c *ExecutionFlowContext) initReadState() { c.readAborted = false c.numPendingReads = 0 } + +// clear clears the transaction related variables. +func (c *ExecutionFlowContext) clear() { + c.roTxn = nil + c.rwTxn = nil + c.tableMetadata = nil +} + +// finish attempts to finish the transaction by either committing it or exiting without committing. +// In order to follow the ExecuteActions protocol, we must distinguish Spanner-generated errors +// (e.g. RowNotFound) and internal errors (e.g. a precondition is not met). Errors returned from +// Spanner populate the status of SpannerActionOutcome. Internal errors, in contrast, break the +// stubby call. For this reason, finish() has two return values dedicated to errors. If any of +// these errors is not nil, other return values are undefined. +// Return values in order: +// 1. Whether or not the transaction is restarted. It will be true if commit has been attempted, +// but Spanner returned aborted and restarted instead. When that happens, all reads and writes +// should be replayed, followed by another commit attempt. +// 2. Commit timestamp. It's returned only if commit has succeeded. +// 3. Spanner error -- an error that Spanner client returned. +// 4. Internal error. +func (c *ExecutionFlowContext) finish(ctx context.Context, txnFinishMode executorpb.FinishTransactionAction_Mode) (bool, *time.Time, error, error) { + if txnFinishMode == executorpb.FinishTransactionAction_COMMIT { + var err error + ts, err := c.rwTxn.Commit(ctx) + if err != nil { + log.Printf("Transaction finished with error %v", err) + if status.Code(err) == codes.Aborted { + log.Println("Transaction Aborted. Sending status to outcome sender to restart the transaction.") + return true, nil, nil, nil + } + // Filter expected errors + if status.Code(err) == codes.Unknown && strings.Contains(err.Error(), "Transaction outcome unknown") { + return false, nil, spanner.ToSpannerError(status.Error(codes.DeadlineExceeded, "Transaction outcome unknown")), nil + } + // TODO(harsha): check if this is an internal or spanner error + return false, nil, err, nil + } + return false, &ts, nil, nil + } else if txnFinishMode == executorpb.FinishTransactionAction_ABANDON { + log.Printf("Transaction Abandoned") + c.rwTxn.Rollback(ctx) + return false, nil, nil, nil + } else { + return false, nil, nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported finish mode %s", txnFinishMode.String())) + } +} + +// CloseOpenTransactions cleans up all the active transactions if the stubby call is closing. +func (c *ExecutionFlowContext) CloseOpenTransactions() { + c.mu.Lock() + defer c.mu.Unlock() + if c.roTxn != nil { + log.Println("A read only transaction was active when stubby call closed.") + c.roTxn.Close() + } + if c.rwTxn != nil { + log.Println("Abandon a read-write transaction that was active when stubby call closed.") + _, _, _, err := c.finish(context.Background(), executorpb.FinishTransactionAction_ABANDON) + if err != nil { + log.Printf("Failed to abandon a read-write transaction: %v\n", err) + } + } + if c.batchTxn != nil { + log.Println("A batch transaction was active when stubby call closed.") + c.batchTxn.Close() + } +} diff --git a/spanner/test/cloudexecutor/executor/actions/mutation.go b/spanner/test/cloudexecutor/executor/actions/mutation.go new file mode 100644 index 000000000000..c282ccbfcd18 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/mutation.go @@ -0,0 +1,176 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "fmt" + "log" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// WriteActionHandler holds the necessary components required for Write action. +type WriteActionHandler struct { + Action *executorpb.MutationAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction that execute a Write action request. +func (h *WriteActionHandler) ExecuteAction(ctx context.Context) error { + log.Printf("executing write action: %v", h.Action) + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + m, err := createMutation(h.Action, h.FlowContext.tableMetadata) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + _, err = h.FlowContext.dbClient.Apply(ctx, m) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + return h.OutcomeSender.FinishSuccessfully() +} + +// MutationActionHandler holds the necessary components required for Mutation action. +type MutationActionHandler struct { + Action *executorpb.MutationAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction that execute a Mutation action request. +func (h *MutationActionHandler) ExecuteAction(ctx context.Context) error { + log.Printf("Buffering mutation %v", h.Action) + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + txn, err := h.FlowContext.getTransactionForWrite() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + m, err := createMutation(h.Action, h.FlowContext.tableMetadata) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + err = txn.BufferWrite(m) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + return h.OutcomeSender.FinishSuccessfully() +} + +// createMutation creates cloud spanner.Mutation from given executorpb.MutationAction. +func createMutation(action *executorpb.MutationAction, tableMetadata *utility.TableMetadataHelper) ([]*spanner.Mutation, error) { + prevTable := "" + var m []*spanner.Mutation + for _, mod := range action.Mod { + table := mod.GetTable() + if table == "" { + table = prevTable + } + if table == "" { + return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, fmt.Sprintf("table name is missing from mod: action %s ", action.String()))) + } + prevTable = table + log.Printf("executing mutation mod: \n%s", mod.String()) + + switch { + case mod.Insert != nil: + ia := mod.Insert + cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType()) + if err != nil { + return nil, err + } + for _, cloudRow := range cloudRows { + m = append(m, spanner.Insert(table, ia.GetColumn(), cloudRow)) + } + case mod.Update != nil: + ua := mod.Update + cloudRows, err := cloudValuesFromExecutorValueLists(ua.GetValues(), ua.GetType()) + if err != nil { + return nil, err + } + for _, cloudRow := range cloudRows { + m = append(m, spanner.Update(table, ua.GetColumn(), cloudRow)) + } + case mod.InsertOrUpdate != nil: + ia := mod.InsertOrUpdate + cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType()) + if err != nil { + return nil, err + } + for _, cloudRow := range cloudRows { + m = append(m, spanner.InsertOrUpdate(table, ia.GetColumn(), cloudRow)) + } + case mod.Replace != nil: + ia := mod.Replace + cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType()) + if err != nil { + return nil, err + } + for _, cloudRow := range cloudRows { + m = append(m, spanner.Replace(table, ia.GetColumn(), cloudRow)) + } + case mod.DeleteKeys != nil: + keyColTypes, err := tableMetadata.GetKeyColumnTypes(table) + if err != nil { + return nil, err + } + keySet, err := utility.KeySetProtoToCloudKeySet(mod.DeleteKeys, keyColTypes) + m = append(m, spanner.Delete(table, keySet)) + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "unsupported mod: %s", mod.String())) + } + } + return m, nil +} + +// cloudValuesFromExecutorValueLists produces rows of Cloud Spanner values given []*executorpb.ValueList and []*spannerpb.Type. +// Each ValueList results in a row, and all of them should have the same column types. +func cloudValuesFromExecutorValueLists(valueLists []*executorpb.ValueList, types []*spannerpb.Type) ([][]any, error) { + var cloudRows [][]any + for _, rowValues := range valueLists { + log.Printf("Converting ValueList: %s\n", rowValues) + if len(rowValues.GetValue()) != len(types) { + return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "number of values should be equal to number of types")) + } + + var cloudRow []any + for i, v := range rowValues.GetValue() { + isNull := false + switch v.GetValueType().(type) { + case *executorpb.Value_IsNull: + isNull = true + } + val, err := utility.ExecutorValueToSpannerValue(types[i], v, isNull) + if err != nil { + return nil, err + } + cloudRow = append(cloudRow, val) + } + cloudRows = append(cloudRows, cloudRow) + } + return cloudRows, nil +} diff --git a/spanner/test/cloudexecutor/executor/actions/partition.go b/spanner/test/cloudexecutor/executor/actions/partition.go index 74fee6b5be6b..b7075eb58454 100644 --- a/spanner/test/cloudexecutor/executor/actions/partition.go +++ b/spanner/test/cloudexecutor/executor/actions/partition.go @@ -209,7 +209,7 @@ func (h *PartitionedUpdate) ExecuteAction(ctx context.Context) error { opts := h.Action.GetOptions() stmt := spanner.Statement{SQL: h.Action.GetUpdate().GetSql()} - count, err := h.FlowContext.dbClient.PartitionedUpdateWithOptions(h.FlowContext.TxnContext, stmt, spanner.QueryOptions{ + count, err := h.FlowContext.dbClient.PartitionedUpdateWithOptions(ctx, stmt, spanner.QueryOptions{ Priority: opts.GetRpcPriority(), RequestTag: opts.GetTag(), }) diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go new file mode 100644 index 000000000000..bccccebe2d22 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -0,0 +1,204 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "log" + "time" + + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/option" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// StartTxnHandler holds the necessary components and options required for start transaction action. +type StartTxnHandler struct { + Action *executorpb.StartTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption +} + +// ExecuteAction that starts a read-write or read-only transaction. +func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + if h.FlowContext.Database == "" { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) + } + if h.Action.GetTransactionSeed() != "" { + h.FlowContext.transactionSeed = h.Action.GetTransactionSeed() + } + metadata := &utility.TableMetadataHelper{} + metadata.InitFrom(h.Action) + h.FlowContext.tableMetadata = metadata + + // TODO(harsha) where do I close the client? defer client.Close() + client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.dbClient = client + if h.FlowContext.isTransactionActiveLocked() { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "already in a transaction"))) + } + if h.Action.Concurrency != nil { + log.Printf("starting read-only transaction %s:\n %v", h.FlowContext.transactionSeed, h.Action) + timestampBound, err := timestampBoundsFromConcurrency(h.Action.GetConcurrency()) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + // Start a read-only transaction. + var txn *spanner.ReadOnlyTransaction + singleUseReadOnlyTransactionNeeded := isSingleUseReadOnlyTransactionNeeded(h.Action.GetConcurrency()) + if singleUseReadOnlyTransactionNeeded { + txn = client.Single().WithTimestampBound(timestampBound) + } else { + txn = client.ReadOnlyTransaction().WithTimestampBound(timestampBound) + } + h.FlowContext.roTxn = txn + h.FlowContext.currentActiveTransaction = Read + } else { + log.Printf("starting read-write transaction %s:\n %v", h.FlowContext.transactionSeed, h.Action) + // Start a read-write transaction. + var txn *spanner.ReadWriteStmtBasedTransaction + if h.Action.GetExecutionOptions().GetOptimistic() { + txn, err = spanner.NewReadWriteStmtBasedTransactionWithOptions(ctx, client, spanner.TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC}) + } else { + txn, err = spanner.NewReadWriteStmtBasedTransaction(ctx, client) + } + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.rwTxn = txn + h.FlowContext.currentActiveTransaction = ReadWrite + } + h.FlowContext.initReadState() + return h.OutcomeSender.FinishSuccessfully() +} + +// FinishTxnHandler holds the necessary components and options required for finish transaction action. +type FinishTxnHandler struct { + Action *executorpb.FinishTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction that finish a transaction. For read-write transaction, either commit or abandon the transaction is allowed. +// Batch transaction is not supported here. +func (h *FinishTxnHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + log.Printf("Finishing transaction %s\n %v", h.FlowContext.transactionSeed, h.Action) + + if h.FlowContext.numPendingReads > 0 { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "Reads pending when trying to finish"))) + } + outcome := &executorpb.SpannerActionOutcome{Status: &spb.Status{Code: int32(codes.OK)}} + + // Finish active transaction in given finishMode, then send outcome back to client. + if h.FlowContext.roTxn != nil { + // Finish a read-only transaction. Note that timestamp may not be available + // if there were no reads or queries. + ts, err := h.FlowContext.roTxn.Timestamp() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + outcome.CommitTime = timestamppb.New(ts) + + h.FlowContext.roTxn.Close() + h.FlowContext.clear() + return h.OutcomeSender.SendOutcome(outcome) + } + + if h.FlowContext.rwTxn != nil { + // Finish a read-write transaction. + txnFinishMode := h.Action.GetMode() + restarted, ts, spanErr, internalErr := h.FlowContext.finish(ctx, txnFinishMode) + if internalErr != nil { + log.Printf("Unexpected error when finishing read-write transaction: %s", internalErr.Error()) + return h.OutcomeSender.FinishWithError(internalErr) + } else if spanErr != nil { + outcome.Status = utility.ErrToStatus(spanErr) + h.FlowContext.clear() + } else if restarted { + log.Println("Transaction restarted") + transactionRestarted := true + outcome.TransactionRestarted = &transactionRestarted + } else { + if ts != nil { + outcome.CommitTime = timestamppb.New(*ts) + } + h.FlowContext.clear() + } + return h.OutcomeSender.SendOutcome(outcome) + } + + if h.FlowContext.batchTxn != nil { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "Can't commit/abort a batch transaction"))) + } + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "no currently active transaction"))) +} + +// isSingleUseReadOnlyTransactionNeeded decides type of read-only transaction based on concurrency. +func isSingleUseReadOnlyTransactionNeeded(c *executorpb.Concurrency) bool { + switch c.GetConcurrencyMode().(type) { + case *executorpb.Concurrency_MinReadTimestampMicros: + return true + case *executorpb.Concurrency_MaxStalenessSeconds: + return true + default: + return false + } +} + +// timestampFromMicros converts micros to time.Time +func timestampFromMicros(micros int64) time.Time { + seconds := micros / 1000000 + nanos := (micros % 1000000) * 1000 + return time.Unix(seconds, nanos) +} + +// timestampBoundsFromConcurrency converts executorpb.Concurrency to spanner.TimestampBound. +func timestampBoundsFromConcurrency(c *executorpb.Concurrency) (spanner.TimestampBound, error) { + switch c.GetConcurrencyMode().(type) { + case *executorpb.Concurrency_StalenessSeconds: + secs := c.GetStalenessSeconds() + dur := time.Duration(secs) * time.Second + return spanner.ExactStaleness(dur), nil + case *executorpb.Concurrency_MinReadTimestampMicros: + return spanner.MinReadTimestamp(timestampFromMicros(c.GetMinReadTimestampMicros())), nil + case *executorpb.Concurrency_MaxStalenessSeconds: + secs := c.GetMaxStalenessSeconds() + dur := time.Duration(secs) * time.Second + return spanner.MaxStaleness(dur), nil + case *executorpb.Concurrency_ExactTimestampMicros: + return spanner.ReadTimestamp(timestampFromMicros(c.GetExactTimestampMicros())), nil + case *executorpb.Concurrency_Strong: + return spanner.StrongRead(), nil + case *executorpb.Concurrency_Batch: + return spanner.TimestampBound{}, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "batch mode should not be in snapshot transaction")) + default: + return spanner.StrongRead(), spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "unsupported concurrency mode %s", c.String())) + } +} diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index fd639d193c4a..f5b819e49392 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -19,35 +19,228 @@ package inputstream import ( "context" + "fmt" + "io" + "log" "sync" + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" "google.golang.org/api/option" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -// CloudStreamHandler handles a single streaming ExecuteActions request by performing incoming +// CloudStreamHandler handles a streaming ExecuteActions request by performing incoming // actions. It maintains a state associated with the request, such as current transaction. -// -// CloudStreamHandler uses contexts (context.Context) to coordinate execution of asynchronous -// actions. The Stubby stream's context becomes a parent for all individual actions' contexts. This -// is done so that we don't leak anything when the stream is closed. -// -// startTxnHandler is a bit different from other actions. Read-write transactions that it -// starts outlive the action itself, so the Stubby stream's context is used for transactions -// instead of the action's context. -// -// For more info about contexts in Go, read golang.org/pkg/context type CloudStreamHandler struct { // members below should be set by the caller Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer ServerContext context.Context Options []option.ClientOption // members below represent internal state - mu sync.Mutex // protects mutable internal state + executionFlowContext *actions.ExecutionFlowContext + mu sync.Mutex // protects mutable internal state } // Execute executes the given ExecuteActions request, blocking until it's done. It takes care of // properly closing the request stream in the end. func (h *CloudStreamHandler) Execute() error { + log.Println("ExecuteActionAsync RPC called. Start handling input stream") + + var c *actions.ExecutionFlowContext + func() { + h.mu.Lock() + defer h.mu.Unlock() + c = &actions.ExecutionFlowContext{} + h.executionFlowContext = c + }() + + // In case this function returns abruptly, or client misbehaves, make sure to dispose of + // transactions. + defer func() { + c.CloseOpenTransactions() + }() + + ctx := context.Background() + // Main loop that receives and executes actions. + for { + req, err := h.Stream.Recv() + if err == io.EOF { + log.Println("Client called Done, half-closed the stream") + break + } + if err != nil { + log.Printf("Failed to receive request from client: %v", err) + return err + } + if err = h.startHandlingRequest(ctx, req); err != nil { + log.Printf("Failed to handle request %v, Client ends the stream with error: %v", req, err) + // TODO(sriharshach): should we throw the error here instead of nil? + return nil + } + } + log.Println("Done executing actions") return nil } + +// startHandlingRequest takes care of the given request. It picks an actionHandler and starts +// a goroutine in which that action will be executed. +func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error { + log.Printf("start handling request %v", req) + h.mu.Lock() + defer h.mu.Unlock() + + outcomeSender := outputstream.NewOutcomeSender(req.GetActionId(), h.Stream) + + inputAction := req.GetAction() + if inputAction == nil { + log.Println("Invalid SpannerAsyncActionRequest, input action is nil") + return outcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "Invalid request"))) + } + + // Get a new action handler based on the input action. + actionHandler, err := h.newActionHandler(inputAction, outcomeSender) + if err != nil { + return outcomeSender.FinishWithError(err) + } + + // Create a channel to receive the error from the goroutine. + errCh := make(chan error, 1) + successCh := make(chan bool, 1) + + go func() { + err := actionHandler.ExecuteAction(ctx) + if err != nil { + log.Printf("Failed to execute action with error %v: %v", inputAction, err) + errCh <- err + } else { + successCh <- true + } + }() + + // Wait for the goroutine to finish or return an error if one occurs. + select { + case err := <-errCh: + // An error occurred in the goroutine. + log.Printf("Client ends the stream with error. Failed to execute action %v with error: %v", inputAction, err) + return err + case <-successCh: + // Success signal received. + log.Println("Action executed successfully") + return nil + } +} + +// newActionHandler instantiates an actionHandler for executing the given action. +func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) { + if action.DatabasePath != "" { + h.executionFlowContext.Database = action.DatabasePath + } + switch action.GetAction().(type) { + case *executorpb.SpannerAction_Start: + return &actions.StartTxnHandler{ + Action: action.GetStart(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, + }, nil + case *executorpb.SpannerAction_Finish: + return &actions.FinishTxnHandler{ + Action: action.GetFinish(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_Admin: + adminAction := &actions.AdminActionHandler{ + Action: action.GetAdmin(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, + } + return adminAction, nil + case *executorpb.SpannerAction_Read: + return &actions.ReadActionHandler{ + Action: action.GetRead(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_Query: + return &actions.QueryActionHandler{ + Action: action.GetQuery(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_Mutation: + return &actions.MutationActionHandler{ + Action: action.GetMutation(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_Write: + return &actions.WriteActionHandler{ + Action: action.GetWrite().GetMutation(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_Dml: + return &actions.DmlActionHandler{ + Action: action.GetDml(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_StartBatchTxn: + return &actions.StartBatchTxnHandler{ + Action: action.GetStartBatchTxn(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, + }, nil + case *executorpb.SpannerAction_GenerateDbPartitionsRead: + return &actions.PartitionReadActionHandler{ + Action: action.GetGenerateDbPartitionsRead(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_GenerateDbPartitionsQuery: + return &actions.PartitionQueryActionHandler{ + Action: action.GetGenerateDbPartitionsQuery(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_ExecutePartition: + return &actions.ExecutePartition{ + Action: action.GetExecutePartition(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_PartitionedUpdate: + return &actions.PartitionedUpdate{ + Action: action.GetPartitionedUpdate(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_CloseBatchTxn: + return &actions.CloseBatchTxnHandler{ + Action: action.GetCloseBatchTxn(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + case *executorpb.SpannerAction_BatchDml: + return &actions.BatchDmlHandler{ + Action: action.GetBatchDml(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + }, nil + default: + return nil, outcomeSender.FinishWithError(status.Error(codes.Unimplemented, fmt.Sprintf("not implemented yet %T", action.GetAction()))) + } +} + +// cloudActionHandler is an interface representing an entity responsible for executing a particular +// kind of SpannerActions. +type cloudActionHandler interface { + ExecuteAction(context.Context) error +} diff --git a/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go index 86f0f0207643..d37dd22d20f0 100644 --- a/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go @@ -125,8 +125,6 @@ func (s *OutcomeSender) FinishWithTransactionRestarted() error { // FinishWithError sends the last outcome with given error status. func (s *OutcomeSender) FinishWithError(err error) error { s.buildOutcome() - //TODO(harsha:oct10) uncomment below line and comment s.partialOutcome.Status = errToStatus(err) - //s.partialOutcome.Status = &status.Status{Code: int32(gstatus.Code(err)), Message: err.Error()} s.partialOutcome.Status = utility.ErrToStatus(err) return s.flush() } diff --git a/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go b/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go deleted file mode 100644 index 6e7748d63988..000000000000 --- a/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2023 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utility - -import ( - "cloud.google.com/go/spanner" - "cloud.google.com/go/spanner/apiv1/spannerpb" - executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" -) - -// BuildQuery constructs a spanner.Statement query and bind the params from the input executor query. -func BuildQuery(queryAction *executorpb.QueryAction) (spanner.Statement, error) { - return spanner.Statement{}, nil -} - -// KeySetProtoToCloudKeySet converts an executor KeySet to a Cloud Spanner KeySet instance. -func KeySetProtoToCloudKeySet(keySetProto *executorpb.KeySet, typeList []*spannerpb.Type) (spanner.KeySet, error) { - return spanner.AllKeys(), nil -} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_value_converter.go b/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_value_converter.go new file mode 100644 index 000000000000..31a66d7ca87c --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_value_converter.go @@ -0,0 +1,330 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "encoding/json" + "fmt" + "log" + "math/big" + "reflect" + "time" + + "cloud.google.com/go/civil" + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// BuildQuery constructs a spanner.Statement query and bind the params from the input executorpb.QueryAction. +func BuildQuery(queryAction *executorpb.QueryAction) (spanner.Statement, error) { + stmt := spanner.Statement{SQL: queryAction.GetSql(), Params: make(map[string]interface{})} + for _, param := range queryAction.GetParams() { + value, err := ExecutorValueToSpannerValue(param.GetType(), param.GetValue(), param.GetValue().GetIsNull()) + if err != nil { + return spanner.Statement{}, err + } + stmt.Params[param.GetName()] = value + } + return stmt, nil +} + +// ExecutorValueToSpannerValue converts executorpb.Value with given spannerpb.Type into a cloud spanner interface. +// Parameter null indicates whether this value is NULL. +func ExecutorValueToSpannerValue(t *spannerpb.Type, v *executorpb.Value, null bool) (any, error) { + if v.GetIsCommitTimestamp() { + return spanner.NullTime{Time: spanner.CommitTimestamp, Valid: true}, nil + } + switch t.GetCode() { + case spannerpb.TypeCode_INT64: + return spanner.NullInt64{Int64: v.GetIntValue(), Valid: !null}, nil + case spannerpb.TypeCode_FLOAT64: + return spanner.NullFloat64{Float64: v.GetDoubleValue(), Valid: !null}, nil + case spannerpb.TypeCode_STRING: + return spanner.NullString{StringVal: v.GetStringValue(), Valid: !null}, nil + case spannerpb.TypeCode_BYTES: + if null { + return []byte(nil), nil + } + out := v.GetBytesValue() + if out == nil { + out = make([]byte, 0) + } + return out, nil + case spannerpb.TypeCode_BOOL: + if null { + return spanner.NullBool{}, nil + } + return spanner.NullBool{Bool: v.GetBoolValue(), Valid: !null}, nil + case spannerpb.TypeCode_TIMESTAMP: + if null { + return spanner.NullTime{Time: time.Unix(0, 0), Valid: false}, nil + } + if v.GetIsCommitTimestamp() || v.GetBytesValue() != nil { + return spanner.NullTime{Time: spanner.CommitTimestamp, Valid: true}, nil + } + return spanner.NullTime{Time: time.Unix(v.GetTimestampValue().Seconds, int64(v.GetTimestampValue().Nanos)), Valid: true}, nil + case spannerpb.TypeCode_DATE: + epoch := civil.DateOf(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)) + y := epoch.AddDays(int(v.GetDateDaysValue())) + return spanner.NullDate{Date: y, Valid: !null}, nil + case spannerpb.TypeCode_NUMERIC: + if null { + return spanner.NullNumeric{Numeric: big.Rat{}, Valid: false}, nil + } + x := v.GetStringValue() + y, ok := (&big.Rat{}).SetString(x) + if !ok { + return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, fmt.Sprintf("unexpected string value %q for numeric number", x))) + } + return spanner.NullNumeric{Numeric: *y, Valid: true}, nil + case spannerpb.TypeCode_JSON: + if null { + return spanner.NullJSON{}, nil + } + x := v.GetStringValue() + var y interface{} + err := json.Unmarshal([]byte(x), &y) + if err != nil { + return nil, err + } + return spanner.NullJSON{Value: y, Valid: true}, nil + case spannerpb.TypeCode_STRUCT: + return executorStructValueToSpannerValue(t, v.GetStructValue(), null) + case spannerpb.TypeCode_ARRAY: + return executorArrayValueToSpannerValue(t, v, null) + default: + return nil, status.Errorf(codes.InvalidArgument, "executorValueToSpannerValue: unsupported type %s while converting from value proto.", t.GetCode().String()) + } +} + +// executorStructValueToSpannerValue converts executorpb.Value with spannerpb.Type of TypeCode_STRUCT to a dynamically +// created pointer to a Go struct value with a type derived from t. If null is set, returns a nil pointer +// of the Go struct type for NULL struct values. +func executorStructValueToSpannerValue(t *spannerpb.Type, v *executorpb.ValueList, null bool) (any, error) { + var fieldValues []*executorpb.Value + fieldTypes := t.GetStructType().GetFields() + if !null { + fieldValues = v.GetValue() + if len(fieldValues) != len(fieldTypes) { + return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "Mismatch between number of expected fields and specified values for struct type")) + } + } + + cloudFields := make([]reflect.StructField, 0, len(fieldTypes)) + cloudFieldVals := make([]any, 0, len(fieldTypes)) + + // Convert the fields to Go types and build the struct's dynamic type. + for i := 0; i < len(fieldTypes); i++ { + var techValue *executorpb.Value + var isnull bool + + if null { + isnull = true + techValue = nil + } else { + isnull = isNullTechValue(fieldValues[i]) + techValue = fieldValues[i] + } + + // Go structs do not allow empty and duplicate field names and lowercase field names + // make the field unexported. We use struct tags for specifying field names. + cloudFieldVal, err := ExecutorValueToSpannerValue(fieldTypes[i].Type, techValue, isnull) + if err != nil { + return nil, err + } + if cloudFieldVal == nil { + return nil, status.Errorf(codes.Internal, "Was not able to calculate the type for %s", fieldTypes[i].Type) + } + + cloudFields = append(cloudFields, + reflect.StructField{ + Name: fmt.Sprintf("Field_%d", i), + Type: reflect.TypeOf(cloudFieldVal), + Tag: reflect.StructTag(fmt.Sprintf(`spanner:"%s"`, fieldTypes[i].Name)), + }) + cloudFieldVals = append(cloudFieldVals, cloudFieldVal) + } + + cloudStructType := reflect.StructOf(cloudFields) + if null { + // Return a nil pointer to Go struct with the built struct type. + return reflect.Zero(reflect.PtrTo(cloudStructType)).Interface(), nil + } + // For a non-null struct, set the field values. + cloudStruct := reflect.New(cloudStructType) + for i, fieldVal := range cloudFieldVals { + cloudStruct.Elem().Field(i).Set(reflect.ValueOf(fieldVal)) + } + // Returns a pointer to the Go struct. + return cloudStruct.Interface(), nil +} + +// isNullTechValue returns whether an executorpb.Value is Value_IsNull or not. +func isNullTechValue(tv *executorpb.Value) bool { + switch tv.GetValueType().(type) { + case *executorpb.Value_IsNull: + return true + default: + return false + } +} + +// executorArrayValueToSpannerValue converts executorpb.Value with spannerpb.Type of TypeCode_ARRAY into a cloud Spanner's interface. +func executorArrayValueToSpannerValue(t *spannerpb.Type, v *executorpb.Value, null bool) (any, error) { + switch t.GetArrayElementType().GetCode() { + case spannerpb.TypeCode_INT64: + if null { + return ([]spanner.NullInt64)(nil), nil + } + out := make([]spanner.NullInt64, 0) + for _, value := range v.GetArrayValue().GetValue() { + out = append(out, spanner.NullInt64{value.GetIntValue(), !value.GetIsNull()}) + } + return out, nil + case spannerpb.TypeCode_FLOAT64: + if null { + return ([]spanner.NullFloat64)(nil), nil + } + out := make([]spanner.NullFloat64, 0) + for _, value := range v.GetArrayValue().GetValue() { + out = append(out, spanner.NullFloat64{value.GetDoubleValue(), !value.GetIsNull()}) + } + return out, nil + case spannerpb.TypeCode_STRING: + if null { + return ([]spanner.NullString)(nil), nil + } + out := make([]spanner.NullString, 0) + for _, value := range v.GetArrayValue().GetValue() { + out = append(out, spanner.NullString{value.GetStringValue(), !value.GetIsNull()}) + } + return out, nil + case spannerpb.TypeCode_BYTES: + if null { + return ([][]byte)(nil), nil + } + out := make([][]byte, 0) + for _, value := range v.GetArrayValue().GetValue() { + if !value.GetIsNull() { + out = append(out, value.GetBytesValue()) + } + } + return out, nil + case spannerpb.TypeCode_BOOL: + if null { + return ([]spanner.NullBool)(nil), nil + } + out := make([]spanner.NullBool, 0) + for _, value := range v.GetArrayValue().GetValue() { + out = append(out, spanner.NullBool{Bool: value.GetBoolValue(), Valid: !value.GetIsNull()}) + } + return out, nil + case spannerpb.TypeCode_TIMESTAMP: + if null { + return ([]spanner.NullTime)(nil), nil + } + out := make([]spanner.NullTime, 0) + for _, value := range v.GetArrayValue().GetValue() { + spannerValue, err := ExecutorValueToSpannerValue(t.GetArrayElementType(), value, value.GetIsNull()) + if err != nil { + return nil, err + } + if v, ok := spannerValue.(spanner.NullTime); ok { + out = append(out, v) + } + } + return out, nil + case spannerpb.TypeCode_DATE: + if null { + return ([]spanner.NullDate)(nil), nil + } + out := make([]spanner.NullDate, 0) + for _, value := range v.GetArrayValue().GetValue() { + spannerValue, err := ExecutorValueToSpannerValue(t.GetArrayElementType(), value, value.GetIsNull()) + if err != nil { + return nil, err + } + if v, ok := spannerValue.(spanner.NullDate); ok { + out = append(out, v) + } + } + return out, nil + case spannerpb.TypeCode_NUMERIC: + if null { + return ([]spanner.NullNumeric)(nil), nil + } + out := make([]spanner.NullNumeric, 0) + for _, value := range v.GetArrayValue().GetValue() { + if value.GetIsNull() { + out = append(out, spanner.NullNumeric{Numeric: big.Rat{}, Valid: false}) + } else { + y, ok := (&big.Rat{}).SetString(value.GetStringValue()) + if !ok { + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "unexpected string value %q for numeric number", value.GetStringValue())) + } + out = append(out, spanner.NullNumeric{*y, true}) + } + } + return out, nil + case spannerpb.TypeCode_STRUCT: + if null { + log.Println("Failing again due to passing untyped nil value for array of structs. Might need to change to typed nil similar to other types") + } + // Non-NULL array of structs + structElemType := t.GetArrayElementType() + in := v.GetArrayValue() + + // Create a dummy struct value to get the element type. + dummyStructPtr, err := executorStructValueToSpannerValue(structElemType, nil, true) + if err != nil { + return nil, err + } + goStructType := reflect.TypeOf(dummyStructPtr) + + out := reflect.MakeSlice(reflect.SliceOf(goStructType), 0, len(in.GetValue())) + for _, value := range in.GetValue() { + cv, err := executorStructValueToSpannerValue(structElemType, value.GetStructValue(), false) + if err != nil { + return nil, err + } + et := reflect.TypeOf(cv) + if !reflect.DeepEqual(et, goStructType) { + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Mismatch between computed struct array element type %v and received element type %v", goStructType, et)) + } + out = reflect.Append(out, reflect.ValueOf(cv)) + } + return out.Interface(), nil + case spannerpb.TypeCode_JSON: + if null { + return ([]spanner.NullJSON)(nil), nil + } + out := make([]spanner.NullJSON, 0) + for _, value := range v.GetArrayValue().GetValue() { + spannerValue, err := ExecutorValueToSpannerValue(t.GetArrayElementType(), value, value.GetIsNull()) + if err != nil { + return nil, err + } + if v, ok := spannerValue.(spanner.NullJSON); ok { + out = append(out, v) + } + } + return out, nil + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "executorArrayValueToSpannerValue: unsupported array element type while converting from executor proto of type: %s", t.GetArrayElementType().GetCode().String())) + } +} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/key.go b/spanner/test/cloudexecutor/executor/internal/utility/key.go new file mode 100644 index 000000000000..a6e80f511a44 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/key.go @@ -0,0 +1,162 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "math/big" + "time" + + "cloud.google.com/go/civil" + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// KeySetProtoToCloudKeySet converts executorpb.KeySet to spanner.KeySet. +func KeySetProtoToCloudKeySet(keySetProto *executorpb.KeySet, typeList []*spannerpb.Type) (spanner.KeySet, error) { + if keySetProto.GetAll() { + return spanner.AllKeys(), nil + } + cloudKeySet := spanner.KeySets() + for _, techKey := range keySetProto.GetPoint() { + cloudKey, err := keyProtoToCloudKey(techKey, typeList) + if err != nil { + return nil, err + } + cloudKeySet = spanner.KeySets(cloudKey, cloudKeySet) + } + for _, techRange := range keySetProto.GetRange() { + cloudRange, err := keyRangeProtoToCloudKeyRange(techRange, typeList) + if err != nil { + return nil, err + } + cloudKeySet = spanner.KeySets(cloudKeySet, cloudRange) + } + return cloudKeySet, nil +} + +// keyProtoToCloudKey converts executorpb.ValueList to spanner.Key. +func keyProtoToCloudKey(keyProto *executorpb.ValueList, typeList []*spannerpb.Type) (spanner.Key, error) { + if len(typeList) < len(keyProto.GetValue()) { + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "There's more key parts in %s than column types in %s", keyProto, typeList)) + } + + var cloudKey spanner.Key + for i, part := range keyProto.GetValue() { + typePb := typeList[i] + key, err := executorKeyValueToCloudValue(part, typePb) + if err != nil { + return nil, err + } + cloudKey = append(cloudKey, key) + } + return cloudKey, nil +} + +// executorKeyValueToCloudValue converts executorpb.Value of the given type to an interface value suitable for +// Cloud Spanner API. +func executorKeyValueToCloudValue(part *executorpb.Value, typePb *spannerpb.Type) (any, error) { + switch v := part.ValueType.(type) { + // Check the value type + case *executorpb.Value_IsNull: + // Check the column type if the value is nil. + switch typePb.GetCode() { + case sppb.TypeCode_BOOL: + return spanner.NullBool{}, nil + case sppb.TypeCode_INT64: + return spanner.NullInt64{}, nil + case sppb.TypeCode_STRING: + return spanner.NullString{}, nil + case sppb.TypeCode_BYTES: + return []byte(nil), nil + case sppb.TypeCode_FLOAT64: + return spanner.NullFloat64{}, nil + case sppb.TypeCode_DATE: + return spanner.NullDate{}, nil + case sppb.TypeCode_TIMESTAMP: + return spanner.NullTime{}, nil + case sppb.TypeCode_NUMERIC: + return spanner.NullNumeric{}, nil + case sppb.TypeCode_JSON: + return spanner.NullJSON{}, nil + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported null key part type: %s", typePb.GetCode().String())) + } + case *executorpb.Value_IntValue: + return v.IntValue, nil + case *executorpb.Value_BoolValue: + return v.BoolValue, nil + case *executorpb.Value_DoubleValue: + return v.DoubleValue, nil + case *executorpb.Value_BytesValue: + switch typePb.GetCode() { + case sppb.TypeCode_STRING: + return string(v.BytesValue), nil + case sppb.TypeCode_BYTES: + return v.BytesValue, nil + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported key part type: %s", typePb.GetCode().String())) + } + case *executorpb.Value_StringValue: + switch typePb.GetCode() { + case sppb.TypeCode_NUMERIC: + y, ok := (&big.Rat{}).SetString(v.StringValue) + if !ok { + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unexpected string value %q for numeric number", v.StringValue)) + } + return *y, nil + default: + return v.StringValue, nil + } + case *executorpb.Value_TimestampValue: + return v.TimestampValue.AsTime(), nil + case *executorpb.Value_DateDaysValue: + epoch := civil.DateOf(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)) + y := epoch.AddDays(int(v.DateDaysValue)) + return y, nil + } + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported key part %s with type %s", part, typePb)) +} + +// keyRangeProtoToCloudKeyRange converts executorpb.KeyRange to spanner.KeyRange. +func keyRangeProtoToCloudKeyRange(keyRangeProto *executorpb.KeyRange, typeList []*spannerpb.Type) (spanner.KeyRange, error) { + start, err := keyProtoToCloudKey(keyRangeProto.GetStart(), typeList) + if err != nil { + return spanner.KeyRange{}, err + } + end, err := keyProtoToCloudKey(keyRangeProto.GetLimit(), typeList) + if err != nil { + return spanner.KeyRange{}, err + } + if keyRangeProto.Type == nil { + // default + return spanner.KeyRange{Start: start, End: end, Kind: spanner.ClosedOpen}, nil + } + switch keyRangeProto.GetType() { + case executorpb.KeyRange_CLOSED_CLOSED: + return spanner.KeyRange{Start: start, End: end, Kind: spanner.ClosedClosed}, nil + case executorpb.KeyRange_CLOSED_OPEN: + return spanner.KeyRange{Start: start, End: end, Kind: spanner.ClosedOpen}, nil + case executorpb.KeyRange_OPEN_CLOSED: + return spanner.KeyRange{Start: start, End: end, Kind: spanner.OpenClosed}, nil + case executorpb.KeyRange_OPEN_OPEN: + return spanner.KeyRange{Start: start, End: end, Kind: spanner.OpenOpen}, nil + default: + return spanner.KeyRange{}, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unrecognized key range type %s", keyRangeProto.GetType().String())) + } +} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/spanner_to_executor_value_converter.go b/spanner/test/cloudexecutor/executor/internal/utility/spanner_to_executor_value_converter.go new file mode 100644 index 000000000000..20ada1df8229 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/spanner_to_executor_value_converter.go @@ -0,0 +1,311 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "log" + "math/big" + "time" + + "cloud.google.com/go/civil" + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + proto3 "github.com/golang/protobuf/ptypes/struct" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// ConvertSpannerRow takes a Cloud Spanner Row and translates it to executorpb.ValueList and sppb.StructType. +// The result is always a struct, in which each value corresponds to a column of the Row. +func ConvertSpannerRow(row *spanner.Row) (*executorpb.ValueList, *sppb.StructType, error) { + rowBuilder := &executorpb.ValueList{} + rowTypeBuilder := &sppb.StructType{} + for i := 0; i < row.Size(); i++ { + rowTypeBuilderField := &sppb.StructType_Field{Name: row.ColumnName(i), Type: row.ColumnType(i)} + rowTypeBuilder.Fields = append(rowTypeBuilder.Fields, rowTypeBuilderField) + v, err := extractRowValue(row, i, row.ColumnType(i)) + if err != nil { + return nil, nil, err + } + rowBuilder.Value = append(rowBuilder.Value, v) + } + return rowBuilder, rowTypeBuilder, nil +} + +// extractRowValue extracts a single column's value at given index i from result row. +func extractRowValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Value, error) { + val := &executorpb.Value{} + _, isNull := row.ColumnValue(i).Kind.(*proto3.Value_NullValue) + if isNull { + val.ValueType = &executorpb.Value_IsNull{IsNull: true} + return val, nil + } + var err error + // nested row + if t.GetCode() == sppb.TypeCode_ARRAY && t.GetArrayElementType().GetCode() == sppb.TypeCode_STRUCT { + log.Println("with in array that is unimplemented") + } + switch t.GetCode() { + case sppb.TypeCode_BOOL: + var v bool + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_BoolValue{BoolValue: v} + case sppb.TypeCode_FLOAT64: + var v float64 + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_DoubleValue{DoubleValue: v} + case sppb.TypeCode_INT64: + var v int64 + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_IntValue{IntValue: v} + case sppb.TypeCode_STRING: + var v string + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_StringValue{StringValue: v} + case sppb.TypeCode_BYTES: + var v []byte + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_BytesValue{BytesValue: v} + case sppb.TypeCode_TIMESTAMP: + var v time.Time + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_TimestampValue{TimestampValue: ×tamppb.Timestamp{Seconds: v.Unix(), Nanos: int32(v.Nanosecond())}} + case sppb.TypeCode_DATE: + var v civil.Date + err = row.Column(i, &v) + if err != nil { + return nil, err + } + epoch := civil.DateOf(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)) + val.ValueType = &executorpb.Value_DateDaysValue{DateDaysValue: int32(v.DaysSince(epoch))} + case sppb.TypeCode_NUMERIC: + var numeric big.Rat + err = row.Column(i, &numeric) + if err != nil { + return nil, err + } + v := spanner.NumericString(&numeric) + val.ValueType = &executorpb.Value_StringValue{StringValue: v} + case sppb.TypeCode_JSON: + var v spanner.NullJSON + err = row.Column(i, &v) + if err != nil { + return nil, err + } + val.ValueType = &executorpb.Value_StringValue{StringValue: v.String()} + case sppb.TypeCode_ARRAY: + val, err = extractRowArrayValue(row, i, t.GetArrayElementType()) + if err != nil { + return nil, err + } + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "extractRowValue: unable to extract value: type %s not supported", t.GetCode())) + } + return val, nil +} + +// extractRowArrayValue extracts a single column's array value at given index i from result row. +func extractRowArrayValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Value, error) { + val := &executorpb.Value{} + var err error + switch t.GetCode() { + case sppb.TypeCode_BOOL: + arrayValue := &executorpb.ValueList{} + var v []*bool + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, booleanValue := range v { + if booleanValue == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_BoolValue{BoolValue: *booleanValue}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_BOOL} + case sppb.TypeCode_FLOAT64: + arrayValue := &executorpb.ValueList{} + var v []*float64 + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_DoubleValue{DoubleValue: *vv}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_FLOAT64} + case sppb.TypeCode_INT64: + arrayValue := &executorpb.ValueList{} + var v []*int64 + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_IntValue{IntValue: *vv}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_INT64} + case sppb.TypeCode_STRING: + arrayValue := &executorpb.ValueList{} + var v []*string + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_StringValue{StringValue: *vv}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_STRING} + case sppb.TypeCode_BYTES: + arrayValue := &executorpb.ValueList{} + var v [][]byte + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_BytesValue{BytesValue: vv}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_BYTES} + case sppb.TypeCode_DATE: + arrayValue := &executorpb.ValueList{} + var v []*civil.Date + err = row.Column(i, &v) + if err != nil { + return nil, err + } + epoch := civil.DateOf(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)) + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_DateDaysValue{DateDaysValue: int32(vv.DaysSince(epoch))}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_DATE} + case sppb.TypeCode_TIMESTAMP: + arrayValue := &executorpb.ValueList{} + var v []*time.Time + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_TimestampValue{TimestampValue: ×tamppb.Timestamp{Seconds: vv.Unix(), Nanos: int32(vv.Nanosecond())}}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_TIMESTAMP} + case sppb.TypeCode_NUMERIC: + arrayValue := &executorpb.ValueList{} + var v []*big.Rat + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if vv == nil { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_StringValue{StringValue: spanner.NumericString(vv)}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_NUMERIC} + case sppb.TypeCode_JSON: + arrayValue := &executorpb.ValueList{} + var v []spanner.NullJSON + err = row.Column(i, &v) + if err != nil { + return nil, err + } + for _, vv := range v { + if !vv.Valid { + value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}} + arrayValue.Value = append(arrayValue.Value, value) + } else { + value := &executorpb.Value{ValueType: &executorpb.Value_StringValue{StringValue: vv.String()}} + arrayValue.Value = append(arrayValue.Value, value) + } + } + val.ValueType = &executorpb.Value_ArrayValue{ArrayValue: arrayValue} + val.ArrayType = &sppb.Type{Code: sppb.TypeCode_JSON} + default: + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "extractRowArrayValue: unable to extract value: type %s not supported", t.GetCode())) + } + return val, nil +}